This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new f5142feff77 [To dev/1.3] Pipe: Fixed the bug that the default 
"enable-send-tsfile-limit" is set to false for historical pipes split by full 
sync (#17264) (#17265)
f5142feff77 is described below

commit f5142feff77805607ef6711faaa9c5356b17c434
Author: Caideyipi <[email protected]>
AuthorDate: Fri Mar 6 11:37:02 2026 +0800

    [To dev/1.3] Pipe: Fixed the bug that the default 
"enable-send-tsfile-limit" is set to false for historical pipes split by full 
sync (#17264) (#17265)
    
    * may-comp
    
    * Avoid someone merge with name 'may-comp'
---
 .../task/builder/PipeDataNodeTaskBuilder.java      |  3 ++-
 .../config/executor/ClusterConfigTaskExecutor.java | 24 +++++++++++++++-------
 .../execution/config/sys/pipe/CreatePipeTask.java  |  2 +-
 .../db/queryengine/plan/parser/ASTVisitor.java     |  8 ++++----
 .../metadata/pipe/CreatePipeStatement.java         | 20 +++++++++---------
 .../statement/sys/pipe/PipeStatementTest.java      |  8 ++++----
 6 files changed, 38 insertions(+), 27 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
index e767731c1ac..8d227d7c272 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
@@ -210,7 +210,8 @@ public class PipeDataNodeTaskBuilder {
               PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT);
 
       if (enableSendTsFileLimit == null) {
-        
sinkParameters.addAttribute(PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, 
"true");
+        sinkParameters.addAttribute(
+            PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, 
Boolean.TRUE.toString());
         LOGGER.info(
             "PipeDataNodeTaskBuilder: When the realtime sync is enabled, we 
enable rate limiter in sending tsfile by default to reserve disk and network IO 
for realtime sending.");
       } else if (!enableSendTsFileLimit) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index f282a3d004d..24c64c24030 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -51,6 +51,7 @@ import 
org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginExecutableMa
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
 import 
org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest;
 import org.apache.iotdb.commons.schema.cache.CacheClearOptions;
@@ -1817,9 +1818,9 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
       PipeDataNodeAgent.plugin()
           .validate(
               pipeName,
-              createPipeStatement.getExtractorAttributes(),
+              createPipeStatement.getSourceAttributes(),
               createPipeStatement.getProcessorAttributes(),
-              createPipeStatement.getConnectorAttributes());
+              createPipeStatement.getSinkAttributes());
     } catch (final Exception e) {
       LOGGER.info("Failed to validate create pipe statement, because {}", 
e.getMessage(), e);
       future.setException(
@@ -1830,7 +1831,9 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
     // Syntactic sugar: if full-sync mode is detected (i.e. not snapshot mode, 
or both realtime
     // and history are true), the pipe is split into history-only and 
realtime–only modes.
     final PipeParameters sourcePipeParameters =
-        new PipeParameters(createPipeStatement.getExtractorAttributes());
+        new PipeParameters(createPipeStatement.getSourceAttributes());
+    final PipeParameters sinkPipeParameters =
+        new PipeParameters(createPipeStatement.getSinkAttributes());
     if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
         && PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) {
       try (final ConfigNodeClient configNodeClient =
@@ -1854,7 +1857,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
                                     Boolean.toString(false))))
                         .getAttribute())
                 
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
-                
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
+                
.setConnectorAttributes(createPipeStatement.getSinkAttributes());
 
         final TSStatus realtimeTsStatus = 
configNodeClient.createPipe(realtimeReq);
         // If creation fails, immediately return with exception
@@ -1888,7 +1891,14 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
                                     
PipeSourceConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE)))
                         .getAttribute())
                 
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
-                
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
+                .setConnectorAttributes(
+                    sinkPipeParameters
+                        .addOrReplaceEquivalentAttributesWithClone(
+                            new PipeParameters(
+                                Collections.singletonMap(
+                                    
PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
+                                    Boolean.TRUE.toString())))
+                        .getAttribute());
 
         final TSStatus historyTsStatus = 
configNodeClient.createPipe(historyReq);
         // If creation fails, immediately return with exception
