This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch fix-pipe-ref-issue in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1b588fb9efa54aafd7ddae3edfbc5dae5c9f86fa Author: Steve Yurong Su <[email protected]> AuthorDate: Fri Aug 4 17:59:31 2023 +0800 Pipe: reference count is not correctly increased / decreased --- .../PipeRealtimeDataRegionHybridExtractor.java | 57 ++++++++++++++-------- .../PipeRealtimeDataRegionLogExtractor.java | 23 +++++---- .../PipeRealtimeDataRegionTsFileExtractor.java | 25 ++++++---- 3 files changed, 67 insertions(+), 38 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java index ffefb13daf0..026a3334d54 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java @@ -73,23 +73,35 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio private void extractTabletInsertion(PipeRealtimeEvent event) { if (isApproachingCapacity()) { - event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE); // if the pending queue is approaching capacity, we should not extract any more tablet events. // all the data represented by the tablet events should be carried by the following tsfile // event. + event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE); + LOGGER.info( + "extractTabletInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor " + + "is approaching capacity, discard tablet event {}, change state of tsfile epoch to {}", + event, + event.getTsFileEpoch().getState(this)); + + // Ignore the tablet event. 
+ event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()); return; } if (!event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE) && !pendingQueue.waitedOffer(event)) { - LOGGER.warn( - "extractTabletInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor {} " - + "has reached capacity, discard tablet event {}, current state {}", - this, - event, - event.getTsFileEpoch().getState(this)); // this would not happen, but just in case. - // UnboundedBlockingPendingQueue is unbounded, so it should never reach capacity. + // pendingQueue is unbounded, so it should never reach capacity. + final String errorMessage = + String.format( + "extractTabletInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor %s " + + "has reached capacity, discard tablet event %s, current state %s", + this, event, event.getTsFileEpoch().getState(this)); + LOGGER.error(errorMessage); + PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); + + // Ignore the tablet event. + event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()); } } @@ -102,14 +114,18 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio state.equals(TsFileEpoch.State.EMPTY) ? TsFileEpoch.State.USING_TSFILE : state); if (!pendingQueue.waitedOffer(event)) { - LOGGER.warn( - "extractTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor {} " - + "has reached capacity, discard TsFile event {}, current state {}", - this, - event, - event.getTsFileEpoch().getState(this)); // this would not happen, but just in case. - // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity. + // pendingQueue is unbounded, so it should never reach capacity. 
+ final String errorMessage = + String.format( + "extractTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor %s " + + "has reached capacity, discard TsFile event %s, current state %s", + this, event, event.getTsFileEpoch().getState(this)); + LOGGER.error(errorMessage); + PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); + + // Ignore the tsfile event. + event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()); } } @@ -138,8 +154,8 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio eventToSupply.getClass(), this)); } - realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()); if (suppliedEvent != null) { + realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()); return suppliedEvent; } @@ -166,7 +182,10 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio // this event is not reliable anymore. but the data represented by this event // has been carried by the following tsfile event, so we can just discard this event. event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE); - LOGGER.warn("Increase reference count for event {} error.", event); + LOGGER.warn( + "Discard tablet event {} because it is not reliable anymore. " + + "Change the state of TsFileEpoch to USING_TSFILE.", + event); return null; } } @@ -182,7 +201,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio state -> { // this would not happen, but just in case. 
if (state.equals(TsFileEpoch.State.EMPTY)) { - LOGGER.warn( + LOGGER.error( String.format("EMPTY TsFileEpoch when supplying TsFile Event %s", event)); return TsFileEpoch.State.USING_TSFILE; } @@ -202,7 +221,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio + "the reference count can not be increased, " + "the data represented by this event is lost", event.getEvent()); - LOGGER.warn(errorMessage); + LOGGER.error(errorMessage); PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); return null; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java index 74ff0c533b1..17891add76d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java @@ -48,18 +48,23 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TABLET); if (!(event.getEvent() instanceof TabletInsertionEvent)) { + event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName()); return; } if (!pendingQueue.waitedOffer(event)) { - LOGGER.warn( - "extract: pending queue of PipeRealtimeDataRegionLogExtractor {} " - + "has reached capacity, discard tablet event {}, current state {}", - this, - event, - event.getTsFileEpoch().getState(this)); // this would not happen, but just in case. - // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity. + // pendingQueue is unbounded, so it should never reach capacity. 
+ final String errorMessage = + String.format( + "extract: pending queue of PipeRealtimeDataRegionLogExtractor %s " + + "has reached capacity, discard tablet event %s, current state %s", + this, event, event.getTsFileEpoch().getState(this)); + LOGGER.error(errorMessage); + PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); + + // ignore this event. + event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName()); } } @@ -93,12 +98,12 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx + "the reference count can not be increased, " + "the data represented by this event is lost", realtimeEvent.getEvent()); - LOGGER.warn(errorMessage); + LOGGER.error(errorMessage); PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); } - realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName()); if (suppliedEvent != null) { + realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName()); return suppliedEvent; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java index bab1ea2ec46..5e97a338df1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java @@ -48,18 +48,23 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE); if (!(event.getEvent() instanceof TsFileInsertionEvent)) { + event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName()); return; } if 
(!pendingQueue.waitedOffer(event)) { - LOGGER.warn( - "extract: pending queue of PipeRealtimeDataRegionTsFileExtractor {} " - + "has reached capacity, discard TsFile event {}, current state {}", - this, - event, - event.getTsFileEpoch().getState(this)); - // this would not happen, but just in case. - // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity. + // This would not happen, but just in case. + // Pending is unbounded, so it should never reach capacity. + final String errorMessage = + String.format( + "extract: pending queue of PipeRealtimeDataRegionTsFileExtractor %s " + + "has reached capacity, discard TsFile event %s, current state %s", + this, event, event.getTsFileEpoch().getState(this)); + LOGGER.error(errorMessage); + PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); + + // Ignore the event. + event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName()); } } @@ -93,12 +98,12 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio + "the reference count can not be increased, " + "the data represented by this event is lost", realtimeEvent.getEvent()); - LOGGER.warn(errorMessage); + LOGGER.error(errorMessage); PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); } - realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName()); if (suppliedEvent != null) { + realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName()); return suppliedEvent; }
