This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch fix-historical-default
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/fix-historical-default by this 
push:
     new 5fdf5e693dc fix
5fdf5e693dc is described below

commit 5fdf5e693dc15a2b6426cd7e97ed88b647d03d91
Author: Caideyipi <[email protected]>
AuthorDate: Tue Mar 24 14:32:20 2026 +0800

    fix
---
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  7 ++-
 .../config/executor/ClusterConfigTaskExecutor.java | 52 +++++++++++-----------
 2 files changed, 31 insertions(+), 28 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 0f8b9446d60..c2c1bbd6f96 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -584,7 +584,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     }
   }
 
-  public boolean isFullSync(final PipeParameters parameters) {
+  public boolean isFullSync(final PipeParameters parameters) throws 
IllegalPathException {
     if (isSnapshotMode(parameters)) {
       return false;
     }
@@ -598,7 +598,10 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
             Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, 
SOURCE_REALTIME_ENABLE_KEY),
             EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE);
 
-    return isHistoryEnable && isRealtimeEnable;
+    return isHistoryEnable
+        && isRealtimeEnable
+        && 
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters)
+            .getRight();
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index d28edddb2f9..d0627351b27 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -181,6 +181,7 @@ import 
org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
@@ -2205,10 +2206,10 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
         new PipeParameters(createPipeStatement.getSourceAttributes());
     final PipeParameters sinkPipeParameters =
         new PipeParameters(createPipeStatement.getSinkAttributes());
-    if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
-        && PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) {
-      try (final ConfigNodeClient configNodeClient =
-          
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+    try (final ConfigNodeClient configNodeClient =
+        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
+          && PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) {
         // 1. Send request to create the real-time data synchronization 
pipeline
         final TCreatePipeReq realtimeReq =
             new TCreatePipeReq()
@@ -2253,11 +2254,17 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
                                     Boolean.toString(false),
                                     
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
                                     Boolean.toString(true),
-                                    // We force the historical pipe to 
transfer data only
+                                    // We force the historical pipe to 
transfer data (and maybe
+                                    // deletion) only
                                     // Thus we can transfer schema only once
                                     // And may drop the historical pipe on 
successfully transferred
                                     PipeSourceConstant.SOURCE_INCLUSION_KEY,
-                                    
PipeSourceConstant.EXTRACTOR_INCLUSION_DEFAULT_VALUE,
+                                    DataRegionListeningFilter
+                                            
.parseInsertionDeletionListeningOptionPair(
+                                                sourcePipeParameters)
+                                            .getRight()
+                                        ? "data"
+                                        : 
PipeSourceConstant.EXTRACTOR_INCLUSION_DEFAULT_VALUE,
                                     PipeSourceConstant.SOURCE_EXCLUSION_KEY,
                                     
PipeSourceConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE)))
                         .getAttribute())
@@ -2280,27 +2287,20 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
 
         // 3. Set success status only if both pipelines are created 
successfully
         future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
-      } catch (final Exception e) {
-        // Catch any other exceptions (e.g., network issues)
-        future.setException(e);
-      }
-      return future;
-    }
-
-    try (final ConfigNodeClient configNodeClient =
-        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
-      final TCreatePipeReq req =
-          new TCreatePipeReq()
-              .setPipeName(pipeName)
-              
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
-              
.setExtractorAttributes(createPipeStatement.getSourceAttributes())
-              
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
-              .setConnectorAttributes(createPipeStatement.getSinkAttributes());
-      final TSStatus tsStatus = configNodeClient.createPipe(req);
-      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
-        future.setException(new IoTDBException(tsStatus));
       } else {
-        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+        final TCreatePipeReq req =
+            new TCreatePipeReq()
+                .setPipeName(pipeName)
+                
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
+                
.setExtractorAttributes(createPipeStatement.getSourceAttributes())
+                
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
+                
.setConnectorAttributes(createPipeStatement.getSinkAttributes());
+        final TSStatus tsStatus = configNodeClient.createPipe(req);
+        if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) 
{
+          future.setException(new IoTDBException(tsStatus));
+        } else {
+          future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+        }
       }
     } catch (final Exception e) {
       future.setException(e);

Reply via email to