This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 355a872cfae Pipe: Fixed the bugs that the auto-split historical pipe may not include the mods for deletions & that a pipe without data.insert may be wrongly split and transfer data (#17346)
355a872cfae is described below

commit 355a872cfaeb77e1e7fe68d78a90544b3054d3a5
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 25 17:44:39 2026 +0800

    Pipe: Fixed the bugs that the auto-split historical pipe may not include the mods for deletions & that a pipe without data.insert may be wrongly split and transfer data (#17346)
    
    * fix
    
    * fix
    
    * f
---
 .../treemodel/auto/basic/IoTDBPipeAutoSplitIT.java | 54 ++++++++++++++++++++--
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  7 ++-
 .../config/executor/ClusterConfigTaskExecutor.java | 52 ++++++++++-----------
 3 files changed, 80 insertions(+), 33 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
index 31308f1c0e1..2ebad93348c 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
 import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.it.utils.TestUtils;
 import org.apache.iotdb.isession.SessionConfig;
 import org.apache.iotdb.it.env.MultiEnvFactory;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
@@ -39,6 +40,8 @@ import org.junit.runner.RunWith;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
@@ -81,13 +84,12 @@ public class IoTDBPipeAutoSplitIT extends 
AbstractPipeDualTreeModelAutoIT {
   public void testSingleEnv() throws Exception {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
 
-    final String sql =
-        String.format(
-            "create pipe a2b with source ('source'='iotdb-source') with 
processor ('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
-            receiverDataNode.getIpAndPortString());
     try (final Connection connection = senderEnv.getConnection();
         final Statement statement = connection.createStatement()) {
-      statement.execute(sql);
+      statement.execute(
+          String.format(
+              "create pipe a2b with sink ('node-urls'='%s')",
+              receiverDataNode.getIpAndPortString()));
     } catch (final SQLException e) {
       fail(e.getMessage());
     }
@@ -104,5 +106,47 @@ public class IoTDBPipeAutoSplitIT extends 
AbstractPipeDualTreeModelAutoIT {
               || (Objects.equals(showPipeResult.get(1).id, "a2b_history")
                   && Objects.equals(showPipeResult.get(0).id, 
"a2b_realtime")));
     }
+
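+    // Pipes that do not capture data insertion, or that are not full-sync (history or
+    // realtime disabled), must not be auto-split into history/realtime pipes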
+    TestUtils.executeNonQueries(
+        senderEnv,
+        Arrays.asList(
+            "drop pipe a2b_history",
+            "drop pipe a2b_realtime",
+            String.format(
+                "create pipe a2b1 with source ('inclusion'='schema') with sink 
('node-urls'='%s')",
+                receiverDataNode.getIpAndPortString()),
+            String.format(
+                "create pipe a2b2 with source ('realtime.enable'='false') with 
sink ('node-urls'='%s')",
+                receiverDataNode.getIpAndPortString()),
+            String.format(
+                "create pipe a2b3 with source ('history.enable'='false') with 
sink ('node-urls'='%s')",
+                receiverDataNode.getIpAndPortString())));
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      final List<TShowPipeInfo> showPipeResult =
+          client.showPipe(new 
TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
+      showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
+      Assert.assertEquals(3, showPipeResult.size());
+    }
+
+    TestUtils.executeNonQueries(
+        senderEnv,
+        Arrays.asList(
+            "drop pipe a2b1",
+            "drop pipe a2b2",
+            "drop pipe a2b3",
+            "insert into root.test.device(time, field) values(0,1),(1,2)",
+            "delete from root.test.device.* where time == 0",
+            String.format(
+                "create pipe a2b with source ('inclusion'='all') with sink 
('node-urls'='%s')",
+                receiverDataNode.getIpAndPortString())));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "select * from root.test.device",
+        "Time,root.test.device.field,",
+        Collections.singleton("1,2.0,"));
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 0f8b9446d60..ea12513d647 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -584,7 +584,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     }
   }
 
-  public boolean isFullSync(final PipeParameters parameters) {
+  public boolean isFullSync(final PipeParameters parameters) throws 
IllegalPathException {
     if (isSnapshotMode(parameters)) {
       return false;
     }
@@ -598,7 +598,10 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
             Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
             EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE);
 
-    return isHistoryEnable && isRealtimeEnable;
+    return isHistoryEnable
+        && isRealtimeEnable
+        && 
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters)
+            .getLeft();
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index d28edddb2f9..d0627351b27 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -181,6 +181,7 @@ import 
org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
@@ -2205,10 +2206,10 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
         new PipeParameters(createPipeStatement.getSourceAttributes());
     final PipeParameters sinkPipeParameters =
         new PipeParameters(createPipeStatement.getSinkAttributes());
-    if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
-        && PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) {
-      try (final ConfigNodeClient configNodeClient =
-          
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+    try (final ConfigNodeClient configNodeClient =
+        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
+          && PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) {
         // 1. Send request to create the real-time data synchronization 
pipeline
         final TCreatePipeReq realtimeReq =
             new TCreatePipeReq()
@@ -2253,11 +2254,17 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
                                     Boolean.toString(false),
                                     
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
                                     Boolean.toString(true),
-                                    // We force the historical pipe to 
transfer data only
+                                    // We force the historical pipe to transfer data only (plus
+                                    // deletions, when the source parameters listen to them)
                                     // Thus we can transfer schema only once
                                     // And may drop the historical pipe on 
successfully transferred
                                     PipeSourceConstant.SOURCE_INCLUSION_KEY,
-                                    
PipeSourceConstant.EXTRACTOR_INCLUSION_DEFAULT_VALUE,
+                                    DataRegionListeningFilter
+                                            
.parseInsertionDeletionListeningOptionPair(
+                                                sourcePipeParameters)
+                                            .getRight()
+                                        ? "data"
+                                        : 
PipeSourceConstant.EXTRACTOR_INCLUSION_DEFAULT_VALUE,
                                     PipeSourceConstant.SOURCE_EXCLUSION_KEY,
                                     
PipeSourceConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE)))
                         .getAttribute())
@@ -2280,27 +2287,20 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
 
         // 3. Set success status only if both pipelines are created 
successfully
         future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
-      } catch (final Exception e) {
-        // Catch any other exceptions (e.g., network issues)
-        future.setException(e);
-      }
-      return future;
-    }
-
-    try (final ConfigNodeClient configNodeClient =
-        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
-      final TCreatePipeReq req =
-          new TCreatePipeReq()
-              .setPipeName(pipeName)
-              
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
-              
.setExtractorAttributes(createPipeStatement.getSourceAttributes())
-              
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
-              .setConnectorAttributes(createPipeStatement.getSinkAttributes());
-      final TSStatus tsStatus = configNodeClient.createPipe(req);
-      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
-        future.setException(new IoTDBException(tsStatus));
       } else {
-        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+        final TCreatePipeReq req =
+            new TCreatePipeReq()
+                .setPipeName(pipeName)
+                
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
+                
.setExtractorAttributes(createPipeStatement.getSourceAttributes())
+                
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
+                
.setConnectorAttributes(createPipeStatement.getSinkAttributes());
+        final TSStatus tsStatus = configNodeClient.createPipe(req);
+        if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) 
{
+          future.setException(new IoTDBException(tsStatus));
+        } else {
+          future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+        }
       }
     } catch (final Exception e) {
       future.setException(e);

Reply via email to