This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 19810e5efaf Pipe: Fixed the default param of single entry of disruptor
queue & Banned memory checks from some missing pipe ITs & Do not check non-user
pipes (#16069)
19810e5efaf is described below
commit 19810e5efaff1c5e363bc72b81d75a41792b04ac
Author: Caideyipi <[email protected]>
AuthorDate: Fri Aug 1 14:11:10 2025 +0800
Pipe: Fixed the default param of single entry of disruptor queue & Banned
memory checks from some missing pipe ITs & Do not check non-user pipes (#16069)
* Update CommonConfig.java
* refactor
* try-fix
* test
* revert-pom
* fix
* fix
---
.../it/env/cluster/config/MppCommonConfig.java | 6 +
.../env/cluster/config/MppSharedCommonConfig.java | 7 +
.../it/env/remote/config/RemoteCommonConfig.java | 5 +
.../org/apache/iotdb/itbase/env/CommonConfig.java | 2 +
.../manual/AbstractPipeTableModelDualManualIT.java | 2 +
.../manual/basic/IoTDBPipePermissionIT.java | 2 +
.../manual/basic/IoTDBPipeProtocolIT.java | 42 +++---
.../tablemodel/manual/basic/IoTDBPipeSourceIT.java | 14 +-
.../manual/enhanced/IoTDBPipeClusterIT.java | 14 +-
.../treemodel/auto/basic/IoTDBPipeProcessorIT.java | 2 +
.../treemodel/auto/basic/IoTDBPipeProtocolIT.java | 2 +
.../treemodel/auto/basic/IoTDBPipeSourceIT.java | 2 +
.../auto/enhanced/IoTDBPipeAutoConflictIT.java | 2 +
.../auto/enhanced/IoTDBPipeClusterIT.java | 2 +
.../auto/enhanced/IoTDBPipeIdempotentIT.java | 2 +
.../auto/enhanced/IoTDBPipeSinkCompressionIT.java | 2 +
.../auto/enhanced/IoTDBPipeWithLoadIT.java | 2 +
.../manual/AbstractPipeDualTreeModelManualIT.java | 2 +
.../it/cluster/IoTDBSubscriptionRestartIT.java | 1 +
.../it/local/AbstractSubscriptionLocalIT.java | 1 +
.../it/triple/AbstractSubscriptionTripleIT.java | 1 +
.../apache/iotdb/tools/it/ExportTsFileTestIT.java | 1 +
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 142 ++++++++++++---------
...istoricalDataRegionTsFileAndDeletionSource.java | 1 -
.../listener/PipeInsertionDataNodeListener.java | 4 +
.../apache/iotdb/commons/conf/CommonConfig.java | 4 +-
.../commons/pipe/agent/task/PipeTaskAgent.java | 2 +
27 files changed, 173 insertions(+), 96 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index c6cbc091c2e..04977b309fb 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -453,6 +453,12 @@ public class MppCommonConfig extends MppBaseConfig
implements CommonConfig {
return this;
}
+ @Override
+ public CommonConfig setPipeMemoryManagementEnabled(boolean
pipeMemoryManagementEnabled) {
+ setProperty("pipe_memory_management_enabled",
String.valueOf(pipeMemoryManagementEnabled));
+ return this;
+ }
+
@Override
public CommonConfig setIsPipeEnableMemoryCheck(boolean
isPipeEnableMemoryCheck) {
setProperty("pipe_enable_memory_checked",
String.valueOf(isPipeEnableMemoryCheck));
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index 2fdf256f11b..37295b92ab4 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -466,6 +466,13 @@ public class MppSharedCommonConfig implements CommonConfig
{
return this;
}
+ @Override
+ public CommonConfig setPipeMemoryManagementEnabled(boolean
pipeMemoryManagementEnabled) {
+ dnConfig.setPipeMemoryManagementEnabled(pipeMemoryManagementEnabled);
+ cnConfig.setPipeMemoryManagementEnabled(pipeMemoryManagementEnabled);
+ return this;
+ }
+
@Override
public CommonConfig setIsPipeEnableMemoryCheck(boolean
isPipeEnableMemoryCheck) {
dnConfig.setIsPipeEnableMemoryCheck(isPipeEnableMemoryCheck);
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index ba82d7d9c77..2db51185bf6 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -328,6 +328,11 @@ public class RemoteCommonConfig implements CommonConfig {
return this;
}
+ @Override
+ public CommonConfig setPipeMemoryManagementEnabled(boolean
pipeMemoryManagementEnabled) {
+ return this;
+ }
+
@Override
public CommonConfig setIsPipeEnableMemoryCheck(boolean
isPipeEnableMemoryCheck) {
return this;
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 370fca6216b..1623f83c7a7 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -146,6 +146,8 @@ public interface CommonConfig {
CommonConfig setSchemaRegionPerDataNode(double schemaRegionPerDataNode);
+ CommonConfig setPipeMemoryManagementEnabled(boolean
pipeMemoryManagementEnabled);
+
CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck);
CommonConfig setPipeAirGapReceiverEnabled(boolean
isPipeAirGapReceiverEnabled);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
index 899c123c56e..49b5c6d368a 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
@@ -50,6 +50,7 @@ public abstract class AbstractPipeTableModelDualManualIT {
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setEnforceStrongPassword(false)
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
receiverEnv
.getConfig()
@@ -58,6 +59,7 @@ public abstract class AbstractPipeTableModelDualManualIT {
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setEnforceStrongPassword(false)
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
// 10 min, assert that the operations will not time out
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
index 7f5d63f26fb..83616911639 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
@@ -69,6 +69,7 @@ public class IoTDBPipePermissionIT extends
AbstractPipeTableModelDualManualIT {
.setTimestampPrecision("ms")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
receiverEnv
.getConfig()
@@ -78,6 +79,7 @@ public class IoTDBPipePermissionIT extends
AbstractPipeTableModelDualManualIT {
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false)
.setSchemaReplicationFactor(3)
.setDataReplicationFactor(2);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
index 4e18091bd07..939e84b9f49 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
@@ -76,7 +76,10 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeTableModelDualManualIT {
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(dataRegionConsensus)
.setSchemaReplicationFactor(schemaRegionReplicationFactor)
- .setDataReplicationFactor(dataRegionReplicationFactor);
+ .setDataReplicationFactor(dataRegionReplicationFactor)
+ .setDnConnectionTimeoutMs(600000)
+ .setPipeMemoryManagementEnabled(false)
+ .setIsPipeEnableMemoryCheck(false);
receiverEnv
.getConfig()
.getCommonConfig()
@@ -85,11 +88,10 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeTableModelDualManualIT {
.setSchemaRegionConsensusProtocolClass(schemaRegionConsensus)
.setDataRegionConsensusProtocolClass(dataRegionConsensus)
.setSchemaReplicationFactor(schemaRegionReplicationFactor)
- .setDataReplicationFactor(dataRegionReplicationFactor);
-
- // 10 min, assert that the operations will not time out
- senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
- receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+ .setDataReplicationFactor(dataRegionReplicationFactor)
+ .setDnConnectionTimeoutMs(600000)
+ .setPipeMemoryManagementEnabled(false)
+ .setIsPipeEnableMemoryCheck(false);
senderEnv.initClusterEnvironment(configNodesNum, dataNodesNum);
receiverEnv.initClusterEnvironment(configNodesNum, dataNodesNum);
@@ -170,7 +172,10 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeTableModelDualManualIT {
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
.setSchemaReplicationFactor(3)
- .setDataReplicationFactor(2);
+ .setDataReplicationFactor(2)
+ .setDnConnectionTimeoutMs(600000)
+ .setPipeMemoryManagementEnabled(false)
+ .setIsPipeEnableMemoryCheck(false);
receiverEnv
.getConfig()
.getCommonConfig()
@@ -179,11 +184,10 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeTableModelDualManualIT {
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
.setSchemaReplicationFactor(1)
- .setDataReplicationFactor(1);
-
- // 10 min, assert that the operations will not time out
- senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
- receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+ .setDataReplicationFactor(1)
+ .setDnConnectionTimeoutMs(600000)
+ .setPipeMemoryManagementEnabled(false)
+ .setIsPipeEnableMemoryCheck(false);
senderEnv.initClusterEnvironment(3, 3);
receiverEnv.initClusterEnvironment(1, 1);
@@ -379,7 +383,10 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeTableModelDualManualIT {
.setDataReplicationFactor(1)
.setEnableSeqSpaceCompaction(false)
.setEnableUnseqSpaceCompaction(false)
- .setEnableCrossSpaceCompaction(false);
+ .setEnableCrossSpaceCompaction(false)
+ .setDnConnectionTimeoutMs(600000)
+ .setPipeMemoryManagementEnabled(false)
+ .setIsPipeEnableMemoryCheck(false);
receiverEnv
.getConfig()
.getCommonConfig()
@@ -389,11 +396,10 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeTableModelDualManualIT {
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
.setSchemaReplicationFactor(3)
- .setDataReplicationFactor(2);
-
- // 10 min, assert that the operations will not time out
- senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
- receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+ .setDataReplicationFactor(2)
+ .setDnConnectionTimeoutMs(600000)
+ .setPipeMemoryManagementEnabled(false)
+ .setIsPipeEnableMemoryCheck(false);
senderEnv.initClusterEnvironment(1, 1);
receiverEnv.initClusterEnvironment(1, 3);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
index bbb9a26ee91..5dcf2d0e8e4 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
@@ -71,7 +71,10 @@ public class IoTDBPipeSourceIT extends
AbstractPipeTableModelDualManualIT {
// Disable sender compaction for tsfile determination in loose range
test
.setEnableSeqSpaceCompaction(false)
.setEnableUnseqSpaceCompaction(false)
- .setEnableCrossSpaceCompaction(false);
+ .setEnableCrossSpaceCompaction(false)
+ .setDnConnectionTimeoutMs(600000)
+ .setPipeMemoryManagementEnabled(false)
+ .setIsPipeEnableMemoryCheck(false);
senderEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH");
receiverEnv
@@ -79,11 +82,10 @@ public class IoTDBPipeSourceIT extends
AbstractPipeTableModelDualManualIT {
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
-
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
-
- // 10 min, assert that the operations will not time out
- senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
- receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setDnConnectionTimeoutMs(600000)
+ .setPipeMemoryManagementEnabled(false)
+ .setIsPipeEnableMemoryCheck(false);
senderEnv.initClusterEnvironment();
receiverEnv.initClusterEnvironment();
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
index 01dc50a5ce0..12f2857c7c1 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
@@ -74,7 +74,10 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
.setAutoCreateSchemaEnabled(true)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
- .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
+ .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+ .setDnConnectionTimeoutMs(600000)
+ .setPipeMemoryManagementEnabled(false)
+ .setIsPipeEnableMemoryCheck(false);
receiverEnv
.getConfig()
@@ -84,11 +87,10 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
.setSchemaReplicationFactor(3)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
- .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
-
- // 10 min, assert that the operations will not time out
- senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
- receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+ .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+ .setDnConnectionTimeoutMs(600000)
+ .setPipeMemoryManagementEnabled(false)
+ .setIsPipeEnableMemoryCheck(false);
senderEnv.initClusterEnvironment(3, 3, 180);
receiverEnv.initClusterEnvironment(3, 3, 180);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
index bfa5581d290..6017556a265 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
@@ -62,6 +62,7 @@ public class IoTDBPipeProcessorIT extends
AbstractPipeDualTreeModelAutoIT {
.setTimestampPrecision("ms")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
receiverEnv
.getConfig()
@@ -69,6 +70,7 @@ public class IoTDBPipeProcessorIT extends
AbstractPipeDualTreeModelAutoIT {
.setAutoCreateSchemaEnabled(true)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
// 10 min, assert that the operations will not time out
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
index a823a3e2972..1249bf90e78 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
@@ -77,6 +77,7 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeDualTreeModelAutoIT {
.setDataRegionConsensusProtocolClass(dataRegionConsensus)
.setSchemaReplicationFactor(schemaRegionReplicationFactor)
.setDataReplicationFactor(dataRegionReplicationFactor)
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
receiverEnv
.getConfig()
@@ -87,6 +88,7 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeDualTreeModelAutoIT {
.setDataRegionConsensusProtocolClass(dataRegionConsensus)
.setSchemaReplicationFactor(schemaRegionReplicationFactor)
.setDataReplicationFactor(dataRegionReplicationFactor)
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
// 10 min, assert that the operations will not time out
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
index 32aa520ddf2..caad46ea625 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
@@ -73,6 +73,7 @@ public class IoTDBPipeSourceIT extends
AbstractPipeDualTreeModelAutoIT {
.setEnableSeqSpaceCompaction(false)
.setEnableUnseqSpaceCompaction(false)
.setEnableCrossSpaceCompaction(false)
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
senderEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH");
receiverEnv
@@ -81,6 +82,7 @@ public class IoTDBPipeSourceIT extends
AbstractPipeDualTreeModelAutoIT {
.setAutoCreateSchemaEnabled(true)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
receiverEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH");
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java
index cecbdadeb45..c4747df7713 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java
@@ -63,6 +63,7 @@ public class IoTDBPipeAutoConflictIT extends
AbstractPipeDualTreeModelAutoIT {
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
receiverEnv
.getConfig()
@@ -71,6 +72,7 @@ public class IoTDBPipeAutoConflictIT extends
AbstractPipeDualTreeModelAutoIT {
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
// 10 min, assert that the operations will not time out
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
index c7c8bf73c44..2656b38f046 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
@@ -79,6 +79,7 @@ public class IoTDBPipeClusterIT extends
AbstractPipeDualTreeModelAutoIT {
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
senderEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH");
@@ -91,6 +92,7 @@ public class IoTDBPipeClusterIT extends
AbstractPipeDualTreeModelAutoIT {
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
// 10 min, assert that the operations will not time out
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
index b2c23dbed6a..10148979047 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
@@ -69,6 +69,7 @@ public class IoTDBPipeIdempotentIT extends
AbstractPipeDualTreeModelAutoIT {
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setEnforceStrongPassword(false)
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
receiverEnv
@@ -78,6 +79,7 @@ public class IoTDBPipeIdempotentIT extends
AbstractPipeDualTreeModelAutoIT {
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setEnforceStrongPassword(false)
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
// 10 min, assert that the operations will not time out
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
index 81991cd16f7..7e508c4b3eb 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
@@ -69,6 +69,7 @@ public class IoTDBPipeSinkCompressionIT extends
AbstractPipeDualTreeModelAutoIT
.setAutoCreateSchemaEnabled(true)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
receiverEnv
@@ -78,6 +79,7 @@ public class IoTDBPipeSinkCompressionIT extends
AbstractPipeDualTreeModelAutoIT
.setPipeAirGapReceiverEnabled(true)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
// 10 min, assert that the operations will not time out
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java
index 6c44f80d52d..aa1c2b7974c 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java
@@ -63,6 +63,7 @@ public class IoTDBPipeWithLoadIT extends
AbstractPipeDualTreeModelAutoIT {
.setEnableSeqSpaceCompaction(false)
.setEnableUnseqSpaceCompaction(false)
.setEnableCrossSpaceCompaction(false)
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
receiverEnv
.getConfig()
@@ -70,6 +71,7 @@ public class IoTDBPipeWithLoadIT extends
AbstractPipeDualTreeModelAutoIT {
.setAutoCreateSchemaEnabled(true)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
// 10 min, assert that the operations will not time out
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
index 3894281f1b2..d18a3340eb4 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
@@ -57,6 +57,7 @@ public abstract class AbstractPipeDualTreeModelManualIT {
.setAutoCreateSchemaEnabled(false)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:1");
@@ -66,6 +67,7 @@ public abstract class AbstractPipeDualTreeModelManualIT {
.setAutoCreateSchemaEnabled(false)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
// 10 min, assert that the operations will not time out
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java
index c118166847c..2042e0fa1ce 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java
@@ -83,6 +83,7 @@ public class IoTDBSubscriptionRestartIT extends
AbstractSubscriptionIT {
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
.setSchemaReplicationFactor(3)
.setDataReplicationFactor(2)
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
EnvFactory.getEnv().initClusterEnvironment(3, 3);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java
index 7b564023498..fe667480c86 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java
@@ -37,6 +37,7 @@ public abstract class AbstractSubscriptionLocalIT extends
AbstractSubscriptionIT
.getConfig()
.getCommonConfig()
.setSubscriptionEnabled(true)
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
EnvFactory.getEnv().initClusterEnvironment();
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
index 3f41c63f96c..bc0e6f90f3c 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
@@ -74,6 +74,7 @@ public abstract class AbstractSubscriptionTripleIT extends
AbstractSubscriptionI
sender
.getConfig()
.getCommonConfig()
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false)
.setSubscriptionPrefetchTsFileBatchMaxDelayInMs(500)
.setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(32 * 1024);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
b/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
index da8ffdc6823..3c84a8fd9f9 100644
---
a/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
@@ -57,6 +57,7 @@ public class ExportTsFileTestIT extends AbstractScriptIT {
.getConfig()
.getCommonConfig()
.setSubscriptionEnabled(true)
+ .setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
EnvFactory.getEnv().initClusterEnvironment();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 1286d26f6c1..f8cbe0b45b9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.agent.task.PipeTask;
import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
@@ -170,20 +171,20 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
final PipeTaskMeta pipeTaskMeta)
throws IllegalPathException {
if (pipeTaskMeta.getLeaderNodeId() == CONFIG.getDataNodeId()) {
- final PipeParameters extractorParameters =
pipeStaticMeta.getExtractorParameters();
+ final PipeParameters sourceParameters =
pipeStaticMeta.getExtractorParameters();
final DataRegionId dataRegionId = new DataRegionId(consensusGroupId);
final boolean needConstructDataRegionTask =
StorageEngine.getInstance().getAllDataRegionIds().contains(dataRegionId)
&& DataRegionListeningFilter.shouldDataRegionBeListened(
- extractorParameters, dataRegionId);
+ sourceParameters, dataRegionId);
final boolean needConstructSchemaRegionTask =
SchemaEngine.getInstance()
.getAllSchemaRegionIds()
.contains(new SchemaRegionId(consensusGroupId))
&& SchemaRegionListeningFilter.shouldSchemaRegionBeListened(
- consensusGroupId, extractorParameters);
+ consensusGroupId, sourceParameters);
- // Advance the extractor parameters parsing logic to avoid creating
un-relevant pipeTasks
+ // Advance the source parameters parsing logic to avoid creating
un-relevant pipeTasks
if (
// For external source
PipeRuntimeMeta.isSourceExternal(consensusGroupId)
@@ -426,7 +427,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
|| pipeTaskMap.entrySet().stream()
.filter(entry -> dataRegionIds.contains(entry.getKey()))
.allMatch(entry -> ((PipeDataNodeTask)
entry.getValue()).isCompleted());
- final String extractorModeValue =
+ final String sourceModeValue =
pipeMeta
.getStaticMeta()
.getExtractorParameters()
@@ -438,9 +439,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(
pipeMeta.getStaticMeta().getExtractorParameters())
.getLeft()
- && (extractorModeValue.equalsIgnoreCase(
- PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE)
- || extractorModeValue.equalsIgnoreCase(
+ &&
(sourceModeValue.equalsIgnoreCase(PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE)
+ || sourceModeValue.equalsIgnoreCase(
PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE));
final boolean isCompleted = isAllDataRegionCompleted &&
includeDataAndNeedDrop;
@@ -595,12 +595,12 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
Arrays.asList(EXTRACTOR_MODE_SNAPSHOT_KEY,
SOURCE_MODE_SNAPSHOT_KEY),
EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE);
} else {
- final String extractorModeValue =
+ final String sourceModeValue =
parameters.getStringOrDefault(
Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY),
EXTRACTOR_MODE_DEFAULT_VALUE);
isSnapshotMode =
- extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
- ||
extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
+ sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
+ || sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
}
return isSnapshotMode;
}
@@ -689,24 +689,24 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
@Override
protected void calculateMemoryUsage(
- final PipeParameters extractorParameters,
+ final PipeStaticMeta staticMeta,
+ final PipeParameters sourceParameters,
final PipeParameters processorParameters,
- final PipeParameters connectorParameters) {
- if (!PipeConfig.getInstance().isPipeEnableMemoryCheck()) {
+ final PipeParameters sinkParameters) {
+ if (!PipeConfig.getInstance().isPipeEnableMemoryCheck()
+ || !isInnerSource(sourceParameters)
+ || !PipeType.USER.equals(staticMeta.getPipeType())) {
return;
}
- calculateInsertNodeQueueMemory(extractorParameters, processorParameters,
connectorParameters);
+ calculateInsertNodeQueueMemory(sourceParameters);
long needMemory = 0;
- needMemory +=
- calculateTsFileParserMemory(extractorParameters, processorParameters,
connectorParameters);
- needMemory +=
- calculateSinkBatchMemory(extractorParameters, processorParameters,
connectorParameters);
- needMemory +=
- calculateSendTsFileReadBufferMemory(
- extractorParameters, processorParameters, connectorParameters);
+ needMemory += calculateTsFileParserMemory(sourceParameters,
sinkParameters);
+ needMemory += calculateSinkBatchMemory(sinkParameters);
+ needMemory += calculateSendTsFileReadBufferMemory(sourceParameters,
sinkParameters);
+ needMemory += calculateAssignerMemory(sourceParameters);
PipeMemoryManager pipeMemoryManager = PipeDataNodeResourceManager.memory();
final long freeMemorySizeInBytes =
pipeMemoryManager.getFreeMemorySizeInBytes();
@@ -727,13 +727,22 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
}
}
- private void calculateInsertNodeQueueMemory(
- final PipeParameters extractorParameters,
- final PipeParameters processorParameters,
- final PipeParameters connectorParameters) {
+ private boolean isInnerSource(final PipeParameters sourceParameters) {
+ final String pluginName =
+ sourceParameters
+ .getStringOrDefault(
+ Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY,
PipeSourceConstant.SOURCE_KEY),
+ BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+ .toLowerCase();
+
+ return
pluginName.equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+ ||
pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName());
+ }
+
+ private void calculateInsertNodeQueueMemory(final PipeParameters
sourceParameters) {
- // Realtime extractor is enabled by default, so we only need to check the
source realtime
- if (!extractorParameters.getBooleanOrDefault(
+ // Realtime source is enabled by default, so we only need to check the
source realtime
+ if (!sourceParameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY),
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) {
return;
@@ -741,7 +750,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
// If the realtime mode is batch or file, we do not need to allocate memory
final String realtimeMode =
- extractorParameters.getStringByKeys(
+ sourceParameters.getStringByKeys(
PipeSourceConstant.EXTRACTOR_REALTIME_MODE_KEY,
PipeSourceConstant.SOURCE_REALTIME_MODE_KEY);
if
(PipeSourceConstant.EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE.equals(realtimeMode)
@@ -764,53 +773,50 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
}
private long calculateTsFileParserMemory(
- final PipeParameters extractorParameters,
- final PipeParameters processorParameters,
- final PipeParameters connectorParameters) {
+ final PipeParameters sourceParameters, final PipeParameters
sinkParameters) {
- // If the extractor is not history, we do not need to allocate memory
+ // If the source is not history, we do not need to allocate memory
boolean isExtractorHistory =
- extractorParameters.getBooleanOrDefault(
+ sourceParameters.getBooleanOrDefault(
SystemConstant.RESTART_KEY,
SystemConstant.RESTART_DEFAULT_VALUE)
- || extractorParameters.getBooleanOrDefault(
+ || sourceParameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY,
SOURCE_HISTORY_ENABLE_KEY),
EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
- // If the extractor is history, and has start/end time, we need to
allocate memory
+ // If the source is history, and has start/end time, we need to allocate
memory
boolean isTSFileParser =
isExtractorHistory
- && extractorParameters.hasAnyAttributes(
+ && sourceParameters.hasAnyAttributes(
EXTRACTOR_HISTORY_START_TIME_KEY,
SOURCE_HISTORY_START_TIME_KEY);
isTSFileParser =
isTSFileParser
|| (isExtractorHistory
- && extractorParameters.hasAnyAttributes(
+ && sourceParameters.hasAnyAttributes(
EXTRACTOR_HISTORY_END_TIME_KEY,
SOURCE_HISTORY_END_TIME_KEY));
- // if the extractor has start/end time, we need to allocate memory
+ // if the source has start/end time, we need to allocate memory
isTSFileParser =
isTSFileParser
- || extractorParameters.hasAnyAttributes(
- SOURCE_START_TIME_KEY, EXTRACTOR_START_TIME_KEY);
+ || sourceParameters.hasAnyAttributes(SOURCE_START_TIME_KEY,
EXTRACTOR_START_TIME_KEY);
isTSFileParser =
isTSFileParser
- || extractorParameters.hasAnyAttributes(SOURCE_END_TIME_KEY,
EXTRACTOR_END_TIME_KEY);
+ || sourceParameters.hasAnyAttributes(SOURCE_END_TIME_KEY,
EXTRACTOR_END_TIME_KEY);
- // If the extractor has pattern or path, we need to allocate memory
+ // If the source has pattern or path, we need to allocate memory
isTSFileParser =
isTSFileParser
- || extractorParameters.hasAnyAttributes(EXTRACTOR_PATTERN_KEY,
SOURCE_PATTERN_KEY);
+ || sourceParameters.hasAnyAttributes(EXTRACTOR_PATTERN_KEY,
SOURCE_PATTERN_KEY);
isTSFileParser =
- isTSFileParser ||
extractorParameters.hasAnyAttributes(EXTRACTOR_PATH_KEY, SOURCE_PATH_KEY);
+ isTSFileParser ||
sourceParameters.hasAnyAttributes(EXTRACTOR_PATH_KEY, SOURCE_PATH_KEY);
- // If the extractor is not hybrid, we do need to allocate memory
+ // If the source is not hybrid, we do need to allocate memory
isTSFileParser =
isTSFileParser
|| !PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE.equals(
- connectorParameters.getStringOrDefault(
+ sinkParameters.getStringOrDefault(
Arrays.asList(
PipeSinkConstant.CONNECTOR_FORMAT_KEY,
PipeSinkConstant.SINK_FORMAT_KEY),
PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE));
@@ -822,15 +828,12 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
return PipeConfig.getInstance().getTsFileParserMemory();
}
- private long calculateSinkBatchMemory(
- final PipeParameters extractorParameters,
- final PipeParameters processorParameters,
- final PipeParameters connectorParameters) {
+ private long calculateSinkBatchMemory(final PipeParameters sinkParameters) {
- // If the connector format is tsfile , we need to use batch
+ // If the sink format is tsfile , we need to use batch
boolean needUseBatch =
PipeSinkConstant.CONNECTOR_FORMAT_TS_FILE_VALUE.equals(
- connectorParameters.getStringOrDefault(
+ sinkParameters.getStringOrDefault(
Arrays.asList(
PipeSinkConstant.CONNECTOR_FORMAT_KEY,
PipeSinkConstant.SINK_FORMAT_KEY),
PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE));
@@ -839,9 +842,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
return PipeConfig.getInstance().getSinkBatchMemoryTsFile();
}
- // If the connector is batch mode, we need to use batch
+ // If the sink is batch mode, we need to use batch
needUseBatch =
- connectorParameters.getBooleanOrDefault(
+ sinkParameters.getBooleanOrDefault(
Arrays.asList(
PipeSinkConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY,
PipeSinkConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY),
@@ -855,23 +858,21 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
}
private long calculateSendTsFileReadBufferMemory(
- final PipeParameters extractorParameters,
- final PipeParameters processorParameters,
- final PipeParameters connectorParameters) {
- // If the extractor is history enable, we need to transfer tsfile
+ final PipeParameters sourceParameters, final PipeParameters
sinkParameters) {
+ // If the source is history enable, we need to transfer tsfile
boolean needTransferTsFile =
- extractorParameters.getBooleanOrDefault(
+ sourceParameters.getBooleanOrDefault(
SystemConstant.RESTART_KEY,
SystemConstant.RESTART_DEFAULT_VALUE)
- || extractorParameters.getBooleanOrDefault(
+ || sourceParameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY,
SOURCE_HISTORY_ENABLE_KEY),
EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
String format =
- connectorParameters.getStringOrDefault(
+ sinkParameters.getStringOrDefault(
Arrays.asList(PipeSinkConstant.CONNECTOR_FORMAT_KEY,
PipeSinkConstant.SINK_FORMAT_KEY),
PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE);
- // If the connector format is tsfile and hybrid, we need to transfer tsfile
+ // If the sink format is tsfile and hybrid, we need to transfer tsfile
needTransferTsFile =
needTransferTsFile
|| PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE.equals(format)
@@ -883,4 +884,19 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
return PipeConfig.getInstance().getSendTsFileReadBuffer();
}
+
+ private long calculateAssignerMemory(final PipeParameters sourceParameters) {
+ try {
+ if (!PipeInsertionDataNodeListener.getInstance().isEmpty()
+ ||
!DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(sourceParameters)
+ .getLeft()) {
+ return 0;
+ }
+ return
PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferSize()
+ *
PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes()
+ * Math.min(StorageEngine.getInstance().getDataRegionNumber(), 10);
+ } catch (final IllegalPathException e) {
+ return 0;
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index b14de1699ed..af38d626f4e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -119,7 +119,6 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileAndDeletionSource.class);
private static final Map<Integer, Long>
DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP = new HashMap<>();
- private static final long PIPE_MIN_FLUSH_INTERVAL_IN_MS = 2000;
private static final String TREE_MODEL_EVENT_TABLE_NAME_PREFIX = PATH_ROOT +
PATH_SEPARATOR;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index 088909f5888..dff966e72e8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -172,6 +172,10 @@ public class PipeInsertionDataNodeListener {
PipeRealtimeEventFactory.createRealtimeEvent(key,
shouldPrintMessage)));
}
+ public boolean isEmpty() {
+ return dataRegionId2Assigner.isEmpty();
+ }
+
//////////////////////////// Permission change ////////////////////////////
public void invalidateAllCache() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 8666d2aefcd..345f4680069 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -251,8 +251,8 @@ public class CommonConfig {
private long pipeMaxWaitFinishTime = 10 * 1000;
- private int pipeExtractorAssignerDisruptorRingBufferSize = 65536;
- private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 50;
// 50B
+ private int pipeExtractorAssignerDisruptorRingBufferSize = 128;
+ private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 72 *
KB;
private long pipeExtractorMatcherCacheSize = 1024;
private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 7848ba96769..779871def33 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -478,6 +478,7 @@ public abstract class PipeTaskAgent {
final long creationTime =
pipeMetaFromCoordinator.getStaticMeta().getCreationTime();
calculateMemoryUsage(
+ pipeMetaFromCoordinator.getStaticMeta(),
pipeMetaFromCoordinator.getStaticMeta().getExtractorParameters(),
pipeMetaFromCoordinator.getStaticMeta().getProcessorParameters(),
pipeMetaFromCoordinator.getStaticMeta().getConnectorParameters());
@@ -523,6 +524,7 @@ public abstract class PipeTaskAgent {
}
protected void calculateMemoryUsage(
+ final PipeStaticMeta staticMeta,
final PipeParameters extractorParameters,
final PipeParameters processorParameters,
final PipeParameters connectorParameters) {