This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a7665a0b2c Pipe: split full-sync pipe into history and realtime pipes 
(#16250)
6a7665a0b2c is described below

commit 6a7665a0b2c3612b13ed6dcd9111d5e956ebd326
Author: VGalaxies <[email protected]>
AuthorDate: Fri Aug 29 09:53:58 2025 +0800

    Pipe: split full-sync pipe into history and realtime pipes (#16250)
    
    * setup
    
    * minor improve
    
    * apply Copilot review
    
    * add internal config
    
    * fixup
    
    * fixup
    
    * add IoTDBPipeAutoSplitIT
    
    * set if not exists always to true to handle partial failure
---
 .../it/env/cluster/config/MppCommonConfig.java     |   6 ++
 .../env/cluster/config/MppSharedCommonConfig.java  |   7 ++
 .../it/env/remote/config/RemoteCommonConfig.java   |   5 +
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |   2 +
 .../manual/AbstractPipeTableModelDualManualIT.java |   6 +-
 .../manual/basic/IoTDBPipePermissionIT.java        |   6 +-
 .../tablemodel/manual/basic/IoTDBPipeSourceIT.java |   6 +-
 .../enhanced/IoTDBPipeSinkCompressionIT.java       |   6 +-
 .../auto/AbstractPipeDualTreeModelAutoIT.java      |   6 +-
 .../treemodel/auto/basic/IoTDBPipeAutoSplitIT.java | 106 +++++++++++++++++++++
 .../treemodel/auto/basic/IoTDBPipeSourceIT.java    |   6 +-
 .../auto/enhanced/IoTDBPipeSinkCompressionIT.java  |   6 +-
 .../manual/AbstractPipeDualTreeModelManualIT.java  |   6 +-
 .../treemodel/manual/IoTDBPipePermissionIT.java    |   6 +-
 .../iotdb/pipe/it/single/AbstractPipeSingleIT.java |   3 +-
 .../relational/it/schema/IoTDBDatabaseIT.java      |  12 ++-
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  17 ++++
 .../config/executor/ClusterConfigTaskExecutor.java |  74 ++++++++++++++
 .../apache/iotdb/commons/conf/CommonConfig.java    |  10 ++
 .../iotdb/commons/pipe/config/PipeConfig.java      |   8 ++
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |   6 ++
 21 files changed, 289 insertions(+), 21 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 8a0f6c483dd..818d674a9ea 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -538,6 +538,12 @@ public class MppCommonConfig extends MppBaseConfig 
implements CommonConfig {
     return this;
   }
 
+  @Override
+  public CommonConfig setPipeAutoSplitFullEnabled(boolean 
pipeAutoSplitFullEnabled) {
+    setProperty("pipe_auto_split_full_enabled", 
String.valueOf(pipeAutoSplitFullEnabled));
+    return this;
+  }
+
   @Override
   public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
     setProperty("chunk_timeseriesmeta_free_memory_proportion", 
queryMemoryProportion);
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index b58d5c132e8..002057ab3d7 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -554,6 +554,13 @@ public class MppSharedCommonConfig implements CommonConfig 
{
     return this;
   }
 
+  @Override
+  public CommonConfig setPipeAutoSplitFullEnabled(boolean 
pipeAutoSplitFullEnabled) {
+    dnConfig.setPipeAutoSplitFullEnabled(pipeAutoSplitFullEnabled);
+    cnConfig.setPipeAutoSplitFullEnabled(pipeAutoSplitFullEnabled);
+    return this;
+  }
+
   @Override
   public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
     dnConfig.setQueryMemoryProportion(queryMemoryProportion);
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index f18828f5afc..eff6c61deae 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -391,6 +391,11 @@ public class RemoteCommonConfig implements CommonConfig {
     return this;
   }
 
+  @Override
+  public CommonConfig setPipeAutoSplitFullEnabled(boolean 
pipeAutoSplitFullEnabled) {
+    return this;
+  }
+
   @Override
   public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
     return this;
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index d4cc7c2e1ed..ce4a57a807d 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -172,6 +172,8 @@ public interface CommonConfig {
   CommonConfig setPipeConnectorRequestSliceThresholdBytes(
       int pipeConnectorRequestSliceThresholdBytes);
 
+  CommonConfig setPipeAutoSplitFullEnabled(boolean pipeAutoSplitFullEnabled);
+
   CommonConfig setQueryMemoryProportion(String queryMemoryProportion);
 
   CommonConfig setDataNodeMemoryProportion(String dataNodeMemoryProportion);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
index 49b5c6d368a..55c0170aacd 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
@@ -51,7 +51,8 @@ public abstract class AbstractPipeTableModelDualManualIT {
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setEnforceStrongPassword(false)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
     receiverEnv
         .getConfig()
         .getCommonConfig()
@@ -60,7 +61,8 @@ public abstract class AbstractPipeTableModelDualManualIT {
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setEnforceStrongPassword(false)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
 
     // 10 min, assert that the operations will not time out
     senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
index 2275ddf505a..803880202d1 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
@@ -71,7 +71,8 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeTableModelDualManualIT {
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDnConnectionTimeoutMs(600000)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
     receiverEnv
         .getConfig()
         .getCommonConfig()
@@ -86,7 +87,8 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeTableModelDualManualIT {
         .setDataReplicationFactor(2)
         .setDnConnectionTimeoutMs(600000)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
 
     senderEnv.initClusterEnvironment();
     receiverEnv.initClusterEnvironment(3, 3);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
index a43e9f11c6f..d4cdf521304 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
@@ -74,7 +74,8 @@ public class IoTDBPipeSourceIT extends 
AbstractPipeTableModelDualManualIT {
         .setEnableCrossSpaceCompaction(false)
         .setDnConnectionTimeoutMs(600000)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
     
senderEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH");
     receiverEnv
         .getConfig()
@@ -84,7 +85,8 @@ public class IoTDBPipeSourceIT extends 
AbstractPipeTableModelDualManualIT {
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDnConnectionTimeoutMs(600000)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
 
     senderEnv.initClusterEnvironment();
     receiverEnv.initClusterEnvironment();
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
index d1056effae3..74c43b8df34 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
@@ -74,7 +74,8 @@ public class IoTDBPipeSinkCompressionIT extends 
AbstractPipeTableModelDualManual
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDnConnectionTimeoutMs(600000)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
     
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:1");
     receiverEnv
         .getConfig()
@@ -85,7 +86,8 @@ public class IoTDBPipeSinkCompressionIT extends 
AbstractPipeTableModelDualManual
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDnConnectionTimeoutMs(600000)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
 
     senderEnv.initClusterEnvironment();
     receiverEnv.initClusterEnvironment();
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java
index 9443256fac9..2e1cf5aff3d 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java
@@ -59,7 +59,8 @@ public abstract class AbstractPipeDualTreeModelAutoIT {
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setEnforceStrongPassword(false)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
     
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:1");
     receiverEnv
         .getConfig()
@@ -68,7 +69,8 @@ public abstract class AbstractPipeDualTreeModelAutoIT {
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
     
receiverEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:9:1");
 
     // 10 min, assert that the operations will not time out
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
new file mode 100644
index 00000000000..4cc50e97323
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.dual.treemodel.auto.basic;
+
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.MultiEnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoBasic;
+import 
org.apache.iotdb.pipe.it.dual.treemodel.auto.AbstractPipeDualTreeModelAutoIT;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Objects;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2DualTreeAutoBasic.class})
+public class IoTDBPipeAutoSplitIT extends AbstractPipeDualTreeModelAutoIT {
+
+  @Override
+  @Before
+  public void setUp() {
+    MultiEnvFactory.createEnv(2);
+    senderEnv = MultiEnvFactory.getEnv(0);
+    receiverEnv = MultiEnvFactory.getEnv(1);
+    senderEnv
+        .getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(true)
+        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setEnforceStrongPassword(false)
+        .setPipeMemoryManagementEnabled(false)
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(true); // set auto split to true
+    receiverEnv
+        .getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(true)
+        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setPipeMemoryManagementEnabled(false)
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(true); // set auto split to true
+    senderEnv.initClusterEnvironment();
+    receiverEnv.initClusterEnvironment();
+  }
+
+  @Test
+  public void testSingleEnv() throws Exception {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    final String sql =
+        String.format(
+            "create pipe a2b with source ('source'='iotdb-source') with 
processor ('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
+            receiverDataNode.getIpAndPortString());
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(sql);
+    } catch (final SQLException e) {
+      fail(e.getMessage());
+    }
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
+      Assert.assertEquals(2, showPipeResult.size());
+      Assert.assertTrue(
+          (Objects.equals(showPipeResult.get(0).id, "a2b_history")
+                  && Objects.equals(showPipeResult.get(1).id, "a2b_realtime"))
+              || (Objects.equals(showPipeResult.get(1).id, "a2b_history")
+                  && Objects.equals(showPipeResult.get(0).id, 
"a2b_realtime")));
+    }
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
index caad46ea625..6f95d3d1f6c 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
@@ -74,7 +74,8 @@ public class IoTDBPipeSourceIT extends 
AbstractPipeDualTreeModelAutoIT {
         .setEnableUnseqSpaceCompaction(false)
         .setEnableCrossSpaceCompaction(false)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
     
senderEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH");
     receiverEnv
         .getConfig()
@@ -83,7 +84,8 @@ public class IoTDBPipeSourceIT extends 
AbstractPipeDualTreeModelAutoIT {
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
     
receiverEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH");
 
     // 10 min, assert that the operations will not time out
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
index 1ac37a6c768..40d8c31725f 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
@@ -71,7 +71,8 @@ public class IoTDBPipeSinkCompressionIT extends 
AbstractPipeDualTreeModelAutoIT
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
 
     receiverEnv
         .getConfig()
@@ -81,7 +82,8 @@ public class IoTDBPipeSinkCompressionIT extends 
AbstractPipeDualTreeModelAutoIT
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
 
     // 10 min, assert that the operations will not time out
     senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
index d18a3340eb4..b704125ca30 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
@@ -58,7 +58,8 @@ public abstract class AbstractPipeDualTreeModelManualIT {
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
     
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:1");
 
     receiverEnv
@@ -68,7 +69,8 @@ public abstract class AbstractPipeDualTreeModelManualIT {
         .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
 
     // 10 min, assert that the operations will not time out
     senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
index bdd9b850c8a..bc2f2558b64 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
@@ -66,7 +66,8 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeDualTreeModelManualIT {
         
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
         .setDnConnectionTimeoutMs(600000)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
     
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:1");
     receiverEnv
         .getConfig()
@@ -80,7 +81,8 @@ public class IoTDBPipePermissionIT extends 
AbstractPipeDualTreeModelManualIT {
         .setDataReplicationFactor(2)
         .setDnConnectionTimeoutMs(600000)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
 
     senderEnv.initClusterEnvironment();
     receiverEnv.initClusterEnvironment(3, 3);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
index 96c6a801ffd..3bb3ecce21e 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
@@ -37,7 +37,8 @@ abstract class AbstractPipeSingleIT {
         .getCommonConfig()
         .setAutoCreateSchemaEnabled(true)
         .setPipeMemoryManagementEnabled(false)
-        .setIsPipeEnableMemoryCheck(false);
+        .setIsPipeEnableMemoryCheck(false)
+        .setPipeAutoSplitFullEnabled(false);
     env.initClusterEnvironment();
   }
 
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
index e9df4cf853b..a9736613e72 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
@@ -56,9 +56,17 @@ public class IoTDBDatabaseIT {
 
   @Before
   public void setUp() throws Exception {
-    
EnvFactory.getEnv().getConfig().getCommonConfig().setEnforceStrongPassword(false);
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setEnforceStrongPassword(false)
+        .setPipeAutoSplitFullEnabled(false);
     // enable subscription
-    
EnvFactory.getEnv().getConfig().getCommonConfig().setSubscriptionEnabled(true);
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setSubscriptionEnabled(true)
+        .setPipeAutoSplitFullEnabled(false);
     EnvFactory.getEnv().initClusterEnvironment();
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 4d3644aabc9..7e881401838 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -587,6 +587,23 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     }
   }
 
+  public boolean isFullSync(final PipeParameters parameters) {
+    if (isSnapshotMode(parameters)) {
+      return false;
+    }
+
+    final boolean isHistoryEnable =
+        parameters.getBooleanOrDefault(
+            Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, 
SOURCE_HISTORY_ENABLE_KEY),
+            EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
+    final boolean isRealtimeEnable =
+        parameters.getBooleanOrDefault(
+            Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
+            EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE);
+
+    return isHistoryEnable && isRealtimeEnable;
+  }
+
   private boolean isSnapshotMode(final PipeParameters parameters) {
     final boolean isSnapshotMode;
     if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, 
SOURCE_MODE_SNAPSHOT_KEY)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 4d3a607d61f..e44b1c0983e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -53,6 +53,7 @@ import 
org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginClassLoader;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginExecutableManager;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
 import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility;
@@ -314,6 +315,7 @@ import org.apache.iotdb.udf.api.relational.ScalarFunction;
 import org.apache.iotdb.udf.api.relational.TableFunction;
 import 
org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.thrift.TException;
@@ -2007,6 +2009,78 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
       return future;
     }
 
+    // Syntactic sugar: if full-sync mode is detected (i.e. not snapshot mode, 
and both realtime
+    // and history are enabled), the pipe is split into history-only and 
realtime-only pipes.
+    final PipeParameters extractorPipeParameters =
+        new PipeParameters(createPipeStatement.getExtractorAttributes());
+    if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
+        && PipeDataNodeAgent.task().isFullSync(extractorPipeParameters)) {
+      try (final ConfigNodeClient configNodeClient =
+          
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+        // 1. Send request to create the historical data synchronization 
pipeline
+        final TCreatePipeReq historyReq =
+            new TCreatePipeReq()
+                // Append suffix to the pipeline name for historical data
+                .setPipeName(createPipeStatement.getPipeName() + "_history")
+                // NOTE: always set "if not exists" to true to handle partial 
failure
+                .setIfNotExistsCondition(true)
+                // Use extractor parameters for historical data
+                .setExtractorAttributes(
+                    extractorPipeParameters
+                        .addOrReplaceEquivalentAttributesWithClone(
+                            new PipeParameters(
+                                ImmutableMap.of(
+                                    
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
+                                    Boolean.toString(true),
+                                    
PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY,
+                                    Boolean.toString(false))))
+                        .getAttribute())
+                
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
+                
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
+
+        final TSStatus historyTsStatus = 
configNodeClient.createPipe(historyReq);
+        // If creation fails, immediately return with exception
+        if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != 
historyTsStatus.getCode()) {
+          future.setException(new IoTDBException(historyTsStatus));
+          return future;
+        }
+
+        // 2. Send request to create the real-time data synchronization 
pipeline
+        final TCreatePipeReq realtimeReq =
+            new TCreatePipeReq()
+                // Append suffix to the pipeline name for real-time data
+                .setPipeName(createPipeStatement.getPipeName() + "_realtime")
+                
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
+                // Use extractor parameters for real-time data
+                .setExtractorAttributes(
+                    extractorPipeParameters
+                        .addOrReplaceEquivalentAttributesWithClone(
+                            new PipeParameters(
+                                ImmutableMap.of(
+                                    
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
+                                    Boolean.toString(false),
+                                    
PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY,
+                                    Boolean.toString(true))))
+                        .getAttribute())
+                
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
+                
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
+
+        final TSStatus realtimeTsStatus = 
configNodeClient.createPipe(realtimeReq);
+        // If creation fails, immediately return with exception
+        if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != 
realtimeTsStatus.getCode()) {
+          future.setException(new IoTDBException(realtimeTsStatus));
+          return future;
+        }
+
+        // 3. Set success status only if both pipelines are created 
successfully
+        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+      } catch (final Exception e) {
+        // Catch any other exceptions (e.g., network issues)
+        future.setException(e);
+      }
+      return future;
+    }
+
     try (final ConfigNodeClient configNodeClient =
         
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
       final TCreatePipeReq req =
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 61212fbd5a2..c5d0ecc558f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -337,6 +337,8 @@ public class CommonConfig {
   private boolean pipeEventReferenceTrackingEnabled = true;
   private long pipeEventReferenceEliminateIntervalSeconds = 10;
 
+  private boolean pipeAutoSplitFullEnabled = true;
+
   private boolean subscriptionEnabled = false;
 
   private float subscriptionCacheMemoryUsagePercentage = 0.2F;
@@ -2083,6 +2085,14 @@ public class CommonConfig {
         pipeEventReferenceEliminateIntervalSeconds);
   }
 
+  public boolean getPipeAutoSplitFullEnabled() { // accessor for the auto-split flag
+    return pipeAutoSplitFullEnabled; // field is initialized to true in this class
+  } // NOTE(review): presumably gates the history/realtime pipe split; confirm at call site
+
+  public void setPipeAutoSplitFullEnabled(boolean pipeAutoSplitFullEnabled) {
+    this.pipeAutoSplitFullEnabled = pipeAutoSplitFullEnabled; // plain assignment, no validation
+  } // NOTE(review): presumably fed from the pipe_auto_split_full_enabled property; confirm
+
   public boolean getSubscriptionEnabled() {
     return subscriptionEnabled;
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 887d89b0279..21605ee70e2 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -427,6 +427,12 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeEventReferenceEliminateIntervalSeconds();
   }
 
+  /////////////////////////////// Syntactic sugar ///////////////////////////////
+
+  public boolean getPipeAutoSplitFullEnabled() { // read-through accessor, nothing cached here
+    return COMMON_CONFIG.getPipeAutoSplitFullEnabled(); // delegates to COMMON_CONFIG each call
+  }
+
   /////////////////////////////// Utils ///////////////////////////////
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeConfig.class);
@@ -622,6 +628,8 @@ public class PipeConfig {
     LOGGER.info(
         "PipeEventReferenceEliminateIntervalSeconds: {}",
         getPipeEventReferenceEliminateIntervalSeconds());
+
+    LOGGER.info("PipeAutoSplitFullEnabled: {}", getPipeAutoSplitFullEnabled());
   }
 
   /////////////////////////////// Singleton ///////////////////////////////
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 9087db068ed..8009fa27dea 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -547,6 +547,12 @@ public class PipeDescriptor {
         Long.parseLong(
             properties.getProperty(
                 "pipe_max_wait_finish_time", String.valueOf(config.getPipeMaxWaitFinishTime()))));
+
+    config.setPipeAutoSplitFullEnabled(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "pipe_auto_split_full_enabled",
+                String.valueOf(config.getPipeAutoSplitFullEnabled()))));
   }
 
   public static void loadPipeExternalConfig(


Reply via email to