This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit adef295839ab38c1c10705d7049ebeb01a05417d Author: Caideyipi <[email protected]> AuthorDate: Fri Jul 25 17:03:03 2025 +0800 Pipe: Simplified the hybrid down-grading logic (#16033) * simplify * comp (cherry picked from commit 2a8192a428da3516e44078566f750cdf52f88ff5) --- .../PipeRealtimeDataRegionHybridExtractor.java | 231 ++++----------------- .../apache/iotdb/commons/conf/CommonConfig.java | 49 ----- .../iotdb/commons/pipe/config/PipeConfig.java | 22 -- .../iotdb/commons/pipe/config/PipeDescriptor.java | 16 -- 4 files changed, 40 insertions(+), 278 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java index f1ddff90c67..516ab4d05ef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java @@ -20,19 +20,16 @@ package org.apache.iotdb.db.pipe.extractor.dataregion.realtime; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; -import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.ProgressReportEvent; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; -import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch; import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeOperator; import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; -import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; @@ -41,7 +38,6 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Objects; import java.util.Optional; public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegionExtractor { @@ -82,7 +78,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio private void extractTabletInsertion(final PipeRealtimeEvent event) { TsFileEpoch.State state; - if (canNotUseTabletAnyMore(event)) { + if (canNotUseTabletAnymore(event)) { event.getTsFileEpoch().migrateState(this, curState -> TsFileEpoch.State.USING_TSFILE); PipeTsFileEpochProgressIndexKeeper.getInstance() .registerProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getResource()); @@ -162,7 +158,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio return TsFileEpoch.State.USING_TSFILE; case USING_BOTH: default: - return canNotUseTabletAnyMore(event) + return canNotUseTabletAnymore(event) ? TsFileEpoch.State.USING_TSFILE : TsFileEpoch.State.USING_BOTH; } @@ -171,9 +167,10 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio final TsFileEpoch.State state = event.getTsFileEpoch().getState(this); switch (state) { case USING_TABLET: - // Though the data in tsfile event has been extracted in tablet mode, we still need to - // extract the tsfile event to help to determine isTsFileEventCountInQueueExceededLimit(). - // The extracted tsfile event will be discarded in supplyTsFileInsertion. + // If the state is USING_TABLET, discard the event + PipeTsFileEpochProgressIndexKeeper.getInstance() + .eliminateProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getFilePath()); + return; case EMPTY: case USING_TSFILE: case USING_BOTH: @@ -202,17 +199,9 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio } } - private boolean canNotUseTabletAnyMore(final PipeRealtimeEvent event) { - // In the following 4 cases, we should not extract this tablet event. all the data - // represented by the tablet event should be carried by the following tsfile event: - // the write operation will be throttled, so we should not extract any more tablet events. - // 1. The shallow memory usage of the insert node has reached the dangerous threshold. - // 2. Deprecated logics (unused by default) - return mayInsertNodeMemoryReachDangerousThreshold(event) - || canNotUseTabletAnymoreDeprecated(event); - } - - private boolean mayInsertNodeMemoryReachDangerousThreshold(final PipeRealtimeEvent event) { + // If the insertNode's memory has reached the dangerous threshold, we should not extract any + // tablets. + private boolean canNotUseTabletAnymore(final PipeRealtimeEvent event) { final long floatingMemoryUsageInByte = PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName); final long pipeCount = PipeDataNodeAgent.task().getPipeCount(); @@ -224,7 +213,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio final PipeDataNodeRemainingEventAndTimeOperator operator = PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.get(pipeID); LOGGER.info( - "Pipe task {}@{} canNotUseTabletAnyMore(1) for tsFile {}: The memory usage of the insert node {} has reached the dangerous threshold of single pipe {}, event count: {}", + "Pipe task {}@{} canNotUseTabletAnyMore for tsFile {}: The memory usage of the insert node {} has reached the dangerous threshold of single pipe {}, event count: {}", pipeName, dataRegionId, event.getTsFileEpoch().getFilePath(), @@ -237,83 +226,6 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio return mayInsertNodeMemoryReachDangerousThreshold; } - /** - * These judgements are deprecated, and are only reserved for manual operation and compatibility. - */ - @Deprecated - private boolean canNotUseTabletAnymoreDeprecated(final PipeRealtimeEvent event) { - // In the following 5 cases, we should not extract any more tablet events. all the data - // represented by the tablet events should be carried by the following tsfile event: - // 1. The number of historical tsFile events to transfer has exceeded the limit. - // 2. The number of realtime tsfile events to transfer has exceeded the limit. - // 3. The number of linked tsFiles has reached the dangerous threshold. - return isHistoricalTsFileEventCountExceededLimit(event) - || isRealtimeTsFileEventCountExceededLimit(event) - || mayTsFileLinkedCountReachDangerousThreshold(event); - } - - private boolean isHistoricalTsFileEventCountExceededLimit(final PipeRealtimeEvent event) { - if (PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion() - == Integer.MAX_VALUE) { - return false; - } - final IoTDBDataRegionExtractor extractor = - PipeDataRegionExtractorMetrics.getInstance().getExtractorMap().get(getTaskID()); - final boolean isHistoricalTsFileEventCountExceededLimit = - Objects.nonNull(extractor) - && extractor.getHistoricalTsFileInsertionEventCount() - >= PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion(); - if (isHistoricalTsFileEventCountExceededLimit && event.mayExtractorUseTablets(this)) { - LOGGER.info( - "Pipe task {}@{} canNotUseTabletAnymoreDeprecated(1) for tsFile {}: The number of historical tsFile events {} has exceeded the limit {}", - pipeName, - dataRegionId, - event.getTsFileEpoch().getFilePath(), - extractor.getHistoricalTsFileInsertionEventCount(), - PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion()); - } - return isHistoricalTsFileEventCountExceededLimit; - } - - private boolean isRealtimeTsFileEventCountExceededLimit(final PipeRealtimeEvent event) { - if (PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion() - == Integer.MAX_VALUE) { - return false; - } - final boolean isRealtimeTsFileEventCountExceededLimit = - pendingQueue.getTsFileInsertionEventCount() - >= PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion(); - if (isRealtimeTsFileEventCountExceededLimit && event.mayExtractorUseTablets(this)) { - LOGGER.info( - "Pipe task {}@{} canNotUseTabletAnymoreDeprecated(2) for tsFile {}: The number of realtime tsFile events {} has exceeded the limit {}", - pipeName, - dataRegionId, - event.getTsFileEpoch().getFilePath(), - pendingQueue.getTsFileInsertionEventCount(), - PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion()); - } - return isRealtimeTsFileEventCountExceededLimit; - } - - private boolean mayTsFileLinkedCountReachDangerousThreshold(final PipeRealtimeEvent event) { - if (PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount() == Long.MAX_VALUE) { - return false; - } - final boolean mayTsFileLinkedCountReachDangerousThreshold = - PipeDataNodeResourceManager.tsfile().getLinkedTsFileCount(pipeName) - >= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount(); - if (mayTsFileLinkedCountReachDangerousThreshold && event.mayExtractorUseTablets(this)) { - LOGGER.info( - "Pipe task {}@{} canNotUseTabletAnymoreDeprecated(3) for tsFile {}: The number of linked tsFiles {} has reached the dangerous threshold {}", - pipeName, - dataRegionId, - event.getTsFileEpoch().getFilePath(), - PipeDataNodeResourceManager.tsfile().getLinkedTsFileCount(pipeName), - PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount()); - } - return mayTsFileLinkedCountReachDangerousThreshold; - } - @Override public Event supply() { PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent) pendingQueue.directPoll(); @@ -355,103 +267,40 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio } private Event supplyTabletInsertion(final PipeRealtimeEvent event) { - event - .getTsFileEpoch() - .migrateState( - this, - state -> { - switch (state) { - case EMPTY: - return canNotUseTabletAnyMore(event) - ? TsFileEpoch.State.USING_TSFILE - : TsFileEpoch.State.USING_TABLET; - case USING_TSFILE: - return canNotUseTabletAnyMore(event) - ? TsFileEpoch.State.USING_TSFILE - : TsFileEpoch.State.USING_BOTH; - case USING_TABLET: - case USING_BOTH: - default: - return state; - } - }); - - final TsFileEpoch.State state = event.getTsFileEpoch().getState(this); - if (state == TsFileEpoch.State.USING_TSFILE) { - PipeTsFileEpochProgressIndexKeeper.getInstance() - .registerProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getResource()); - } - - switch (state) { - case USING_TSFILE: - // If the state is USING_TSFILE, discard the event and poll the next one. - return null; - case EMPTY: - case USING_TABLET: - case USING_BOTH: - default: - if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName())) { - return event.getEvent(); - } else { - // If the event's reference count can not be increased, it means the data represented by - // this event is not reliable anymore. but the data represented by this event - // has been carried by the following tsfile event, so we can just discard this event. - event.getTsFileEpoch().migrateState(this, s -> TsFileEpoch.State.USING_BOTH); - LOGGER.warn( - "Discard tablet event {} because it is not reliable anymore. " - + "Change the state of TsFileEpoch to USING_TSFILE.", - event); - return null; - } + if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName())) { + return event.getEvent(); + } else { + // If the event's reference count can not be increased, it means the data represented by + // this event is not reliable anymore. but the data represented by this event + // has been carried by the following tsfile event, so we can just discard this event. + event.getTsFileEpoch().migrateState(this, s -> TsFileEpoch.State.USING_BOTH); + LOGGER.warn( + "Discard tablet event {} because it is not reliable anymore. " + + "Change the state of TsFileEpoch to USING_BOTH.", + event); + return null; } } private Event supplyTsFileInsertion(final PipeRealtimeEvent event) { - event - .getTsFileEpoch() - .migrateState( - this, - state -> { - // This would not happen, but just in case. - if (state.equals(TsFileEpoch.State.EMPTY)) { - LOGGER.error( - String.format("EMPTY TsFileEpoch when supplying TsFile Event %s", event)); - return TsFileEpoch.State.USING_TSFILE; - } - return state; - }); - - final TsFileEpoch.State state = event.getTsFileEpoch().getState(this); - switch (state) { - case USING_TABLET: - // If the state is USING_TABLET, discard the event and poll the next one. - PipeTsFileEpochProgressIndexKeeper.getInstance() - .eliminateProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getFilePath()); - return null; - case EMPTY: - case USING_TSFILE: - case USING_BOTH: - default: - if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName())) { - return event.getEvent(); - } else { - // If the event's reference count can not be increased, it means the data represented by - // this event is not reliable anymore. the data has been lost. we simply discard this - // event - // and report the exception to PipeRuntimeAgent. - final String errorMessage = - String.format( - "TsFile Event %s can not be supplied because " - + "the reference count can not be increased, " - + "the data represented by this event is lost", - event.getEvent()); - LOGGER.error(errorMessage); - PipeDataNodeAgent.runtime() - .report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); - PipeTsFileEpochProgressIndexKeeper.getInstance() - .eliminateProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getFilePath()); - return null; - } + if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName())) { + return event.getEvent(); + } else { + // If the event's reference count can not be increased, it means the data represented by + // this event is not reliable anymore. the data has been lost. we simply discard this + // event and report the exception to PipeRuntimeAgent. + final String errorMessage = + String.format( + "TsFile Event %s can not be supplied because " + + "the reference count can not be increased, " + + "the data represented by this event is lost", + event.getEvent()); + LOGGER.error(errorMessage); + PipeDataNodeAgent.runtime() + .report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); + PipeTsFileEpochProgressIndexKeeper.getInstance() + .eliminateProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getFilePath()); + return null; } } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index a2bd1ee0f28..f9697bd585f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -295,10 +295,6 @@ public class CommonConfig { private int pipeReceiverReqDecompressedMaxLengthInBytes = 1073741824; // 1GB - private int pipeMaxAllowedHistoricalTsFilePerDataRegion = Integer.MAX_VALUE; // Deprecated - private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = Integer.MAX_VALUE; // Deprecated - private long pipeMaxAllowedLinkedTsFileCount = Long.MAX_VALUE; // Deprecated - private double pipeMetaReportMaxLogNumPerRound = 0.1; private int pipeMetaReportMaxLogIntervalRounds = 360; private int pipeTsFilePinMaxLogNumPerRound = 10; @@ -1499,51 +1495,6 @@ public class CommonConfig { return pipeReceiverReqDecompressedMaxLengthInBytes; } - public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() { - return pipeMaxAllowedHistoricalTsFilePerDataRegion; - } - - public void setPipeMaxAllowedHistoricalTsFilePerDataRegion( - int pipeMaxAllowedPendingTsFileEpochPerDataRegion) { - if (this.pipeMaxAllowedHistoricalTsFilePerDataRegion - == pipeMaxAllowedPendingTsFileEpochPerDataRegion) { - return; - } - this.pipeMaxAllowedHistoricalTsFilePerDataRegion = - pipeMaxAllowedPendingTsFileEpochPerDataRegion; - logger.info( - "pipeMaxAllowedHistoricalTsFilePerDataRegion is set to {}", - pipeMaxAllowedPendingTsFileEpochPerDataRegion); - } - - public int getPipeMaxAllowedPendingTsFileEpochPerDataRegion() { - return pipeMaxAllowedPendingTsFileEpochPerDataRegion; - } - - public void setPipeMaxAllowedPendingTsFileEpochPerDataRegion( - int pipeExtractorPendingQueueTsfileLimit) { - if (this.pipeMaxAllowedPendingTsFileEpochPerDataRegion - == pipeExtractorPendingQueueTsfileLimit) { - return; - } - this.pipeMaxAllowedPendingTsFileEpochPerDataRegion = pipeExtractorPendingQueueTsfileLimit; - logger.info( - "pipeMaxAllowedPendingTsFileEpochPerDataRegion is set to {}.", - pipeMaxAllowedPendingTsFileEpochPerDataRegion); - } - - public long getPipeMaxAllowedLinkedTsFileCount() { - return pipeMaxAllowedLinkedTsFileCount; - } - - public void setPipeMaxAllowedLinkedTsFileCount(long pipeMaxAllowedLinkedTsFileCount) { - if (this.pipeMaxAllowedLinkedTsFileCount == pipeMaxAllowedLinkedTsFileCount) { - return; - } - this.pipeMaxAllowedLinkedTsFileCount = pipeMaxAllowedLinkedTsFileCount; - logger.info("pipeMaxAllowedLinkedTsFileCount is set to {}", pipeMaxAllowedLinkedTsFileCount); - } - public double getPipeMetaReportMaxLogNumPerRound() { return pipeMetaReportMaxLogNumPerRound; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index ca99d29a1d0..770e0e959b2 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -343,20 +343,6 @@ public class PipeConfig { return COMMON_CONFIG.getPipeReceiverReqDecompressedMaxLengthInBytes(); } - /////////////////////////////// Hybrid Mode /////////////////////////////// - - public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() { - return COMMON_CONFIG.getPipeMaxAllowedHistoricalTsFilePerDataRegion(); - } - - public int getPipeMaxAllowedPendingTsFileEpochPerDataRegion() { - return COMMON_CONFIG.getPipeMaxAllowedPendingTsFileEpochPerDataRegion(); - } - - public long getPipeMaxAllowedLinkedTsFileCount() { - return COMMON_CONFIG.getPipeMaxAllowedLinkedTsFileCount(); - } - /////////////////////////////// Logger /////////////////////////////// public double getPipeMetaReportMaxLogNumPerRound() { @@ -588,14 +574,6 @@ public class PipeConfig { "PipeReceiverReqDecompressedMaxLengthInBytes: {}", getPipeReceiverReqDecompressedMaxLengthInBytes()); - LOGGER.info( - "PipeMaxAllowedHistoricalTsFilePerDataRegion: {}", - getPipeMaxAllowedHistoricalTsFilePerDataRegion()); - LOGGER.info( - "PipeMaxAllowedPendingTsFileEpochPerDataRegion: {}", - getPipeMaxAllowedPendingTsFileEpochPerDataRegion()); - LOGGER.info("PipeMaxAllowedLinkedTsFileCount: {}", getPipeMaxAllowedLinkedTsFileCount()); - LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", getPipeMetaReportMaxLogNumPerRound()); LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", getPipeMetaReportMaxLogIntervalRounds()); LOGGER.info("PipeTsFilePinMaxLogNumPerRound: {}", getPipeTsFilePinMaxLogNumPerRound()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java index 3e31864e3d8..7e27838618b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java @@ -433,22 +433,6 @@ public class PipeDescriptor { "pipe_receiver_req_decompressed_max_length_in_bytes", String.valueOf(config.getPipeReceiverReqDecompressedMaxLengthInBytes())))); - config.setPipeMaxAllowedHistoricalTsFilePerDataRegion( - Integer.parseInt( - properties.getProperty( - "pipe_max_allowed_historical_tsfile_per_data_region", - String.valueOf(config.getPipeMaxAllowedHistoricalTsFilePerDataRegion())))); - config.setPipeMaxAllowedPendingTsFileEpochPerDataRegion( - Integer.parseInt( - properties.getProperty( - "pipe_max_allowed_pending_tsfile_epoch_per_data_region", - String.valueOf(config.getPipeMaxAllowedPendingTsFileEpochPerDataRegion())))); - config.setPipeMaxAllowedLinkedTsFileCount( - Long.parseLong( - properties.getProperty( - "pipe_max_allowed_linked_tsfile_count", - String.valueOf(config.getPipeMaxAllowedLinkedTsFileCount())))); - config.setPipeMemoryAllocateMaxRetries( Integer.parseInt( properties.getProperty(
