This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f1517cade60680050a2b2a3b1c69ed3e822ef2b0 Author: Caideyipi <[email protected]> AuthorDate: Fri Aug 1 14:11:10 2025 +0800 Pipe: Fixed the default param of single entry of disruptor queue & Banned memory checks from some missing pipe ITs & Do not check non-user pipes (#16069) * Update CommonConfig.java * refactor * try-fix * test * revert-pom * fix * fix (cherry picked from commit 19810e5efaff1c5e363bc72b81d75a41792b04ac) --- .../it/env/cluster/config/MppCommonConfig.java | 6 + .../env/cluster/config/MppSharedCommonConfig.java | 7 + .../it/env/remote/config/RemoteCommonConfig.java | 5 + .../org/apache/iotdb/itbase/env/CommonConfig.java | 2 + .../manual/AbstractPipeTableModelDualManualIT.java | 2 + .../manual/basic/IoTDBPipePermissionIT.java | 6 +- .../manual/basic/IoTDBPipeProtocolIT.java | 42 +++--- .../tablemodel/manual/basic/IoTDBPipeSourceIT.java | 15 ++- .../manual/enhanced/IoTDBPipeClusterIT.java | 14 +- .../treemodel/auto/basic/IoTDBPipeProcessorIT.java | 2 + .../treemodel/auto/basic/IoTDBPipeProtocolIT.java | 2 + .../treemodel/auto/basic/IoTDBPipeSourceIT.java | 2 + .../auto/enhanced/IoTDBPipeAutoConflictIT.java | 2 + .../auto/enhanced/IoTDBPipeClusterIT.java | 2 + .../auto/enhanced/IoTDBPipeIdempotentIT.java | 2 + .../auto/enhanced/IoTDBPipeSinkCompressionIT.java | 2 + .../auto/enhanced/IoTDBPipeWithLoadIT.java | 2 + .../manual/AbstractPipeDualTreeModelManualIT.java | 2 + .../it/cluster/IoTDBSubscriptionRestartIT.java | 1 + .../it/local/AbstractSubscriptionLocalIT.java | 1 + .../it/triple/AbstractSubscriptionTripleIT.java | 1 + .../apache/iotdb/tools/it/ExportTsFileTestIT.java | 1 + .../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 142 ++++++++++++--------- ...istoricalDataRegionTsFileAndDeletionSource.java | 1 - .../listener/PipeInsertionDataNodeListener.java | 4 + .../apache/iotdb/commons/conf/CommonConfig.java | 4 +- .../commons/pipe/agent/task/PipeTaskAgent.java | 2 + 27 files changed, 177 insertions(+), 97 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java index 78baab77218..ef3856b8068 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java @@ -453,6 +453,12 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig { return this; } + @Override + public CommonConfig setPipeMemoryManagementEnabled(boolean pipeMemoryManagementEnabled) { + setProperty("pipe_memory_management_enabled", String.valueOf(pipeMemoryManagementEnabled)); + return this; + } + @Override public CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck) { setProperty("pipe_enable_memory_checked", String.valueOf(isPipeEnableMemoryCheck)); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java index 6db1003f5b9..59578423c61 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java @@ -466,6 +466,13 @@ public class MppSharedCommonConfig implements CommonConfig { return this; } + @Override + public CommonConfig setPipeMemoryManagementEnabled(boolean pipeMemoryManagementEnabled) { + dnConfig.setPipeMemoryManagementEnabled(pipeMemoryManagementEnabled); + cnConfig.setPipeMemoryManagementEnabled(pipeMemoryManagementEnabled); + return this; + } + @Override public CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck) { dnConfig.setIsPipeEnableMemoryCheck(isPipeEnableMemoryCheck); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java index 55a772a60da..62526dca5b2 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java @@ -328,6 +328,11 @@ public class RemoteCommonConfig implements CommonConfig { return this; } + @Override + public CommonConfig setPipeMemoryManagementEnabled(boolean pipeMemoryManagementEnabled) { + return this; + } + @Override public CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck) { return this; diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java index 6f02ebfc770..ba503551e9c 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java @@ -146,6 +146,8 @@ public interface CommonConfig { CommonConfig setSchemaRegionPerDataNode(double schemaRegionPerDataNode); + CommonConfig setPipeMemoryManagementEnabled(boolean pipeMemoryManagementEnabled); + CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck); CommonConfig setPipeAirGapReceiverEnabled(boolean isPipeAirGapReceiverEnabled); diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java index dd921f33bc7..91fea245335 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java @@ -49,6 +49,7 @@ public abstract class AbstractPipeTableModelDualManualIT { .setAutoCreateSchemaEnabled(true) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); receiverEnv .getConfig() @@ -56,6 +57,7 @@ public abstract class AbstractPipeTableModelDualManualIT { .setAutoCreateSchemaEnabled(true) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); // 10 min, assert that the operations will not time out diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java index 7a3f90e687a..8c56fd6a799 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java @@ -68,7 +68,9 @@ public class IoTDBPipePermissionIT extends AbstractPipeTableModelDualManualIT { .setDefaultSchemaRegionGroupNumPerDatabase(1) .setTimestampPrecision("ms") .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) - .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS); + .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setPipeMemoryManagementEnabled(false) + .setIsPipeEnableMemoryCheck(false); receiverEnv .getConfig() .getCommonConfig() @@ -77,6 +79,8 @@ public class IoTDBPipePermissionIT extends AbstractPipeTableModelDualManualIT { .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) + .setPipeMemoryManagementEnabled(false) + .setIsPipeEnableMemoryCheck(false) .setSchemaReplicationFactor(3) .setDataReplicationFactor(2); diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java index 4e18091bd07..939e84b9f49 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java @@ -76,7 +76,10 @@ public class IoTDBPipeProtocolIT extends AbstractPipeTableModelDualManualIT { .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(dataRegionConsensus) .setSchemaReplicationFactor(schemaRegionReplicationFactor) - .setDataReplicationFactor(dataRegionReplicationFactor); + .setDataReplicationFactor(dataRegionReplicationFactor) + .setDnConnectionTimeoutMs(600000) + .setPipeMemoryManagementEnabled(false) + .setIsPipeEnableMemoryCheck(false); receiverEnv .getConfig() .getCommonConfig() @@ -85,11 +88,10 @@ public class IoTDBPipeProtocolIT extends AbstractPipeTableModelDualManualIT { .setSchemaRegionConsensusProtocolClass(schemaRegionConsensus) .setDataRegionConsensusProtocolClass(dataRegionConsensus) .setSchemaReplicationFactor(schemaRegionReplicationFactor) - .setDataReplicationFactor(dataRegionReplicationFactor); - - // 10 min, assert that the operations will not time out - senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000); - receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000); + .setDataReplicationFactor(dataRegionReplicationFactor) + .setDnConnectionTimeoutMs(600000) + .setPipeMemoryManagementEnabled(false) + .setIsPipeEnableMemoryCheck(false); senderEnv.initClusterEnvironment(configNodesNum, dataNodesNum); receiverEnv.initClusterEnvironment(configNodesNum, dataNodesNum); @@ -170,7 +172,10 @@ public class IoTDBPipeProtocolIT extends AbstractPipeTableModelDualManualIT { .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) .setSchemaReplicationFactor(3) - .setDataReplicationFactor(2); + .setDataReplicationFactor(2) + .setDnConnectionTimeoutMs(600000) + .setPipeMemoryManagementEnabled(false) + .setIsPipeEnableMemoryCheck(false); receiverEnv .getConfig() .getCommonConfig() @@ -179,11 +184,10 @@ public class IoTDBPipeProtocolIT extends AbstractPipeTableModelDualManualIT { .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) .setSchemaReplicationFactor(1) - .setDataReplicationFactor(1); - - // 10 min, assert that the operations will not time out - senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000); - receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000); + .setDataReplicationFactor(1) + .setDnConnectionTimeoutMs(600000) + .setPipeMemoryManagementEnabled(false) + .setIsPipeEnableMemoryCheck(false); senderEnv.initClusterEnvironment(3, 3); receiverEnv.initClusterEnvironment(1, 1); @@ -379,7 +383,10 @@ public class IoTDBPipeProtocolIT extends AbstractPipeTableModelDualManualIT { .setDataReplicationFactor(1) .setEnableSeqSpaceCompaction(false) .setEnableUnseqSpaceCompaction(false) - .setEnableCrossSpaceCompaction(false); + .setEnableCrossSpaceCompaction(false) + .setDnConnectionTimeoutMs(600000) + .setPipeMemoryManagementEnabled(false) + .setIsPipeEnableMemoryCheck(false); receiverEnv .getConfig() .getCommonConfig() @@ -389,11 +396,10 @@ public class IoTDBPipeProtocolIT extends AbstractPipeTableModelDualManualIT { .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) .setSchemaReplicationFactor(3) - .setDataReplicationFactor(2); - - // 10 min, assert that the operations will not time out - senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000); - receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000); + .setDataReplicationFactor(2) + .setDnConnectionTimeoutMs(600000) + .setPipeMemoryManagementEnabled(false) + .setIsPipeEnableMemoryCheck(false); senderEnv.initClusterEnvironment(1, 1); receiverEnv.initClusterEnvironment(1, 3); diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java index 1b43ce561c0..83727deb1b8 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java @@ -71,17 +71,20 @@ public class IoTDBPipeSourceIT extends AbstractPipeTableModelDualManualIT { // Disable sender compaction for tsfile determination in loose range test .setEnableSeqSpaceCompaction(false) .setEnableUnseqSpaceCompaction(false) - .setEnableCrossSpaceCompaction(false); + .setEnableCrossSpaceCompaction(false) + .setDnConnectionTimeoutMs(600000) + .setPipeMemoryManagementEnabled(false) + .setIsPipeEnableMemoryCheck(false); + receiverEnv .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) - .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS); - - // 10 min, assert that the operations will not time out - senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000); - receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000); + .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setDnConnectionTimeoutMs(600000) + .setPipeMemoryManagementEnabled(false) + .setIsPipeEnableMemoryCheck(false); senderEnv.initClusterEnvironment(); receiverEnv.initClusterEnvironment(); diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java index 985bfe752b6..3911b1f4666 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java @@ -74,7 +74,10 @@ public class IoTDBPipeClusterIT extends AbstractPipeTableModelDualManualIT { .setAutoCreateSchemaEnabled(true) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) - .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS); + .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) + .setDnConnectionTimeoutMs(600000) + .setPipeMemoryManagementEnabled(false) + .setIsPipeEnableMemoryCheck(false); receiverEnv .getConfig() @@ -84,11 +87,10 @@ public class IoTDBPipeClusterIT extends AbstractPipeTableModelDualManualIT { .setSchemaReplicationFactor(3) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) - .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS); - - // 10 min, assert that the operations will not time out - senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000); - receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000); + .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) + .setDnConnectionTimeoutMs(600000) + .setPipeMemoryManagementEnabled(false) + .setIsPipeEnableMemoryCheck(false); senderEnv.initClusterEnvironment(3, 3, 180); receiverEnv.initClusterEnvironment(3, 3, 180); diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java index 9c755c1b0bf..95dc04df4c6 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java @@ -62,6 +62,7 @@ public class IoTDBPipeProcessorIT extends AbstractPipeDualTreeModelAutoIT { .setTimestampPrecision("ms") .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); receiverEnv .getConfig() @@ -69,6 +70,7 @@ public class IoTDBPipeProcessorIT extends AbstractPipeDualTreeModelAutoIT { .setAutoCreateSchemaEnabled(true) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); // 10 min, assert that the operations will not time out diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java index 4358bc2a203..1cb822d0aa5 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java @@ -77,6 +77,7 @@ public class IoTDBPipeProtocolIT extends AbstractPipeDualTreeModelAutoIT { .setDataRegionConsensusProtocolClass(dataRegionConsensus) .setSchemaReplicationFactor(schemaRegionReplicationFactor) .setDataReplicationFactor(dataRegionReplicationFactor) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); receiverEnv .getConfig() @@ -87,6 +88,7 @@ public class IoTDBPipeProtocolIT extends AbstractPipeDualTreeModelAutoIT { .setDataRegionConsensusProtocolClass(dataRegionConsensus) .setSchemaReplicationFactor(schemaRegionReplicationFactor) .setDataReplicationFactor(dataRegionReplicationFactor) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); // 10 min, assert that the operations will not time out diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java index 060f96f36a9..fb766a1fedf 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java @@ -73,6 +73,7 @@ public class IoTDBPipeSourceIT extends AbstractPipeDualTreeModelAutoIT { .setEnableSeqSpaceCompaction(false) .setEnableUnseqSpaceCompaction(false) .setEnableCrossSpaceCompaction(false) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); receiverEnv .getConfig() @@ -80,6 +81,7 @@ public class IoTDBPipeSourceIT extends AbstractPipeDualTreeModelAutoIT { .setAutoCreateSchemaEnabled(true) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); // 10 min, assert that the operations will not time out diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java index ac70fc47349..e12b96ada17 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java @@ -62,6 +62,7 @@ public class IoTDBPipeAutoConflictIT extends AbstractPipeDualTreeModelAutoIT { .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); receiverEnv .getConfig() @@ -70,6 +71,7 @@ public class IoTDBPipeAutoConflictIT extends AbstractPipeDualTreeModelAutoIT { .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); // 10 min, assert that the operations will not time out diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java index afde95f701b..4b16ab5ca1b 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java @@ -77,6 +77,7 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualTreeModelAutoIT { .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); receiverEnv @@ -88,6 +89,7 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualTreeModelAutoIT { .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); // 10 min, assert that the operations will not time out diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java index 6034a9be97d..63b81881b8d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java @@ -68,6 +68,7 @@ public class IoTDBPipeIdempotentIT extends AbstractPipeDualTreeModelAutoIT { .setDefaultSchemaRegionGroupNumPerDatabase(1) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); receiverEnv .getConfig() @@ -75,6 +76,7 @@ public class IoTDBPipeIdempotentIT extends AbstractPipeDualTreeModelAutoIT { .setAutoCreateSchemaEnabled(true) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); // 10 min, assert that the operations will not time out diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java index 9cdeb224151..0a35386c5f8 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java @@ -69,6 +69,7 @@ public class IoTDBPipeSinkCompressionIT extends AbstractPipeDualTreeModelAutoIT .setAutoCreateSchemaEnabled(true) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); receiverEnv @@ -78,6 +79,7 @@ public class IoTDBPipeSinkCompressionIT extends AbstractPipeDualTreeModelAutoIT .setPipeAirGapReceiverEnabled(true) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); // 10 min, assert that the operations will not time out diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java index ea3d4252c44..1001dc79fd4 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java @@ -63,6 +63,7 @@ public class IoTDBPipeWithLoadIT extends AbstractPipeDualTreeModelAutoIT { .setEnableSeqSpaceCompaction(false) .setEnableUnseqSpaceCompaction(false) .setEnableCrossSpaceCompaction(false) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); receiverEnv .getConfig() @@ -70,6 +71,7 @@ public class IoTDBPipeWithLoadIT extends AbstractPipeDualTreeModelAutoIT { .setAutoCreateSchemaEnabled(true) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); // 10 min, assert that the operations will not time out diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java index bf0fe81482d..44983d23df3 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java @@ -57,6 +57,7 @@ public abstract class AbstractPipeDualTreeModelManualIT { .setAutoCreateSchemaEnabled(false) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); receiverEnv .getConfig() @@ -64,6 +65,7 @@ public abstract class AbstractPipeDualTreeModelManualIT { .setAutoCreateSchemaEnabled(false) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); // 10 min, assert that the operations will not time out diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java index c118166847c..2042e0fa1ce 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java @@ -83,6 +83,7 @@ public class IoTDBSubscriptionRestartIT extends AbstractSubscriptionIT { .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) .setSchemaReplicationFactor(3) .setDataReplicationFactor(2) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); EnvFactory.getEnv().initClusterEnvironment(3, 3); diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java index 7b564023498..fe667480c86 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java @@ -37,6 +37,7 @@ public abstract class AbstractSubscriptionLocalIT extends AbstractSubscriptionIT .getConfig() .getCommonConfig() .setSubscriptionEnabled(true) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); EnvFactory.getEnv().initClusterEnvironment(); diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java index b2cfede0945..bf57f904c26 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java @@ -74,6 +74,7 @@ public abstract class AbstractSubscriptionTripleIT extends AbstractSubscriptionI sender .getConfig() .getCommonConfig() + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false) .setSubscriptionPrefetchTsFileBatchMaxDelayInMs(500) .setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(32 * 1024); diff --git a/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java b/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java index da8ffdc6823..3c84a8fd9f9 100644 --- a/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java @@ -57,6 +57,7 @@ public class ExportTsFileTestIT extends AbstractScriptIT { .getConfig() .getCommonConfig() .setSubscriptionEnabled(true) + .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false); EnvFactory.getEnv().initClusterEnvironment(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 1286d26f6c1..f8cbe0b45b9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -30,6 +30,7 @@ import org.apache.iotdb.commons.consensus.SchemaRegionId; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.task.PipeTask; import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; @@ -170,20 +171,20 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { final PipeTaskMeta pipeTaskMeta) throws IllegalPathException { if (pipeTaskMeta.getLeaderNodeId() == CONFIG.getDataNodeId()) { - final PipeParameters extractorParameters = pipeStaticMeta.getExtractorParameters(); + final PipeParameters sourceParameters = pipeStaticMeta.getExtractorParameters(); final DataRegionId dataRegionId = new DataRegionId(consensusGroupId); final boolean needConstructDataRegionTask = StorageEngine.getInstance().getAllDataRegionIds().contains(dataRegionId) && DataRegionListeningFilter.shouldDataRegionBeListened( - extractorParameters, dataRegionId); + sourceParameters, dataRegionId); final boolean needConstructSchemaRegionTask = SchemaEngine.getInstance() .getAllSchemaRegionIds() .contains(new SchemaRegionId(consensusGroupId)) && SchemaRegionListeningFilter.shouldSchemaRegionBeListened( - consensusGroupId, extractorParameters); + consensusGroupId, sourceParameters); - // Advance the extractor parameters parsing logic to avoid creating un-relevant pipeTasks + // Advance the source parameters parsing logic to avoid creating un-relevant pipeTasks if ( // For external source PipeRuntimeMeta.isSourceExternal(consensusGroupId) @@ -426,7 +427,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { || pipeTaskMap.entrySet().stream() .filter(entry -> dataRegionIds.contains(entry.getKey())) .allMatch(entry -> ((PipeDataNodeTask) entry.getValue()).isCompleted()); - final String extractorModeValue = + final String sourceModeValue = pipeMeta .getStaticMeta() .getExtractorParameters() @@ -438,9 +439,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair( pipeMeta.getStaticMeta().getExtractorParameters()) .getLeft() - && (extractorModeValue.equalsIgnoreCase( - PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE) - || extractorModeValue.equalsIgnoreCase( + && (sourceModeValue.equalsIgnoreCase(PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE) + || sourceModeValue.equalsIgnoreCase( PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE)); final boolean isCompleted = isAllDataRegionCompleted && includeDataAndNeedDrop; @@ -595,12 +595,12 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { Arrays.asList(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY), EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE); } else { - final String extractorModeValue = + final String sourceModeValue = parameters.getStringOrDefault( Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), EXTRACTOR_MODE_DEFAULT_VALUE); isSnapshotMode = - extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE) - || extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE); + sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE) + || sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE); } return isSnapshotMode; } @@ -689,24 +689,24 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { @Override protected void calculateMemoryUsage( - final PipeParameters extractorParameters, + final PipeStaticMeta staticMeta, + final PipeParameters sourceParameters, final PipeParameters processorParameters, - final PipeParameters connectorParameters) { - if (!PipeConfig.getInstance().isPipeEnableMemoryCheck()) { + final PipeParameters sinkParameters) { + if (!PipeConfig.getInstance().isPipeEnableMemoryCheck() + || !isInnerSource(sourceParameters) + || !PipeType.USER.equals(staticMeta.getPipeType())) { return; } - calculateInsertNodeQueueMemory(extractorParameters, processorParameters, connectorParameters); + calculateInsertNodeQueueMemory(sourceParameters); long needMemory = 0; - needMemory += - calculateTsFileParserMemory(extractorParameters, processorParameters, connectorParameters); - needMemory += - calculateSinkBatchMemory(extractorParameters, processorParameters, connectorParameters); - needMemory += - calculateSendTsFileReadBufferMemory( - extractorParameters, processorParameters, connectorParameters); + needMemory += calculateTsFileParserMemory(sourceParameters, sinkParameters); + needMemory += calculateSinkBatchMemory(sinkParameters); + needMemory += calculateSendTsFileReadBufferMemory(sourceParameters, sinkParameters); + needMemory += calculateAssignerMemory(sourceParameters); PipeMemoryManager pipeMemoryManager = PipeDataNodeResourceManager.memory(); final long freeMemorySizeInBytes = pipeMemoryManager.getFreeMemorySizeInBytes(); @@ -727,13 +727,22 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { } } - private void calculateInsertNodeQueueMemory( - final PipeParameters extractorParameters, - final PipeParameters processorParameters, - final PipeParameters connectorParameters) { + private boolean isInnerSource(final PipeParameters sourceParameters) { + final String pluginName = + sourceParameters + .getStringOrDefault( + Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY, PipeSourceConstant.SOURCE_KEY), + BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()) + .toLowerCase(); + + return pluginName.equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()) + || pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName()); + } + + private void calculateInsertNodeQueueMemory(final PipeParameters sourceParameters) { - // Realtime extractor is enabled by default, so we only need to check the source realtime - if (!extractorParameters.getBooleanOrDefault( + // Realtime source is enabled by default, so we only need to check the source realtime + if (!sourceParameters.getBooleanOrDefault( Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) { return; @@ -741,7 +750,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { // If the realtime mode is batch or file, we do not need to allocate memory final String realtimeMode = - extractorParameters.getStringByKeys( + sourceParameters.getStringByKeys( PipeSourceConstant.EXTRACTOR_REALTIME_MODE_KEY, PipeSourceConstant.SOURCE_REALTIME_MODE_KEY); if (PipeSourceConstant.EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE.equals(realtimeMode) @@ -764,53 +773,50 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { } private long calculateTsFileParserMemory( - final PipeParameters extractorParameters, - final PipeParameters processorParameters, - final PipeParameters connectorParameters) { + final PipeParameters sourceParameters, final PipeParameters sinkParameters) { - // If the extractor is not history, we do not need to allocate memory + // If the source is not history, we do not need to allocate memory boolean isExtractorHistory = - extractorParameters.getBooleanOrDefault( + sourceParameters.getBooleanOrDefault( SystemConstant.RESTART_KEY, SystemConstant.RESTART_DEFAULT_VALUE) - || extractorParameters.getBooleanOrDefault( + || sourceParameters.getBooleanOrDefault( Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_ENABLE_KEY), EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE); - // If the extractor is history, and has start/end time, we need to allocate memory + // If the source is history, and has start/end time, we need to allocate memory boolean isTSFileParser = isExtractorHistory - && extractorParameters.hasAnyAttributes( + && sourceParameters.hasAnyAttributes( EXTRACTOR_HISTORY_START_TIME_KEY, SOURCE_HISTORY_START_TIME_KEY); isTSFileParser = isTSFileParser || (isExtractorHistory - && extractorParameters.hasAnyAttributes( + && sourceParameters.hasAnyAttributes( EXTRACTOR_HISTORY_END_TIME_KEY, SOURCE_HISTORY_END_TIME_KEY)); - // if the extractor has start/end time, we need to allocate memory + // if the source has start/end time, we need to allocate memory isTSFileParser = isTSFileParser - || extractorParameters.hasAnyAttributes( - SOURCE_START_TIME_KEY, EXTRACTOR_START_TIME_KEY); + || sourceParameters.hasAnyAttributes(SOURCE_START_TIME_KEY, EXTRACTOR_START_TIME_KEY); isTSFileParser = isTSFileParser - || extractorParameters.hasAnyAttributes(SOURCE_END_TIME_KEY, EXTRACTOR_END_TIME_KEY); + || sourceParameters.hasAnyAttributes(SOURCE_END_TIME_KEY, EXTRACTOR_END_TIME_KEY); - // If the extractor has pattern or path, we need to allocate memory + // If the source has pattern or path, we need to allocate memory isTSFileParser = isTSFileParser - || extractorParameters.hasAnyAttributes(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY); + || sourceParameters.hasAnyAttributes(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY); isTSFileParser = - isTSFileParser || extractorParameters.hasAnyAttributes(EXTRACTOR_PATH_KEY, SOURCE_PATH_KEY); + isTSFileParser || sourceParameters.hasAnyAttributes(EXTRACTOR_PATH_KEY, SOURCE_PATH_KEY); - // If the extractor is not hybrid, we do need to allocate memory + // If the source is not hybrid, we do need to allocate memory isTSFileParser = isTSFileParser || !PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE.equals( - connectorParameters.getStringOrDefault( + sinkParameters.getStringOrDefault( Arrays.asList( PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.SINK_FORMAT_KEY), PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE)); @@ -822,15 +828,12 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { return PipeConfig.getInstance().getTsFileParserMemory(); } - private long calculateSinkBatchMemory( - final PipeParameters extractorParameters, - final PipeParameters processorParameters, - final PipeParameters connectorParameters) { + private long calculateSinkBatchMemory(final PipeParameters sinkParameters) { - // If the connector format is tsfile , we need to use batch + // If the sink format is tsfile , we need to use batch boolean needUseBatch = PipeSinkConstant.CONNECTOR_FORMAT_TS_FILE_VALUE.equals( - connectorParameters.getStringOrDefault( + sinkParameters.getStringOrDefault( Arrays.asList( PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.SINK_FORMAT_KEY), PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE)); @@ -839,9 +842,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { return PipeConfig.getInstance().getSinkBatchMemoryTsFile(); } - // If the connector is batch mode, we need to use batch + // If the sink is batch mode, we need to use batch needUseBatch = - connectorParameters.getBooleanOrDefault( + sinkParameters.getBooleanOrDefault( Arrays.asList( PipeSinkConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, PipeSinkConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY), @@ -855,23 +858,21 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { } private long calculateSendTsFileReadBufferMemory( - final PipeParameters extractorParameters, - final PipeParameters processorParameters, - final PipeParameters connectorParameters) { - // If the extractor is history enable, we need to transfer tsfile + final PipeParameters sourceParameters, final PipeParameters sinkParameters) { + // If the source is history enable, we need to transfer tsfile boolean needTransferTsFile = - extractorParameters.getBooleanOrDefault( + sourceParameters.getBooleanOrDefault( SystemConstant.RESTART_KEY, SystemConstant.RESTART_DEFAULT_VALUE) - || extractorParameters.getBooleanOrDefault( + || sourceParameters.getBooleanOrDefault( Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_ENABLE_KEY), EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE); String format = - connectorParameters.getStringOrDefault( + sinkParameters.getStringOrDefault( Arrays.asList(PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.SINK_FORMAT_KEY), PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE); - // If the connector format is tsfile and hybrid, we need to transfer tsfile + // If the sink format is tsfile and hybrid, we need to transfer tsfile needTransferTsFile = needTransferTsFile || PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE.equals(format) @@ -883,4 +884,19 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { return PipeConfig.getInstance().getSendTsFileReadBuffer(); } + + private long calculateAssignerMemory(final PipeParameters sourceParameters) { + try { + if (!PipeInsertionDataNodeListener.getInstance().isEmpty() + || !DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(sourceParameters) + .getLeft()) { + return 0; + } + return PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferSize() + * PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes() + * Math.min(StorageEngine.getInstance().getDataRegionNumber(), 10); + } catch (final IllegalPathException e) { + return 0; + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java index b14de1699ed..af38d626f4e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java @@ -119,7 +119,6 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileAndDeletionSource.class); private static final Map<Integer, Long> DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP = new HashMap<>(); - private static final long PIPE_MIN_FLUSH_INTERVAL_IN_MS = 2000; private static final String TREE_MODEL_EVENT_TABLE_NAME_PREFIX = PATH_ROOT + PATH_SEPARATOR; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java index 088909f5888..dff966e72e8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java @@ -172,6 +172,10 @@ public class PipeInsertionDataNodeListener { PipeRealtimeEventFactory.createRealtimeEvent(key, shouldPrintMessage))); } + public boolean isEmpty() { + return dataRegionId2Assigner.isEmpty(); + } + //////////////////////////// Permission change //////////////////////////// public void invalidateAllCache() { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index f9697bd585f..604e65e153b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -251,8 +251,8 @@ public class CommonConfig { private long pipeMaxWaitFinishTime = 10 * 1000; - private int pipeExtractorAssignerDisruptorRingBufferSize = 65536; - private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 50; // 50B + private int pipeExtractorAssignerDisruptorRingBufferSize = 128; + private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 72 * KB; private long pipeExtractorMatcherCacheSize = 1024; private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java index 7848ba96769..779871def33 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java @@ -478,6 +478,7 @@ public abstract class PipeTaskAgent { final long creationTime = pipeMetaFromCoordinator.getStaticMeta().getCreationTime(); calculateMemoryUsage( + pipeMetaFromCoordinator.getStaticMeta(), pipeMetaFromCoordinator.getStaticMeta().getExtractorParameters(), pipeMetaFromCoordinator.getStaticMeta().getProcessorParameters(), pipeMetaFromCoordinator.getStaticMeta().getConnectorParameters()); @@ -523,6 +524,7 @@ public abstract class PipeTaskAgent { } protected void calculateMemoryUsage( + final PipeStaticMeta staticMeta, final PipeParameters extractorParameters, final PipeParameters processorParameters, final PipeParameters connectorParameters) {
