This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bfe0b0a78e Pipe: Made the historical pipe split auto dropped after 
completion (#17295)
9bfe0b0a78e is described below

commit 9bfe0b0a78eec2f55b0d3f5388669f488b0e4a4b
Author: Caideyipi <[email protected]>
AuthorDate: Tue Mar 31 11:21:42 2026 +0800

    Pipe: Made the historical pipe split auto dropped after completion (#17295)
    
    * snapshot
    
    * may-comp
    
    * auto
---
 .../manual/enhanced/IoTDBPipeAutoDropIT.java       | 123 ++++++++++++---------
 .../config/executor/ClusterConfigTaskExecutor.java |   2 +
 2 files changed, 70 insertions(+), 55 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java
index 6ab974c2f1e..ff906d8a5af 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java
@@ -24,10 +24,12 @@ import 
org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.it.utils.TestUtils;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualEnhanced;
+import org.apache.iotdb.itbase.env.BaseEnv;
 import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
 import 
org.apache.iotdb.pipe.it.dual.tablemodel.manual.AbstractPipeTableModelDualManualIT;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -40,6 +42,7 @@ import org.junit.runner.RunWith;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Collections;
 import java.util.HashMap;
@@ -49,6 +52,7 @@ import java.util.function.Consumer;
 
 import static org.apache.iotdb.util.MagicUtils.makeItCloseQuietly;
 import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2DualTableManualEnhanced.class})
@@ -60,6 +64,34 @@ public class IoTDBPipeAutoDropIT extends 
AbstractPipeTableModelDualManualIT {
     super.setUp();
   }
 
+  protected void setupConfig() {
+    // Enable auto split
+    senderEnv
+        .getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(true)
+        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setEnforceStrongPassword(false)
+        .setPipeMemoryManagementEnabled(false)
+        .setIsPipeEnableMemoryCheck(false);
+    receiverEnv
+        .getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(true)
+        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        .setEnforceStrongPassword(false)
+        .setPipeMemoryManagementEnabled(false)
+        .setIsPipeEnableMemoryCheck(false);
+
+    // 10 min, assert that the operations will not time out
+    senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+    receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+
+    
senderEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH");
+  }
+
   @Test
   public void testAutoDropInHistoricalTransfer() throws Exception {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
@@ -70,48 +102,29 @@ public class IoTDBPipeAutoDropIT extends 
AbstractPipeTableModelDualManualIT {
           TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
         };
 
-    final String receiverIp = receiverDataNode.getIp();
-    final int receiverPort = receiverDataNode.getPort();
-
-    try (final SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
-
-      TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
-      TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
-
-      final Map<String, String> extractorAttributes = new HashMap<>();
-      final Map<String, String> processorAttributes = new HashMap<>();
-      final Map<String, String> connectorAttributes = new HashMap<>();
-
-      extractorAttributes.put("mode.snapshot", "true");
-      extractorAttributes.put("capture.table", "true");
-      extractorAttributes.put("user", "root");
-
-      connectorAttributes.put("connector", "iotdb-thrift-connector");
-      connectorAttributes.put("connector.batch.enable", "false");
-      connectorAttributes.put("connector.ip", receiverIp);
-      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
-
-      final TSStatus status =
-          client.createPipe(
-              new TCreatePipeReq("p1", connectorAttributes)
-                  .setExtractorAttributes(extractorAttributes)
-                  .setProcessorAttributes(processorAttributes));
+    // Create an ordinary full sync pipe
+    final String sql =
+        String.format("create pipe a2b ('node-urls'='%s')", 
receiverDataNode.getIpAndPortString());
+    try (final Connection connection = 
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+        final Statement statement = connection.createStatement()) {
+      statement.execute(sql);
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
 
-      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
+    TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
+    TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
 
-      TestUtils.assertDataEventuallyOnEnv(
-          receiverEnv,
-          TableModelUtils.getQueryCountSql("test"),
-          "_col0,",
-          Collections.singleton("100,"),
-          "test",
-          handleFailure);
-    }
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        TableModelUtils.getQueryCountSql("test"),
+        "_col0,",
+        Collections.singleton("100,"),
+        "test",
+        handleFailure);
 
-    try (final Connection connection = 
makeItCloseQuietly(senderEnv.getConnection());
+    try (final Connection connection =
+            
makeItCloseQuietly(senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT));
         final Statement statement = 
makeItCloseQuietly(connection.createStatement()); ) {
       ResultSet result = statement.executeQuery("show pipes");
       await()
@@ -124,9 +137,9 @@ public class IoTDBPipeAutoDropIT extends 
AbstractPipeTableModelDualManualIT {
                 try {
                   int pipeNum = 0;
                   while (result.next()) {
-                    if (!result
-                        .getString(ColumnHeaderConstant.ID)
-                        .contains(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
+                    final String pipeName = 
result.getString(ColumnHeaderConstant.ID);
+                    if 
(!pipeName.contains(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)
+                        && pipeName.endsWith("_history")) {
                       pipeNum++;
                     }
                   }
@@ -157,25 +170,25 @@ public class IoTDBPipeAutoDropIT extends 
AbstractPipeTableModelDualManualIT {
       TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
       TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
 
-      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> sourceAttributes = new HashMap<>();
       final Map<String, String> processorAttributes = new HashMap<>();
-      final Map<String, String> connectorAttributes = new HashMap<>();
+      final Map<String, String> sinkAttributes = new HashMap<>();
 
-      extractorAttributes.put("mode.snapshot", "true");
-      extractorAttributes.put("capture.table", "true");
-      extractorAttributes.put("start-time", "0");
-      extractorAttributes.put("end-time", "49");
-      extractorAttributes.put("user", "root");
+      sourceAttributes.put("mode.snapshot", "true");
+      sourceAttributes.put("capture.table", "true");
+      sourceAttributes.put("start-time", "0");
+      sourceAttributes.put("end-time", "49");
+      sourceAttributes.put("user", "root");
 
-      connectorAttributes.put("connector", "iotdb-thrift-connector");
-      connectorAttributes.put("connector.batch.enable", "false");
-      connectorAttributes.put("connector.ip", receiverIp);
-      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+      sinkAttributes.put("sink", "iotdb-thrift-sink");
+      sinkAttributes.put("sink.batch.enable", "false");
+      sinkAttributes.put("sink.ip", receiverIp);
+      sinkAttributes.put("sink.port", Integer.toString(receiverPort));
 
       final TSStatus status =
           client.createPipe(
-              new TCreatePipeReq("p1", connectorAttributes)
-                  .setExtractorAttributes(extractorAttributes)
+              new TCreatePipeReq("p1", sinkAttributes)
+                  .setExtractorAttributes(sourceAttributes)
                   .setProcessorAttributes(processorAttributes));
 
       Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index fc28d7a5f32..de3261b0484 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -2253,6 +2253,8 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
                                     Boolean.toString(false),
                                     
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
                                     Boolean.toString(true),
+                                    PipeSourceConstant.EXTRACTOR_MODE_KEY,
+                                    
PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE,
                                     // We force the historical pipe to 
transfer data (and maybe
                                     // deletion) only
                                     // Thus we can transfer schema only once

Reply via email to