This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f1517cade60680050a2b2a3b1c69ed3e822ef2b0
Author: Caideyipi <[email protected]>
AuthorDate: Fri Aug 1 14:11:10 2025 +0800

    Pipe: Fixed the default param of single entry of disruptor queue & Banned 
memory checks from some missing pipe ITs & Do not check non-user pipes (#16069)
    
    * Update CommonConfig.java
    
    * refactor
    
    * try-fix
    
    * test
    
    * revert-pom
    
    * fix
    
    * fix
    
    (cherry picked from commit 19810e5efaff1c5e363bc72b81d75a41792b04ac)
---
 .../it/env/cluster/config/MppCommonConfig.java     |   6 +
 .../env/cluster/config/MppSharedCommonConfig.java  |   7 +
 .../it/env/remote/config/RemoteCommonConfig.java   |   5 +
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |   2 +
 .../manual/AbstractPipeTableModelDualManualIT.java |   2 +
 .../manual/basic/IoTDBPipePermissionIT.java        |   6 +-
 .../manual/basic/IoTDBPipeProtocolIT.java          |  42 +++---
 .../tablemodel/manual/basic/IoTDBPipeSourceIT.java |  15 ++-
 .../manual/enhanced/IoTDBPipeClusterIT.java        |  14 +-
 .../treemodel/auto/basic/IoTDBPipeProcessorIT.java |   2 +
 .../treemodel/auto/basic/IoTDBPipeProtocolIT.java  |   2 +
 .../treemodel/auto/basic/IoTDBPipeSourceIT.java    |   2 +
 .../auto/enhanced/IoTDBPipeAutoConflictIT.java     |   2 +
 .../auto/enhanced/IoTDBPipeClusterIT.java          |   2 +
 .../auto/enhanced/IoTDBPipeIdempotentIT.java       |   2 +
 .../auto/enhanced/IoTDBPipeSinkCompressionIT.java  |   2 +
 .../auto/enhanced/IoTDBPipeWithLoadIT.java         |   2 +
 .../manual/AbstractPipeDualTreeModelManualIT.java  |   2 +
 .../it/cluster/IoTDBSubscriptionRestartIT.java     |   1 +
 .../it/local/AbstractSubscriptionLocalIT.java      |   1 +
 .../it/triple/AbstractSubscriptionTripleIT.java    |   1 +
 .../apache/iotdb/tools/it/ExportTsFileTestIT.java  |   1 +
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 142 ++++++++++++---------
 ...istoricalDataRegionTsFileAndDeletionSource.java |   1 -
 .../listener/PipeInsertionDataNodeListener.java    |   4 +
 .../apache/iotdb/commons/conf/CommonConfig.java    |   4 +-
 .../commons/pipe/agent/task/PipeTaskAgent.java     |   2 +
 27 files changed, 177 insertions(+), 97 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 78baab77218..ef3856b8068 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -453,6 +453,12 @@ public class MppCommonConfig extends MppBaseConfig 
implements CommonConfig {
     return this;
   }
 
+  @Override
+  public CommonConfig setPipeMemoryManagementEnabled(boolean 
pipeMemoryManagementEnabled) {
+    setProperty("pipe_memory_management_enabled", 
String.valueOf(pipeMemoryManagementEnabled));
+    return this;
+  }
+
   @Override
   public CommonConfig setIsPipeEnableMemoryCheck(boolean 
isPipeEnableMemoryCheck) {
     setProperty("pipe_enable_memory_checked", 
String.valueOf(isPipeEnableMemoryCheck));
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index 6db1003f5b9..59578423c61 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -466,6 +466,13 @@ public class MppSharedCommonConfig implements CommonConfig 
{
     return this;
   }
 
+  @Override
+  public CommonConfig setPipeMemoryManagementEnabled(boolean 
pipeMemoryManagementEnabled) {
+    dnConfig.setPipeMemoryManagementEnabled(pipeMemoryManagementEnabled);
+    cnConfig.setPipeMemoryManagementEnabled(pipeMemoryManagementEnabled);
+    return this;
+  }
+
   @Override
   public CommonConfig setIsPipeEnableMemoryCheck(boolean 
isPipeEnableMemoryCheck) {
     dnConfig.setIsPipeEnableMemoryCheck(isPipeEnableMemoryCheck);
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 55a772a60da..62526dca5b2 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -328,6 +328,11 @@ public class RemoteCommonConfig implements CommonConfig {
     return this;
   }
 
+  @Override
+  public CommonConfig setPipeMemoryManagementEnabled(boolean 
pipeMemoryManagementEnabled) {
+    return this;
+  }
+
   @Override
   public CommonConfig setIsPipeEnableMemoryCheck(boolean 
isPipeEnableMemoryCheck) {
     return this;
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 6f02ebfc770..ba503551e9c 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -146,6 +146,8 @@ public interface CommonConfig {
 
   CommonConfig setSchemaRegionPerDataNode(double schemaRegionPerDataNode);
 
+  CommonConfig setPipeMemoryManagementEnabled(boolean 
pipeMemoryManagementEnabled);
+
   CommonConfig setIsPipeEnableMemoryCheck(boolean isPipeEnableMemoryCheck);
 
   CommonConfig setPipeAirGapReceiverEnabled(boolean 
isPipeAirGapReceiverEnabled);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
index dd921f33bc7..91fea245335 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
@@ -49,6 +49,7 @@ public abstract class AbstractPipeTableModelDualManualIT {
         .setAutoCreateSchemaEnabled(true)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
     receiverEnv
         .getConfig()
@@ -56,6 +57,7 @@ public abstract class AbstractPipeTableModelDualManualIT {
         .setAutoCreateSchemaEnabled(true)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
 
     // 10 min, assert that the operations will not time out
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
index 7a3f90e687a..8c56fd6a799 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
@@ -68,7 +68,9 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeTableModelDualManualIT {
         .setDefaultSchemaRegionGroupNumPerDatabase(1)
         .setTimestampPrecision("ms")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
-        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
+        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setPipeMemoryManagementEnabled(false)
+        .setIsPipeEnableMemoryCheck(false);
     receiverEnv
         .getConfig()
         .getCommonConfig()
@@ -77,6 +79,8 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeTableModelDualManualIT {
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+        .setPipeMemoryManagementEnabled(false)
+        .setIsPipeEnableMemoryCheck(false)
         .setSchemaReplicationFactor(3)
         .setDataReplicationFactor(2);
 
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
index 4e18091bd07..939e84b9f49 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
@@ -76,7 +76,10 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeTableModelDualManualIT {
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDataRegionConsensusProtocolClass(dataRegionConsensus)
         .setSchemaReplicationFactor(schemaRegionReplicationFactor)
-        .setDataReplicationFactor(dataRegionReplicationFactor);
+        .setDataReplicationFactor(dataRegionReplicationFactor)
+        .setDnConnectionTimeoutMs(600000)
+        .setPipeMemoryManagementEnabled(false)
+        .setIsPipeEnableMemoryCheck(false);
     receiverEnv
         .getConfig()
         .getCommonConfig()
@@ -85,11 +88,10 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeTableModelDualManualIT {
         .setSchemaRegionConsensusProtocolClass(schemaRegionConsensus)
         .setDataRegionConsensusProtocolClass(dataRegionConsensus)
         .setSchemaReplicationFactor(schemaRegionReplicationFactor)
-        .setDataReplicationFactor(dataRegionReplicationFactor);
-
-    // 10 min, assert that the operations will not time out
-    senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
-    receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+        .setDataReplicationFactor(dataRegionReplicationFactor)
+        .setDnConnectionTimeoutMs(600000)
+        .setPipeMemoryManagementEnabled(false)
+        .setIsPipeEnableMemoryCheck(false);
 
     senderEnv.initClusterEnvironment(configNodesNum, dataNodesNum);
     receiverEnv.initClusterEnvironment(configNodesNum, dataNodesNum);
@@ -170,7 +172,10 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeTableModelDualManualIT {
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
         .setSchemaReplicationFactor(3)
-        .setDataReplicationFactor(2);
+        .setDataReplicationFactor(2)
+        .setDnConnectionTimeoutMs(600000)
+        .setPipeMemoryManagementEnabled(false)
+        .setIsPipeEnableMemoryCheck(false);
     receiverEnv
         .getConfig()
         .getCommonConfig()
@@ -179,11 +184,10 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeTableModelDualManualIT {
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
         .setSchemaReplicationFactor(1)
-        .setDataReplicationFactor(1);
-
-    // 10 min, assert that the operations will not time out
-    senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
-    receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+        .setDataReplicationFactor(1)
+        .setDnConnectionTimeoutMs(600000)
+        .setPipeMemoryManagementEnabled(false)
+        .setIsPipeEnableMemoryCheck(false);
 
     senderEnv.initClusterEnvironment(3, 3);
     receiverEnv.initClusterEnvironment(1, 1);
@@ -379,7 +383,10 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeTableModelDualManualIT {
         .setDataReplicationFactor(1)
         .setEnableSeqSpaceCompaction(false)
         .setEnableUnseqSpaceCompaction(false)
-        .setEnableCrossSpaceCompaction(false);
+        .setEnableCrossSpaceCompaction(false)
+        .setDnConnectionTimeoutMs(600000)
+        .setPipeMemoryManagementEnabled(false)
+        .setIsPipeEnableMemoryCheck(false);
     receiverEnv
         .getConfig()
         .getCommonConfig()
@@ -389,11 +396,10 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeTableModelDualManualIT {
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
         .setSchemaReplicationFactor(3)
-        .setDataReplicationFactor(2);
-
-    // 10 min, assert that the operations will not time out
-    senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
-    receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+        .setDataReplicationFactor(2)
+        .setDnConnectionTimeoutMs(600000)
+        .setPipeMemoryManagementEnabled(false)
+        .setIsPipeEnableMemoryCheck(false);
 
     senderEnv.initClusterEnvironment(1, 1);
     receiverEnv.initClusterEnvironment(1, 3);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
index 1b43ce561c0..83727deb1b8 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
@@ -71,17 +71,20 @@ public class IoTDBPipeSourceIT extends 
AbstractPipeTableModelDualManualIT {
         // Disable sender compaction for tsfile determination in loose range 
test
         .setEnableSeqSpaceCompaction(false)
         .setEnableUnseqSpaceCompaction(false)
-        .setEnableCrossSpaceCompaction(false);
+        .setEnableCrossSpaceCompaction(false)
+        .setDnConnectionTimeoutMs(600000)
+        .setPipeMemoryManagementEnabled(false)
+        .setIsPipeEnableMemoryCheck(false);
+
     receiverEnv
         .getConfig()
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
-        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
-
-    // 10 min, assert that the operations will not time out
-    senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
-    receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setDnConnectionTimeoutMs(600000)
+        .setPipeMemoryManagementEnabled(false)
+        .setIsPipeEnableMemoryCheck(false);
 
     senderEnv.initClusterEnvironment();
     receiverEnv.initClusterEnvironment();
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
index 985bfe752b6..3911b1f4666 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
@@ -74,7 +74,10 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
         .setAutoCreateSchemaEnabled(true)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
-        .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
+        .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+        .setDnConnectionTimeoutMs(600000)
+        .setPipeMemoryManagementEnabled(false)
+        .setIsPipeEnableMemoryCheck(false);
 
     receiverEnv
         .getConfig()
@@ -84,11 +87,10 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
         .setSchemaReplicationFactor(3)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
-        .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
-
-    // 10 min, assert that the operations will not time out
-    senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
-    receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+        .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+        .setDnConnectionTimeoutMs(600000)
+        .setPipeMemoryManagementEnabled(false)
+        .setIsPipeEnableMemoryCheck(false);
 
     senderEnv.initClusterEnvironment(3, 3, 180);
     receiverEnv.initClusterEnvironment(3, 3, 180);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
index 9c755c1b0bf..95dc04df4c6 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
@@ -62,6 +62,7 @@ public class IoTDBPipeProcessorIT extends 
AbstractPipeDualTreeModelAutoIT {
         .setTimestampPrecision("ms")
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
     receiverEnv
         .getConfig()
@@ -69,6 +70,7 @@ public class IoTDBPipeProcessorIT extends 
AbstractPipeDualTreeModelAutoIT {
         .setAutoCreateSchemaEnabled(true)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
 
     // 10 min, assert that the operations will not time out
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
index 4358bc2a203..1cb822d0aa5 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
@@ -77,6 +77,7 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeDualTreeModelAutoIT {
         .setDataRegionConsensusProtocolClass(dataRegionConsensus)
         .setSchemaReplicationFactor(schemaRegionReplicationFactor)
         .setDataReplicationFactor(dataRegionReplicationFactor)
+        .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
     receiverEnv
         .getConfig()
@@ -87,6 +88,7 @@ public class IoTDBPipeProtocolIT extends 
AbstractPipeDualTreeModelAutoIT {
         .setDataRegionConsensusProtocolClass(dataRegionConsensus)
         .setSchemaReplicationFactor(schemaRegionReplicationFactor)
         .setDataReplicationFactor(dataRegionReplicationFactor)
+        .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
 
     // 10 min, assert that the operations will not time out
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
index 060f96f36a9..fb766a1fedf 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
@@ -73,6 +73,7 @@ public class IoTDBPipeSourceIT extends 
AbstractPipeDualTreeModelAutoIT {
         .setEnableSeqSpaceCompaction(false)
         .setEnableUnseqSpaceCompaction(false)
         .setEnableCrossSpaceCompaction(false)
+        .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
     receiverEnv
         .getConfig()
@@ -80,6 +81,7 @@ public class IoTDBPipeSourceIT extends 
AbstractPipeDualTreeModelAutoIT {
         .setAutoCreateSchemaEnabled(true)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
 
     // 10 min, assert that the operations will not time out
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java
index ac70fc47349..e12b96ada17 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java
@@ -62,6 +62,7 @@ public class IoTDBPipeAutoConflictIT extends 
AbstractPipeDualTreeModelAutoIT {
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+        .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
     receiverEnv
         .getConfig()
@@ -70,6 +71,7 @@ public class IoTDBPipeAutoConflictIT extends 
AbstractPipeDualTreeModelAutoIT {
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+        .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
 
     // 10 min, assert that the operations will not time out
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
index afde95f701b..4b16ab5ca1b 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
@@ -77,6 +77,7 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeDualTreeModelAutoIT {
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+        .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
 
     receiverEnv
@@ -88,6 +89,7 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeDualTreeModelAutoIT {
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+        .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
 
     // 10 min, assert that the operations will not time out
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
index 6034a9be97d..63b81881b8d 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
@@ -68,6 +68,7 @@ public class IoTDBPipeIdempotentIT extends 
AbstractPipeDualTreeModelAutoIT {
         .setDefaultSchemaRegionGroupNumPerDatabase(1)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
     receiverEnv
         .getConfig()
@@ -75,6 +76,7 @@ public class IoTDBPipeIdempotentIT extends 
AbstractPipeDualTreeModelAutoIT {
         .setAutoCreateSchemaEnabled(true)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
 
     // 10 min, assert that the operations will not time out
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
index 9cdeb224151..0a35386c5f8 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
@@ -69,6 +69,7 @@ public class IoTDBPipeSinkCompressionIT extends 
AbstractPipeDualTreeModelAutoIT
         .setAutoCreateSchemaEnabled(true)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
 
     receiverEnv
@@ -78,6 +79,7 @@ public class IoTDBPipeSinkCompressionIT extends 
AbstractPipeDualTreeModelAutoIT
         .setPipeAirGapReceiverEnabled(true)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
 
     // 10 min, assert that the operations will not time out
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java
index ea3d4252c44..1001dc79fd4 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java
@@ -63,6 +63,7 @@ public class IoTDBPipeWithLoadIT extends 
AbstractPipeDualTreeModelAutoIT {
         .setEnableSeqSpaceCompaction(false)
         .setEnableUnseqSpaceCompaction(false)
         .setEnableCrossSpaceCompaction(false)
+        .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
     receiverEnv
         .getConfig()
@@ -70,6 +71,7 @@ public class IoTDBPipeWithLoadIT extends 
AbstractPipeDualTreeModelAutoIT {
         .setAutoCreateSchemaEnabled(true)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
 
     // 10 min, assert that the operations will not time out
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
index bf0fe81482d..44983d23df3 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
@@ -57,6 +57,7 @@ public abstract class AbstractPipeDualTreeModelManualIT {
         .setAutoCreateSchemaEnabled(false)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
     receiverEnv
         .getConfig()
@@ -64,6 +65,7 @@ public abstract class AbstractPipeDualTreeModelManualIT {
         .setAutoCreateSchemaEnabled(false)
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
 
     // 10 min, assert that the operations will not time out
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java
index c118166847c..2042e0fa1ce 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java
@@ -83,6 +83,7 @@ public class IoTDBSubscriptionRestartIT extends 
AbstractSubscriptionIT {
         .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
         .setSchemaReplicationFactor(3)
         .setDataReplicationFactor(2)
+        .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
 
     EnvFactory.getEnv().initClusterEnvironment(3, 3);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java
index 7b564023498..fe667480c86 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java
@@ -37,6 +37,7 @@ public abstract class AbstractSubscriptionLocalIT extends 
AbstractSubscriptionIT
         .getConfig()
         .getCommonConfig()
         .setSubscriptionEnabled(true)
+        .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
 
     EnvFactory.getEnv().initClusterEnvironment();
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
index b2cfede0945..bf57f904c26 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
@@ -74,6 +74,7 @@ public abstract class AbstractSubscriptionTripleIT extends 
AbstractSubscriptionI
     sender
         .getConfig()
         .getCommonConfig()
+        .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false)
         .setSubscriptionPrefetchTsFileBatchMaxDelayInMs(500)
         .setSubscriptionPrefetchTsFileBatchMaxSizeInBytes(32 * 1024);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
index da8ffdc6823..3c84a8fd9f9 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
@@ -57,6 +57,7 @@ public class ExportTsFileTestIT extends AbstractScriptIT {
         .getConfig()
         .getCommonConfig()
         .setSubscriptionEnabled(true)
+        .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
     EnvFactory.getEnv().initClusterEnvironment();
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 1286d26f6c1..f8cbe0b45b9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.agent.task.PipeTask;
 import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
@@ -170,20 +171,20 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       final PipeTaskMeta pipeTaskMeta)
       throws IllegalPathException {
     if (pipeTaskMeta.getLeaderNodeId() == CONFIG.getDataNodeId()) {
-      final PipeParameters extractorParameters = 
pipeStaticMeta.getExtractorParameters();
+      final PipeParameters sourceParameters = 
pipeStaticMeta.getExtractorParameters();
       final DataRegionId dataRegionId = new DataRegionId(consensusGroupId);
       final boolean needConstructDataRegionTask =
           
StorageEngine.getInstance().getAllDataRegionIds().contains(dataRegionId)
               && DataRegionListeningFilter.shouldDataRegionBeListened(
-                  extractorParameters, dataRegionId);
+                  sourceParameters, dataRegionId);
       final boolean needConstructSchemaRegionTask =
           SchemaEngine.getInstance()
                   .getAllSchemaRegionIds()
                   .contains(new SchemaRegionId(consensusGroupId))
               && SchemaRegionListeningFilter.shouldSchemaRegionBeListened(
-                  consensusGroupId, extractorParameters);
+                  consensusGroupId, sourceParameters);
 
-      // Advance the extractor parameters parsing logic to avoid creating 
un-relevant pipeTasks
+      // Advance the source parameters parsing logic to avoid creating 
un-relevant pipeTasks
       if (
       // For external source
       PipeRuntimeMeta.isSourceExternal(consensusGroupId)
@@ -426,7 +427,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
                 || pipeTaskMap.entrySet().stream()
                     .filter(entry -> dataRegionIds.contains(entry.getKey()))
                     .allMatch(entry -> ((PipeDataNodeTask) 
entry.getValue()).isCompleted());
-        final String extractorModeValue =
+        final String sourceModeValue =
             pipeMeta
                 .getStaticMeta()
                 .getExtractorParameters()
@@ -438,9 +439,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
             
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(
                         pipeMeta.getStaticMeta().getExtractorParameters())
                     .getLeft()
-                && (extractorModeValue.equalsIgnoreCase(
-                        PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE)
-                    || extractorModeValue.equalsIgnoreCase(
+                && 
(sourceModeValue.equalsIgnoreCase(PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE)
+                    || sourceModeValue.equalsIgnoreCase(
                         PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE));
 
         final boolean isCompleted = isAllDataRegionCompleted && 
includeDataAndNeedDrop;
@@ -595,12 +595,12 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
               Arrays.asList(EXTRACTOR_MODE_SNAPSHOT_KEY, 
SOURCE_MODE_SNAPSHOT_KEY),
               EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE);
     } else {
-      final String extractorModeValue =
+      final String sourceModeValue =
           parameters.getStringOrDefault(
               Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), 
EXTRACTOR_MODE_DEFAULT_VALUE);
       isSnapshotMode =
-          extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
-              || 
extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
+          sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
+              || sourceModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
     }
     return isSnapshotMode;
   }
@@ -689,24 +689,24 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
   @Override
   protected void calculateMemoryUsage(
-      final PipeParameters extractorParameters,
+      final PipeStaticMeta staticMeta,
+      final PipeParameters sourceParameters,
       final PipeParameters processorParameters,
-      final PipeParameters connectorParameters) {
-    if (!PipeConfig.getInstance().isPipeEnableMemoryCheck()) {
+      final PipeParameters sinkParameters) {
+    if (!PipeConfig.getInstance().isPipeEnableMemoryCheck()
+        || !isInnerSource(sourceParameters)
+        || !PipeType.USER.equals(staticMeta.getPipeType())) {
       return;
     }
 
-    calculateInsertNodeQueueMemory(extractorParameters, processorParameters, 
connectorParameters);
+    calculateInsertNodeQueueMemory(sourceParameters);
 
     long needMemory = 0;
 
-    needMemory +=
-        calculateTsFileParserMemory(extractorParameters, processorParameters, 
connectorParameters);
-    needMemory +=
-        calculateSinkBatchMemory(extractorParameters, processorParameters, 
connectorParameters);
-    needMemory +=
-        calculateSendTsFileReadBufferMemory(
-            extractorParameters, processorParameters, connectorParameters);
+    needMemory += calculateTsFileParserMemory(sourceParameters, 
sinkParameters);
+    needMemory += calculateSinkBatchMemory(sinkParameters);
+    needMemory += calculateSendTsFileReadBufferMemory(sourceParameters, 
sinkParameters);
+    needMemory += calculateAssignerMemory(sourceParameters);
 
     PipeMemoryManager pipeMemoryManager = PipeDataNodeResourceManager.memory();
     final long freeMemorySizeInBytes = 
pipeMemoryManager.getFreeMemorySizeInBytes();
@@ -727,13 +727,22 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     }
   }
 
-  private void calculateInsertNodeQueueMemory(
-      final PipeParameters extractorParameters,
-      final PipeParameters processorParameters,
-      final PipeParameters connectorParameters) {
+  private boolean isInnerSource(final PipeParameters sourceParameters) {
+    final String pluginName =
+        sourceParameters
+            .getStringOrDefault(
+                Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY, 
PipeSourceConstant.SOURCE_KEY),
+                BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+            .toLowerCase();
+
+    return 
pluginName.equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName())
+        || 
pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName());
+  }
+
+  private void calculateInsertNodeQueueMemory(final PipeParameters 
sourceParameters) {
 
-    // Realtime extractor is enabled by default, so we only need to check the 
source realtime
-    if (!extractorParameters.getBooleanOrDefault(
+    // Realtime source is enabled by default, so we only need to check the 
source realtime
+    if (!sourceParameters.getBooleanOrDefault(
         Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
         EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) {
       return;
@@ -741,7 +750,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
     // If the realtime mode is batch or file, we do not need to allocate memory
     final String realtimeMode =
-        extractorParameters.getStringByKeys(
+        sourceParameters.getStringByKeys(
             PipeSourceConstant.EXTRACTOR_REALTIME_MODE_KEY,
             PipeSourceConstant.SOURCE_REALTIME_MODE_KEY);
     if 
(PipeSourceConstant.EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE.equals(realtimeMode)
@@ -764,53 +773,50 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
   }
 
   private long calculateTsFileParserMemory(
-      final PipeParameters extractorParameters,
-      final PipeParameters processorParameters,
-      final PipeParameters connectorParameters) {
+      final PipeParameters sourceParameters, final PipeParameters 
sinkParameters) {
 
-    // If the extractor is not history, we do not need to allocate memory
+    // If the source is not history, we do not need to allocate memory
     boolean isExtractorHistory =
-        extractorParameters.getBooleanOrDefault(
+        sourceParameters.getBooleanOrDefault(
                 SystemConstant.RESTART_KEY, 
SystemConstant.RESTART_DEFAULT_VALUE)
-            || extractorParameters.getBooleanOrDefault(
+            || sourceParameters.getBooleanOrDefault(
                 Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
                 EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
 
-    // If the extractor is history, and has start/end time, we need to 
allocate memory
+    // If the source is history, and has start/end time, we need to allocate 
memory
     boolean isTSFileParser =
         isExtractorHistory
-            && extractorParameters.hasAnyAttributes(
+            && sourceParameters.hasAnyAttributes(
                 EXTRACTOR_HISTORY_START_TIME_KEY, 
SOURCE_HISTORY_START_TIME_KEY);
 
     isTSFileParser =
         isTSFileParser
             || (isExtractorHistory
-                && extractorParameters.hasAnyAttributes(
+                && sourceParameters.hasAnyAttributes(
                     EXTRACTOR_HISTORY_END_TIME_KEY, 
SOURCE_HISTORY_END_TIME_KEY));
 
-    // if the extractor has start/end time, we need to allocate memory
+    // if the source has start/end time, we need to allocate memory
     isTSFileParser =
         isTSFileParser
-            || extractorParameters.hasAnyAttributes(
-                SOURCE_START_TIME_KEY, EXTRACTOR_START_TIME_KEY);
+            || sourceParameters.hasAnyAttributes(SOURCE_START_TIME_KEY, 
EXTRACTOR_START_TIME_KEY);
 
     isTSFileParser =
         isTSFileParser
-            || extractorParameters.hasAnyAttributes(SOURCE_END_TIME_KEY, 
EXTRACTOR_END_TIME_KEY);
+            || sourceParameters.hasAnyAttributes(SOURCE_END_TIME_KEY, 
EXTRACTOR_END_TIME_KEY);
 
-    // If the extractor has pattern or path, we need to allocate memory
+    // If the source has pattern or path, we need to allocate memory
     isTSFileParser =
         isTSFileParser
-            || extractorParameters.hasAnyAttributes(EXTRACTOR_PATTERN_KEY, 
SOURCE_PATTERN_KEY);
+            || sourceParameters.hasAnyAttributes(EXTRACTOR_PATTERN_KEY, 
SOURCE_PATTERN_KEY);
 
     isTSFileParser =
-        isTSFileParser || 
extractorParameters.hasAnyAttributes(EXTRACTOR_PATH_KEY, SOURCE_PATH_KEY);
+        isTSFileParser || 
sourceParameters.hasAnyAttributes(EXTRACTOR_PATH_KEY, SOURCE_PATH_KEY);
 
-    // If the extractor is not hybrid, we do need to allocate memory
+    // If the source is not hybrid, we do need to allocate memory
     isTSFileParser =
         isTSFileParser
             || !PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE.equals(
-                connectorParameters.getStringOrDefault(
+                sinkParameters.getStringOrDefault(
                     Arrays.asList(
                         PipeSinkConstant.CONNECTOR_FORMAT_KEY, 
PipeSinkConstant.SINK_FORMAT_KEY),
                     PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE));
@@ -822,15 +828,12 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     return PipeConfig.getInstance().getTsFileParserMemory();
   }
 
-  private long calculateSinkBatchMemory(
-      final PipeParameters extractorParameters,
-      final PipeParameters processorParameters,
-      final PipeParameters connectorParameters) {
+  private long calculateSinkBatchMemory(final PipeParameters sinkParameters) {
 
-    // If the connector format is tsfile , we need to use batch
+    // If the sink format is tsfile , we need to use batch
     boolean needUseBatch =
         PipeSinkConstant.CONNECTOR_FORMAT_TS_FILE_VALUE.equals(
-            connectorParameters.getStringOrDefault(
+            sinkParameters.getStringOrDefault(
                 Arrays.asList(
                     PipeSinkConstant.CONNECTOR_FORMAT_KEY, 
PipeSinkConstant.SINK_FORMAT_KEY),
                 PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE));
@@ -839,9 +842,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       return PipeConfig.getInstance().getSinkBatchMemoryTsFile();
     }
 
-    // If the connector is batch mode, we need to use batch
+    // If the sink is batch mode, we need to use batch
     needUseBatch =
-        connectorParameters.getBooleanOrDefault(
+        sinkParameters.getBooleanOrDefault(
             Arrays.asList(
                 PipeSinkConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY,
                 PipeSinkConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY),
@@ -855,23 +858,21 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
   }
 
   private long calculateSendTsFileReadBufferMemory(
-      final PipeParameters extractorParameters,
-      final PipeParameters processorParameters,
-      final PipeParameters connectorParameters) {
-    // If the extractor is history enable, we need to transfer tsfile
+      final PipeParameters sourceParameters, final PipeParameters 
sinkParameters) {
+    // If the source is history enable, we need to transfer tsfile
     boolean needTransferTsFile =
-        extractorParameters.getBooleanOrDefault(
+        sourceParameters.getBooleanOrDefault(
                 SystemConstant.RESTART_KEY, 
SystemConstant.RESTART_DEFAULT_VALUE)
-            || extractorParameters.getBooleanOrDefault(
+            || sourceParameters.getBooleanOrDefault(
                 Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
                 EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
 
     String format =
-        connectorParameters.getStringOrDefault(
+        sinkParameters.getStringOrDefault(
             Arrays.asList(PipeSinkConstant.CONNECTOR_FORMAT_KEY, 
PipeSinkConstant.SINK_FORMAT_KEY),
             PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE);
 
-    // If the connector format is tsfile and hybrid, we need to transfer tsfile
+    // If the sink format is tsfile and hybrid, we need to transfer tsfile
     needTransferTsFile =
         needTransferTsFile
             || PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE.equals(format)
@@ -883,4 +884,19 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
     return PipeConfig.getInstance().getSendTsFileReadBuffer();
   }
+
+  private long calculateAssignerMemory(final PipeParameters sourceParameters) {
+    try {
+      if (!PipeInsertionDataNodeListener.getInstance().isEmpty()
+          || 
!DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(sourceParameters)
+              .getLeft()) {
+        return 0;
+      }
+      return 
PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferSize()
+          * 
PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes()
+          * Math.min(StorageEngine.getInstance().getDataRegionNumber(), 10);
+    } catch (final IllegalPathException e) {
+      return 0;
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index b14de1699ed..af38d626f4e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -119,7 +119,6 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
       
LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileAndDeletionSource.class);
 
   private static final Map<Integer, Long> 
DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP = new HashMap<>();
-  private static final long PIPE_MIN_FLUSH_INTERVAL_IN_MS = 2000;
 
   private static final String TREE_MODEL_EVENT_TABLE_NAME_PREFIX = PATH_ROOT + 
PATH_SEPARATOR;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index 088909f5888..dff966e72e8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -172,6 +172,10 @@ public class PipeInsertionDataNodeListener {
                 PipeRealtimeEventFactory.createRealtimeEvent(key, 
shouldPrintMessage)));
   }
 
+  public boolean isEmpty() {
+    return dataRegionId2Assigner.isEmpty();
+  }
+
   //////////////////////////// Permission change ////////////////////////////
 
   public void invalidateAllCache() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index f9697bd585f..604e65e153b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -251,8 +251,8 @@ public class CommonConfig {
 
   private long pipeMaxWaitFinishTime = 10 * 1000;
 
-  private int pipeExtractorAssignerDisruptorRingBufferSize = 65536;
-  private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 50; 
// 50B
+  private int pipeExtractorAssignerDisruptorRingBufferSize = 128;
+  private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 72 * 
KB;
   private long pipeExtractorMatcherCacheSize = 1024;
 
   private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 7848ba96769..779871def33 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -478,6 +478,7 @@ public abstract class PipeTaskAgent {
     final long creationTime = 
pipeMetaFromCoordinator.getStaticMeta().getCreationTime();
 
     calculateMemoryUsage(
+        pipeMetaFromCoordinator.getStaticMeta(),
         pipeMetaFromCoordinator.getStaticMeta().getExtractorParameters(),
         pipeMetaFromCoordinator.getStaticMeta().getProcessorParameters(),
         pipeMetaFromCoordinator.getStaticMeta().getConnectorParameters());
@@ -523,6 +524,7 @@ public abstract class PipeTaskAgent {
   }
 
   protected void calculateMemoryUsage(
+      final PipeStaticMeta staticMeta,
       final PipeParameters extractorParameters,
       final PipeParameters processorParameters,
       final PipeParameters connectorParameters) {

Reply via email to