This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new fe0b45a213b fixup! Pipe: split full-sync pipe into history and realtime pipes (#16250) (#16316) (#16317)
fe0b45a213b is described below

commit fe0b45a213b2cc92dd58b7be59b6eea57e9305da
Author: VGalaxies <[email protected]>
AuthorDate: Tue Sep 2 15:29:17 2025 +0800

    fixup! Pipe: split full-sync pipe into history and realtime pipes (#16250) (#16316) (#16317)
---
 .../config/executor/ClusterConfigTaskExecutor.java | 40 +++++++++++-----------
 1 file changed, 20 insertions(+), 20 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index b1af7cee4d3..9dea7d92193 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1808,58 +1808,58 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
         && PipeDataNodeAgent.task().isFullSync(extractorPipeParameters)) {
       try (final ConfigNodeClient configNodeClient =
           
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
-        // 1. Send request to create the historical data synchronization 
pipeline
-        final TCreatePipeReq historyReq =
+        // 1. Send request to create the real-time data synchronization 
pipeline
+        final TCreatePipeReq realtimeReq =
             new TCreatePipeReq()
-                // Append suffix to the pipeline name for historical data
-                .setPipeName(createPipeStatement.getPipeName() + "_history")
+                // Append suffix to the pipeline name for real-time data
+                .setPipeName(createPipeStatement.getPipeName() + "_realtime")
                 // NOTE: set if not exists always to true to handle partial 
failure
                 .setIfNotExistsCondition(true)
-                // Use extractor parameters for historical data
+                // Use extractor parameters for real-time data
                 .setExtractorAttributes(
                     extractorPipeParameters
                         .addOrReplaceEquivalentAttributesWithClone(
                             new PipeParameters(
                                 ImmutableMap.of(
-                                    
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
-                                    Boolean.toString(true),
                                     
PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY,
+                                    Boolean.toString(true),
+                                    
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
                                     Boolean.toString(false))))
                         .getAttribute())
                 
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
                 
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
 
-        final TSStatus historyTsStatus = 
configNodeClient.createPipe(historyReq);
+        final TSStatus realtimeTsStatus = 
configNodeClient.createPipe(realtimeReq);
         // If creation fails, immediately return with exception
-        if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != 
historyTsStatus.getCode()) {
-          future.setException(new IoTDBException(historyTsStatus));
+        if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != 
realtimeTsStatus.getCode()) {
+          future.setException(new IoTDBException(realtimeTsStatus));
           return future;
         }
 
-        // 2. Send request to create the real-time data synchronization 
pipeline
-        final TCreatePipeReq realtimeReq =
+        // 2. Send request to create the historical data synchronization 
pipeline
+        final TCreatePipeReq historyReq =
             new TCreatePipeReq()
-                // Append suffix to the pipeline name for real-time data
-                .setPipeName(createPipeStatement.getPipeName() + "_realtime")
+                // Append suffix to the pipeline name for historical data
+                .setPipeName(createPipeStatement.getPipeName() + "_history")
                 
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
-                // Use extractor parameters for real-time data
+                // Use extractor parameters for historical data
                 .setExtractorAttributes(
                     extractorPipeParameters
                         .addOrReplaceEquivalentAttributesWithClone(
                             new PipeParameters(
                                 ImmutableMap.of(
-                                    
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
-                                    Boolean.toString(false),
                                     
PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY,
+                                    Boolean.toString(false),
+                                    
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
                                     Boolean.toString(true))))
                         .getAttribute())
                 
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
                 
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
 
-        final TSStatus realtimeTsStatus = 
configNodeClient.createPipe(realtimeReq);
+        final TSStatus historyTsStatus = 
configNodeClient.createPipe(historyReq);
         // If creation fails, immediately return with exception
-        if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != 
realtimeTsStatus.getCode()) {
-          future.setException(new IoTDBException(realtimeTsStatus));
+        if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != 
historyTsStatus.getCode()) {
+          future.setException(new IoTDBException(historyTsStatus));
           return future;
         }
 

Reply via email to