This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 6348c2db2a0 [to dev/1.3] Pipe: backport auto split fixes (#17866)
6348c2db2a0 is described below

commit 6348c2db2a04973d203659380b849efabbffb0f2
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 9 15:14:42 2026 +0800

    [to dev/1.3] Pipe: backport auto split fixes (#17866)
    
    * Pipe: Fixed the bug that separated historical pipe may not include mod on 
deletion & The pipe without data.insert may be wrongly separated by pipe and 
transfer data (#17346)
    
    * fix
    
    * fix
    
    * f
    
    (cherry picked from commit 355a872cfaeb77e1e7fe68d78a90544b3054d3a5)
    
    * Pipe: Made the historical pipe split auto dropped after completion 
(#17295)
    
    * snapshot
    
    * may-comp
    
    * auto
    
    (cherry picked from commit 9bfe0b0a78eec2f55b0d3f5388669f488b0e4a4b)
---
 .../pipe/it/autocreate/IoTDBPipeAutoDropIT.java    | 92 ++++++++++++----------
 .../pipe/it/autocreate/IoTDBPipeAutoSplitIT.java   | 55 +++++++++++--
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  7 +-
 .../config/executor/ClusterConfigTaskExecutor.java | 62 ++++++++-------
 4 files changed, 138 insertions(+), 78 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
index dc1d1fc93ae..09657e2deb2 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
@@ -21,8 +21,10 @@ package org.apache.iotdb.pipe.it.autocreate;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
@@ -33,59 +35,67 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2AutoCreateSchema.class})
 public class IoTDBPipeAutoDropIT extends AbstractPipeDualAutoIT {
 
+  @Override
+  protected void setupConfig() {
+    super.setupConfig();
+    senderEnv.getConfig().getCommonConfig().setPipeAutoSplitFullEnabled(true);
+    
receiverEnv.getConfig().getCommonConfig().setPipeAutoSplitFullEnabled(true);
+    
senderEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH");
+  }
+
   @Test
   public void testAutoDropInHistoricalTransfer() throws Exception {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
 
-    final String receiverIp = receiverDataNode.getIp();
-    final int receiverPort = receiverDataNode.getPort();
-
-    try (final SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
-
-      TestUtils.executeNonQuery(senderEnv, "insert into root.db.d1(time,s1) 
values (1,1)", null);
-
-      final Map<String, String> extractorAttributes = new HashMap<>();
-      final Map<String, String> processorAttributes = new HashMap<>();
-      final Map<String, String> connectorAttributes = new HashMap<>();
-
-      extractorAttributes.put("extractor.mode", "query");
-
-      connectorAttributes.put("connector", "iotdb-thrift-connector");
-      connectorAttributes.put("connector.batch.enable", "false");
-      connectorAttributes.put("connector.ip", receiverIp);
-      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
-
-      final TSStatus status =
-          client.createPipe(
-              new TCreatePipeReq("p1", connectorAttributes)
-                  .setExtractorAttributes(extractorAttributes)
-                  .setProcessorAttributes(processorAttributes));
-
-      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
-
-      TestUtils.assertDataEventuallyOnEnv(
-          receiverEnv,
-          "select count(*) from root.**",
-          "count(root.db.d1.s1),",
-          Collections.singleton("1,"));
-
-      TestUtils.assertDataEventuallyOnEnv(
-          senderEnv,
-          "show pipes",
-          
"ID,CreationTime,State,PipeSource,PipeProcessor,PipeSink,ExceptionMessage,RemainingEventCount,EstimatedRemainingSeconds,",
-          Collections.emptySet());
-    }
+    TestUtils.executeNonQuery(
+        senderEnv,
+        String.format(
+            "create pipe a2b with sink ('node-urls'='%s')", 
receiverDataNode.getIpAndPortString()),
+        null);
+
+    TestUtils.executeNonQuery(senderEnv, "insert into root.db.d1(time,s1) 
values (1,1)", null);
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "select count(*) from root.**",
+        "count(root.db.d1.s1),",
+        Collections.singleton("1,"));
+
+    await()
+        .pollInSameThread()
+        .pollDelay(1L, TimeUnit.SECONDS)
+        .pollInterval(1L, TimeUnit.SECONDS)
+        .atMost(600, TimeUnit.SECONDS)
+        .untilAsserted(
+            () -> {
+              try (final Connection connection = senderEnv.getConnection();
+                  final Statement statement = connection.createStatement();
+                  final ResultSet result = statement.executeQuery("show 
pipes")) {
+                int pipeNum = 0;
+                while (result.next()) {
+                  final String pipeName = 
result.getString(ColumnHeaderConstant.ID);
+                  if (!pipeName.contains(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)
+                      && pipeName.endsWith("_history")) {
+                    pipeNum++;
+                  }
+                }
+                Assert.assertEquals(0, pipeNum);
+              }
+            });
   }
 
   @Test
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoSplitIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoSplitIT.java
index 5f732b6d390..ced1e9ee13c 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoSplitIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoSplitIT.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
 import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.it.utils.TestUtils;
 import org.apache.iotdb.it.env.MultiEnvFactory;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -37,6 +38,8 @@ import org.junit.runner.RunWith;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
@@ -78,13 +81,12 @@ public class IoTDBPipeAutoSplitIT extends 
AbstractPipeDualAutoIT {
   public void testSingleEnv() throws Exception {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
 
-    final String sql =
-        String.format(
-            "create pipe a2b with source ('source'='iotdb-source') with 
processor ('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
-            receiverDataNode.getIpAndPortString());
     try (final Connection connection = senderEnv.getConnection();
         final Statement statement = connection.createStatement()) {
-      statement.execute(sql);
+      statement.execute(
+          String.format(
+              "create pipe a2b with sink ('node-urls'='%s')",
+              receiverDataNode.getIpAndPortString()));
     } catch (final SQLException e) {
       fail(e.getMessage());
     }
@@ -100,5 +102,48 @@ public class IoTDBPipeAutoSplitIT extends 
AbstractPipeDualAutoIT {
               || (Objects.equals(showPipeResult.get(1).id, "a2b_history")
                   && Objects.equals(showPipeResult.get(0).id, 
"a2b_realtime")));
     }
+
+    // Do not split for pipes without insertion or non-full
+    TestUtils.executeNonQueries(
+        senderEnv,
+        Arrays.asList(
+            "drop pipe a2b_history",
+            "drop pipe a2b_realtime",
+            String.format(
+                "create pipe a2b1 with source ('inclusion'='schema') with sink 
('node-urls'='%s')",
+                receiverDataNode.getIpAndPortString()),
+            String.format(
+                "create pipe a2b2 with source ('realtime.enable'='false') with 
sink ('node-urls'='%s')",
+                receiverDataNode.getIpAndPortString()),
+            String.format(
+                "create pipe a2b3 with source ('history.enable'='false') with 
sink ('node-urls'='%s')",
+                receiverDataNode.getIpAndPortString())),
+        null);
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
+      Assert.assertEquals(3, showPipeResult.size());
+    }
+
+    TestUtils.executeNonQueries(
+        senderEnv,
+        Arrays.asList(
+            "drop pipe a2b1",
+            "drop pipe a2b2",
+            "drop pipe a2b3",
+            "insert into root.test.device(time, field) values(0,1),(1,2)",
+            "delete from root.test.device.* where time == 0",
+            String.format(
+                "create pipe a2b with source ('inclusion'='all') with sink 
('node-urls'='%s')",
+                receiverDataNode.getIpAndPortString())),
+        null);
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "select * from root.test.device",
+        "Time,root.test.device.field,",
+        Collections.singleton("1,2.0,"));
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 9038e7c3a71..67b9460c15a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -586,7 +586,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     }
   }
 
-  public boolean isFullSync(final PipeParameters parameters) {
+  public boolean isFullSync(final PipeParameters parameters) throws 
IllegalPathException {
     if (isSnapshotMode(parameters)) {
       return false;
     }
@@ -600,7 +600,10 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
             Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
             EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE);
 
-    return isHistoryEnable && isRealtimeEnable;
+    return isHistoryEnable
+        && isRealtimeEnable
+        && 
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters)
+            .getLeft();
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index d582832864a..99eee84964e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -140,6 +140,7 @@ import 
org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
@@ -1826,10 +1827,10 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
         new PipeParameters(createPipeStatement.getSourceAttributes());
     final PipeParameters sinkPipeParameters =
         new PipeParameters(createPipeStatement.getSinkAttributes());
-    if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
-        && PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) {
-      try (final ConfigNodeClient configNodeClient =
-          
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+    try (final ConfigNodeClient configNodeClient =
+        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
+          && PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) {
         // 1. Send request to create the real-time data synchronization 
pipeline
         final TCreatePipeReq realtimeReq =
             new TCreatePipeReq()
@@ -1874,11 +1875,19 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
                                     Boolean.toString(false),
                                     
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
                                     Boolean.toString(true),
-                                    // We force the historical pipe to 
transfer data only
+                                    PipeSourceConstant.EXTRACTOR_MODE_KEY,
+                                    
PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE,
+                                    // We force the historical pipe to 
transfer data (and maybe
+                                    // deletion) only
                                     // Thus we can transfer schema only once
                                     // And may drop the historical pipe on 
successfully transferred
                                     PipeSourceConstant.SOURCE_INCLUSION_KEY,
-                                    
PipeSourceConstant.EXTRACTOR_INCLUSION_DEFAULT_VALUE,
+                                    DataRegionListeningFilter
+                                            
.parseInsertionDeletionListeningOptionPair(
+                                                sourcePipeParameters)
+                                            .getRight()
+                                        ? "data"
+                                        : 
PipeSourceConstant.EXTRACTOR_INCLUSION_DEFAULT_VALUE,
                                     PipeSourceConstant.SOURCE_EXCLUSION_KEY,
                                     
PipeSourceConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE)))
                         .getAttribute())
@@ -1901,31 +1910,24 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
 
         // 3. Set success status only if both pipelines are created 
successfully
         future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
-      } catch (final Exception e) {
-        // Catch any other exceptions (e.g., network issues)
-        future.setException(e);
-      }
-      return future;
-    }
-
-    try (final ConfigNodeClient configNodeClient =
-        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
-      TCreatePipeReq req =
-          new TCreatePipeReq()
-              .setPipeName(pipeName)
-              
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
-              
.setExtractorAttributes(createPipeStatement.getSourceAttributes())
-              
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
-              .setConnectorAttributes(createPipeStatement.getSinkAttributes());
-      TSStatus tsStatus = configNodeClient.createPipe(req);
-      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
-        LOGGER.warn(
-            "Failed to create pipe {} in config node, status is {}.",
-            createPipeStatement.getPipeName(),
-            tsStatus);
-        future.setException(new IoTDBException(tsStatus));
       } else {
-        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+        final TCreatePipeReq req =
+            new TCreatePipeReq()
+                .setPipeName(pipeName)
+                
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
+                
.setExtractorAttributes(createPipeStatement.getSourceAttributes())
+                
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
+                
.setConnectorAttributes(createPipeStatement.getSinkAttributes());
+        final TSStatus tsStatus = configNodeClient.createPipe(req);
+        if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) 
{
+          LOGGER.warn(
+              "Failed to create pipe {} in config node, status is {}.",
+              createPipeStatement.getPipeName(),
+              tsStatus);
+          future.setException(new IoTDBException(tsStatus));
+        } else {
+          future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+        }
       }
     } catch (final Exception e) {
       future.setException(e);

Reply via email to