@@ -1912,9 +1922,9 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
           new TCreatePipeReq()
               .setPipeName(pipeName)
               
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
-              
.setExtractorAttributes(createPipeStatement.getExtractorAttributes())
+              
.setExtractorAttributes(createPipeStatement.getSourceAttributes())
               
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
-              
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
+              .setConnectorAttributes(createPipeStatement.getSinkAttributes());
       TSStatus tsStatus = configNodeClient.createPipe(req);
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
         LOGGER.warn(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
index 56a1c69fcd1..229e950356b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
@@ -37,7 +37,7 @@ public class CreatePipeTask implements IConfigTask {
 
   public CreatePipeTask(CreatePipeStatement createPipeStatement) {
     // support now() function
-    
applyNowFunctionToExtractorAttributes(createPipeStatement.getExtractorAttributes());
+    
applyNowFunctionToExtractorAttributes(createPipeStatement.getSourceAttributes());
     this.createPipeStatement = createPipeStatement;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index 0f60d2f9ba6..77af818e124 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -3822,11 +3822,11 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
         ctx.IF() != null && ctx.NOT() != null && ctx.EXISTS() != null);
 
     if (ctx.extractorAttributesClause() != null) {
-      createPipeStatement.setExtractorAttributes(
+      createPipeStatement.setSourceAttributes(
           parseExtractorAttributesClause(
               ctx.extractorAttributesClause().extractorAttributeClause()));
     } else {
-      createPipeStatement.setExtractorAttributes(new HashMap<>());
+      createPipeStatement.setSourceAttributes(new HashMap<>());
     }
     if (ctx.processorAttributesClause() != null) {
       createPipeStatement.setProcessorAttributes(
@@ -3836,11 +3836,11 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
       createPipeStatement.setProcessorAttributes(new HashMap<>());
     }
     if (ctx.connectorAttributesClause() != null) {
-      createPipeStatement.setConnectorAttributes(
+      createPipeStatement.setSinkAttributes(
           parseConnectorAttributesClause(
               ctx.connectorAttributesClause().connectorAttributeClause()));
     } else {
-      createPipeStatement.setConnectorAttributes(
+      createPipeStatement.setSinkAttributes(
           parseConnectorAttributesClause(
               
ctx.connectorAttributesWithoutWithSinkClause().connectorAttributeClause()));
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java
index a7b7471ffd0..634277093c5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java
@@ -38,9 +38,9 @@ public class CreatePipeStatement extends Statement implements 
IConfigStatement {
 
   private String pipeName;
   private boolean ifNotExistsCondition;
-  private Map<String, String> extractorAttributes;
+  private Map<String, String> sourceAttributes;
   private Map<String, String> processorAttributes;
-  private Map<String, String> connectorAttributes;
+  private Map<String, String> sinkAttributes;
 
   public CreatePipeStatement(StatementType createPipeStatement) {
     this.statementType = createPipeStatement;
@@ -54,16 +54,16 @@ public class CreatePipeStatement extends Statement 
implements IConfigStatement {
     return ifNotExistsCondition;
   }
 
-  public Map<String, String> getExtractorAttributes() {
-    return extractorAttributes;
+  public Map<String, String> getSourceAttributes() {
+    return sourceAttributes;
   }
 
   public Map<String, String> getProcessorAttributes() {
     return processorAttributes;
   }
 
-  public Map<String, String> getConnectorAttributes() {
-    return connectorAttributes;
+  public Map<String, String> getSinkAttributes() {
+    return sinkAttributes;
   }
 
   public void setPipeName(String pipeName) {
@@ -74,16 +74,16 @@ public class CreatePipeStatement extends Statement 
implements IConfigStatement {
     this.ifNotExistsCondition = ifNotExistsCondition;
   }
 
-  public void setExtractorAttributes(Map<String, String> extractorAttributes) {
-    this.extractorAttributes = extractorAttributes;
+  public void setSourceAttributes(Map<String, String> sourceAttributes) {
+    this.sourceAttributes = sourceAttributes;
   }
 
   public void setProcessorAttributes(Map<String, String> processorAttributes) {
     this.processorAttributes = processorAttributes;
   }
 
-  public void setConnectorAttributes(Map<String, String> connectorAttributes) {
-    this.connectorAttributes = connectorAttributes;
+  public void setSinkAttributes(Map<String, String> sinkAttributes) {
+    this.sinkAttributes = sinkAttributes;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/statement/sys/pipe/PipeStatementTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/statement/sys/pipe/PipeStatementTest.java
index ab885ddb557..04fccc19560 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/statement/sys/pipe/PipeStatementTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/statement/sys/pipe/PipeStatementTest.java
@@ -42,14 +42,14 @@ public class PipeStatementTest {
     CreatePipeStatement statement = new 
CreatePipeStatement(StatementType.CREATE_PIPE);
 
     statement.setPipeName("test");
-    statement.setExtractorAttributes(extractorAttributes);
+    statement.setSourceAttributes(extractorAttributes);
     statement.setProcessorAttributes(processorAttributes);
-    statement.setConnectorAttributes(connectorAttributes);
+    statement.setSinkAttributes(connectorAttributes);
 
     Assert.assertEquals("test", statement.getPipeName());
-    Assert.assertEquals(extractorAttributes, 
statement.getExtractorAttributes());
+    Assert.assertEquals(extractorAttributes, statement.getSourceAttributes());
     Assert.assertEquals(processorAttributes, 
statement.getProcessorAttributes());
-    Assert.assertEquals(connectorAttributes, 
statement.getConnectorAttributes());
+    Assert.assertEquals(connectorAttributes, statement.getSinkAttributes());
 
     Assert.assertEquals(QueryType.WRITE, statement.getQueryType());
   }

Reply via email to