This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a6547154f3b04bca19ad75324a617cbd13e9f888 Author: Caideyipi <[email protected]> AuthorDate: Wed Jul 23 18:58:38 2025 +0800 Pipe: Optimized the degrading logger & Deleted useless UT & Copied some historical filter logic from dev/1.3 (#16019) * logger * fix-ut * Update PipeHistoricalDataRegionTsFileAndDeletionExtractor.java (cherry picked from commit 08ab2c9dcde2748165660a3308d058f63be1b80f) --- ...oricalDataRegionTsFileAndDeletionExtractor.java | 22 +++------ .../PipeRealtimeDataRegionHybridExtractor.java | 14 ++++-- .../PipeDataNodeRemainingEventAndTimeOperator.java | 4 ++ .../iotdb/db/pipe/connector/PipeConnectorTest.java | 54 ---------------------- 4 files changed, 22 insertions(+), 72 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java index 7b765c143a2..b9d536db936 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java @@ -51,6 +51,7 @@ import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; +import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.utils.DateTimeUtils; @@ -387,21 +388,6 @@ public class PipeHistoricalDataRegionTsFileAndDeletionExtractor } } - private void flushDataRegionAllTsFiles() { - final DataRegion dataRegion = - StorageEngine.getInstance().getDataRegion(new DataRegionId(dataRegionId)); - if (Objects.isNull(dataRegion)) { - return; - } - - dataRegion.writeLock("Pipe: create historical TsFile extractor"); - try { - dataRegion.syncCloseAllWorkingTsFileProcessors(); - } finally { - dataRegion.writeUnlock(); - } - } - /** * IoTV2 will only resend event that contains un-replicated local write data. So we only extract * ProgressIndex containing local writes for comparison to prevent misjudgment on whether @@ -563,6 +549,9 @@ public class PipeHistoricalDataRegionTsFileAndDeletionExtractor // Some resource may not be closed due to the control of // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them. !resource.isClosed() + && Optional.ofNullable(resource.getProcessor()) + .map(TsFileProcessor::alreadyMarkedClosing) + .orElse(true) || mayTsFileContainUnprocessedData(resource) && isTsFileResourceOverlappedWithTimeRange(resource) && mayTsFileResourceOverlappedWithPattern(resource))) @@ -585,6 +574,9 @@ public class PipeHistoricalDataRegionTsFileAndDeletionExtractor // Some resource may not be closed due to the control of // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them. !resource.isClosed() + && Optional.ofNullable(resource.getProcessor()) + .map(TsFileProcessor::alreadyMarkedClosing) + .orElse(true) || mayTsFileContainUnprocessedData(resource) && isTsFileResourceOverlappedWithTimeRange(resource) && mayTsFileResourceOverlappedWithPattern(resource))) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java index 8ee2f7a15a1..f1ddff90c67 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java @@ -30,6 +30,8 @@ import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeOperator; +import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.pipe.api.event.Event; @@ -40,6 +42,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Objects; +import java.util.Optional; public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegionExtractor { @@ -218,13 +221,18 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio final boolean mayInsertNodeMemoryReachDangerousThreshold = floatingMemoryUsageInByte * pipeCount >= totalFloatingMemorySizeInBytes; if (mayInsertNodeMemoryReachDangerousThreshold && event.mayExtractorUseTablets(this)) { + final PipeDataNodeRemainingEventAndTimeOperator operator = + PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.get(pipeID); LOGGER.info( - "Pipe task {}@{} canNotUseTabletAnyMore(1) for tsFile {}: The memory usage of the insert node {} has reached the dangerous threshold {}", + "Pipe task {}@{} canNotUseTabletAnyMore(1) for tsFile {}: The memory usage of the insert node {} has reached the dangerous threshold of single pipe {}, event count: {}", pipeName, dataRegionId, event.getTsFileEpoch().getFilePath(), - floatingMemoryUsageInByte * pipeCount, - totalFloatingMemorySizeInBytes); + floatingMemoryUsageInByte, + totalFloatingMemorySizeInBytes / pipeCount, + Optional.ofNullable(operator) + .map(PipeDataNodeRemainingEventAndTimeOperator::getInsertNodeEventCount) + .orElse(0)); } return mayInsertNodeMemoryReachDangerousThreshold; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java index 0308e9b5b63..996b71cfc3d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java @@ -116,6 +116,10 @@ public class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOper return remainingEvents >= 0 ? remainingEvents : 0; } + public int getInsertNodeEventCount() { + return insertNodeEventCount.get(); + } + long getRemainingEvents() { final long remainingEvents = tsfileEventCount.get() diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeConnectorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeConnectorTest.java index f06c5de1168..5d9e444da24 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeConnectorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeConnectorTest.java @@ -26,7 +26,6 @@ import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionA import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; -import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException; import org.junit.Assert; import org.junit.Test; @@ -35,24 +34,6 @@ import java.util.HashMap; public class PipeConnectorTest { - @Test(expected = PipeParameterNotValidException.class) - public void testIoTDBLegacyPipeConnectorToSelf() throws Exception { - try (IoTDBLegacyPipeConnector connector = new IoTDBLegacyPipeConnector()) { - connector.validate( - new PipeParameterValidator( - new PipeParameters( - new HashMap<String, String>() { - { - put( - PipeConnectorConstant.CONNECTOR_KEY, - BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName()); - put(PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY, "127.0.0.1"); - put(PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY, "6667"); - } - }))); - } - } - @Test public void testIoTDBLegacyPipeConnectorToOthers() { try (IoTDBLegacyPipeConnector connector = new IoTDBLegacyPipeConnector()) { @@ -73,24 +54,6 @@ public class PipeConnectorTest { } } - @Test(expected = PipeParameterNotValidException.class) - public void testIoTDBThriftSyncConnectorToSelf() throws Exception { - try (IoTDBDataRegionSyncConnector connector = new IoTDBDataRegionSyncConnector()) { - connector.validate( - new PipeParameterValidator( - new PipeParameters( - new HashMap<String, String>() { - { - put( - PipeConnectorConstant.CONNECTOR_KEY, - BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName()); - put(PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY, "127.0.0.1"); - put(PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY, "6667"); - } - }))); - } - } - @Test public void testIoTDBThriftSyncConnectorToOthers() { try (IoTDBDataRegionSyncConnector connector = new IoTDBDataRegionSyncConnector()) { @@ -111,23 +74,6 @@ public class PipeConnectorTest { } } - @Test(expected = PipeParameterNotValidException.class) - public void testIoTDBThriftAsyncConnectorToSelf() throws Exception { - try (IoTDBDataRegionAsyncConnector connector = new IoTDBDataRegionAsyncConnector()) { - connector.validate( - new PipeParameterValidator( - new PipeParameters( - new HashMap<String, String>() { - { - put( - PipeConnectorConstant.CONNECTOR_KEY, - BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName()); - put(PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6667"); - } - }))); - } - } - @Test public void testIoTDBThriftAsyncConnectorToOthers() { try (IoTDBDataRegionAsyncConnector connector = new IoTDBDataRegionAsyncConnector()) {
