This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-pipe-cause-wal-pin
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8f581253ca17d3327d982198444f2e98d8956d78 Author: Steve Yurong Su <[email protected]> AuthorDate: Thu Sep 28 17:50:35 2023 +0800 NEW HYBRID MODE --- .../PipeRealtimeDataRegionHybridExtractor.java | 96 ++++++++-------------- 1 file changed, 32 insertions(+), 64 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java index e4dbf60f56c..4473773a802 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java @@ -81,39 +81,27 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio // size of wal buffer), the write operation will be throttled, so we should not extract any // more tablet events. // 3. The number of tsfile events in the pending queue has exceeded the limit. - event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE); + event + .getTsFileEpoch() + .migrateState( + this, + state -> + state.equals(TsFileEpoch.State.EMPTY) ? TsFileEpoch.State.USING_TSFILE : state); } - final TsFileEpoch.State state = event.getTsFileEpoch().getState(this); - switch (state) { - case USING_TSFILE: - // Ignore the tablet event. - event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName(), false); - break; - case EMPTY: - case USING_TABLET: - if (!pendingQueue.waitedOffer(event)) { - // this would not happen, but just in case. - // pendingQueue is unbounded, so it should never reach capacity. 
- final String errorMessage = - String.format( - "extractTabletInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor %s " - + "has reached capacity, discard tablet event %s, current state %s", - this, event, event.getTsFileEpoch().getState(this)); - LOGGER.error(errorMessage); - PipeAgent.runtime() - .report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); - - // Ignore the tablet event. - event.decreaseReferenceCount( - PipeRealtimeDataRegionHybridExtractor.class.getName(), false); - } - break; - default: - throw new UnsupportedOperationException( - String.format( - "Unsupported state %s for hybrid realtime extractor %s", - state, PipeRealtimeDataRegionHybridExtractor.class.getName())); + if (!pendingQueue.waitedOffer(event)) { + // this would not happen, but just in case. + // pendingQueue is unbounded, so it should never reach capacity. + final String errorMessage = + String.format( + "extractTabletInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor %s " + + "has reached capacity, discard tablet event %s, current state %s", + this, event, event.getTsFileEpoch().getState(this)); + LOGGER.error(errorMessage); + PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); + + // Ignore the tablet event. + event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName(), false); } } @@ -125,39 +113,19 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio state -> state.equals(TsFileEpoch.State.EMPTY) ? TsFileEpoch.State.USING_TSFILE : state); - final TsFileEpoch.State state = event.getTsFileEpoch().getState(this); - switch (state) { - case EMPTY: - case USING_TSFILE: - if (!pendingQueue.waitedOffer(event)) { - // this would not happen, but just in case. - // pendingQueue is unbounded, so it should never reach capacity. 
- final String errorMessage = - String.format( - "extractTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor %s " - + "has reached capacity, discard TsFile event %s, current state %s", - this, event, event.getTsFileEpoch().getState(this)); - LOGGER.error(errorMessage); - PipeAgent.runtime() - .report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); - - // Ignore the tsfile event. - event.decreaseReferenceCount( - PipeRealtimeDataRegionHybridExtractor.class.getName(), false); - } - break; - case USING_TABLET: - // All the tablet events have been extracted, so we can ignore the tsFile event. - // Report this event for SimpleProgressIndex, which does not have progressIndex for wal. - // This report won't affect IoTProgressIndex since the previous wal events have been - // successfully transferred here. - event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName(), true); - break; - default: - throw new UnsupportedOperationException( - String.format( - "Unsupported state %s for hybrid realtime extractor %s", - state, PipeRealtimeDataRegionHybridExtractor.class.getName())); + if (!pendingQueue.waitedOffer(event)) { + // this would not happen, but just in case. + // pendingQueue is unbounded, so it should never reach capacity. + final String errorMessage = + String.format( + "extractTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor %s " + + "has reached capacity, discard TsFile event %s, current state %s", + this, event, event.getTsFileEpoch().getState(this)); + LOGGER.error(errorMessage); + PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); + + // Ignore the tsfile event. + event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName(), false); } }
