This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new f5142feff77 [To dev/1.3] Pipe: Fixed the bug that the default
"enable-send-tsfile-limit" is set to false for historical pipes split by full
sync (#17264) (#17265)
f5142feff77 is described below
commit f5142feff77805607ef6711faaa9c5356b17c434
Author: Caideyipi <[email protected]>
AuthorDate: Fri Mar 6 11:37:02 2026 +0800
[To dev/1.3] Pipe: Fixed the bug that the default
"enable-send-tsfile-limit" is set to false for historical pipes split by full
sync (#17264) (#17265)
* may-comp
* Avoid someone merge with name 'may-comp'
---
.../task/builder/PipeDataNodeTaskBuilder.java | 3 ++-
.../config/executor/ClusterConfigTaskExecutor.java | 24 +++++++++++++++-------
.../execution/config/sys/pipe/CreatePipeTask.java | 2 +-
.../db/queryengine/plan/parser/ASTVisitor.java | 8 ++++----
.../metadata/pipe/CreatePipeStatement.java | 20 +++++++++---------
.../statement/sys/pipe/PipeStatementTest.java | 8 ++++----
6 files changed, 38 insertions(+), 27 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
index e767731c1ac..8d227d7c272 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
@@ -210,7 +210,8 @@ public class PipeDataNodeTaskBuilder {
PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT);
if (enableSendTsFileLimit == null) {
-
sinkParameters.addAttribute(PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
"true");
+ sinkParameters.addAttribute(
+ PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
Boolean.TRUE.toString());
LOGGER.info(
"PipeDataNodeTaskBuilder: When the realtime sync is enabled, we
enable rate limiter in sending tsfile by default to reserve disk and network IO
for realtime sending.");
} else if (!enableSendTsFileLimit) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index f282a3d004d..24c64c24030 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -51,6 +51,7 @@ import
org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginExecutableMa
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import
org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest;
import org.apache.iotdb.commons.schema.cache.CacheClearOptions;
@@ -1817,9 +1818,9 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
PipeDataNodeAgent.plugin()
.validate(
pipeName,
- createPipeStatement.getExtractorAttributes(),
+ createPipeStatement.getSourceAttributes(),
createPipeStatement.getProcessorAttributes(),
- createPipeStatement.getConnectorAttributes());
+ createPipeStatement.getSinkAttributes());
} catch (final Exception e) {
LOGGER.info("Failed to validate create pipe statement, because {}",
e.getMessage(), e);
future.setException(
@@ -1830,7 +1831,9 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
// Syntactic sugar: if full-sync mode is detected (i.e. not snapshot mode,
or both realtime
// and history are true), the pipe is split into history-only and
realtime–only modes.
final PipeParameters sourcePipeParameters =
- new PipeParameters(createPipeStatement.getExtractorAttributes());
+ new PipeParameters(createPipeStatement.getSourceAttributes());
+ final PipeParameters sinkPipeParameters =
+ new PipeParameters(createPipeStatement.getSinkAttributes());
if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
&& PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) {
try (final ConfigNodeClient configNodeClient =
@@ -1854,7 +1857,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
Boolean.toString(false))))
.getAttribute())
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
-
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
+
.setConnectorAttributes(createPipeStatement.getSinkAttributes());
final TSStatus realtimeTsStatus =
configNodeClient.createPipe(realtimeReq);
// If creation fails, immediately return with exception
@@ -1888,7 +1891,14 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
PipeSourceConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE)))
.getAttribute())
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
-
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
+ .setConnectorAttributes(
+ sinkPipeParameters
+ .addOrReplaceEquivalentAttributesWithClone(
+ new PipeParameters(
+ Collections.singletonMap(
+
PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
+ Boolean.TRUE.toString())))
+ .getAttribute());
final TSStatus historyTsStatus =
configNodeClient.createPipe(historyReq);
// If creation fails, immediately return with exception
@@ -1912,9 +1922,9 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
new TCreatePipeReq()
.setPipeName(pipeName)
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
-
.setExtractorAttributes(createPipeStatement.getExtractorAttributes())
+
.setExtractorAttributes(createPipeStatement.getSourceAttributes())
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
-
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
+ .setConnectorAttributes(createPipeStatement.getSinkAttributes());
TSStatus tsStatus = configNodeClient.createPipe(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
index 56a1c69fcd1..229e950356b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java
@@ -37,7 +37,7 @@ public class CreatePipeTask implements IConfigTask {
public CreatePipeTask(CreatePipeStatement createPipeStatement) {
// support now() function
-
applyNowFunctionToExtractorAttributes(createPipeStatement.getExtractorAttributes());
+
applyNowFunctionToExtractorAttributes(createPipeStatement.getSourceAttributes());
this.createPipeStatement = createPipeStatement;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index 0f60d2f9ba6..77af818e124 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -3822,11 +3822,11 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
ctx.IF() != null && ctx.NOT() != null && ctx.EXISTS() != null);
if (ctx.extractorAttributesClause() != null) {
- createPipeStatement.setExtractorAttributes(
+ createPipeStatement.setSourceAttributes(
parseExtractorAttributesClause(
ctx.extractorAttributesClause().extractorAttributeClause()));
} else {
- createPipeStatement.setExtractorAttributes(new HashMap<>());
+ createPipeStatement.setSourceAttributes(new HashMap<>());
}
if (ctx.processorAttributesClause() != null) {
createPipeStatement.setProcessorAttributes(
@@ -3836,11 +3836,11 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
createPipeStatement.setProcessorAttributes(new HashMap<>());
}
if (ctx.connectorAttributesClause() != null) {
- createPipeStatement.setConnectorAttributes(
+ createPipeStatement.setSinkAttributes(
parseConnectorAttributesClause(
ctx.connectorAttributesClause().connectorAttributeClause()));
} else {
- createPipeStatement.setConnectorAttributes(
+ createPipeStatement.setSinkAttributes(
parseConnectorAttributesClause(
ctx.connectorAttributesWithoutWithSinkClause().connectorAttributeClause()));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java
index a7b7471ffd0..634277093c5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java
@@ -38,9 +38,9 @@ public class CreatePipeStatement extends Statement implements
IConfigStatement {
private String pipeName;
private boolean ifNotExistsCondition;
- private Map<String, String> extractorAttributes;
+ private Map<String, String> sourceAttributes;
private Map<String, String> processorAttributes;
- private Map<String, String> connectorAttributes;
+ private Map<String, String> sinkAttributes;
public CreatePipeStatement(StatementType createPipeStatement) {
this.statementType = createPipeStatement;
@@ -54,16 +54,16 @@ public class CreatePipeStatement extends Statement
implements IConfigStatement {
return ifNotExistsCondition;
}
- public Map<String, String> getExtractorAttributes() {
- return extractorAttributes;
+ public Map<String, String> getSourceAttributes() {
+ return sourceAttributes;
}
public Map<String, String> getProcessorAttributes() {
return processorAttributes;
}
- public Map<String, String> getConnectorAttributes() {
- return connectorAttributes;
+ public Map<String, String> getSinkAttributes() {
+ return sinkAttributes;
}
public void setPipeName(String pipeName) {
@@ -74,16 +74,16 @@ public class CreatePipeStatement extends Statement
implements IConfigStatement {
this.ifNotExistsCondition = ifNotExistsCondition;
}
- public void setExtractorAttributes(Map<String, String> extractorAttributes) {
- this.extractorAttributes = extractorAttributes;
+ public void setSourceAttributes(Map<String, String> sourceAttributes) {
+ this.sourceAttributes = sourceAttributes;
}
public void setProcessorAttributes(Map<String, String> processorAttributes) {
this.processorAttributes = processorAttributes;
}
- public void setConnectorAttributes(Map<String, String> connectorAttributes) {
- this.connectorAttributes = connectorAttributes;
+ public void setSinkAttributes(Map<String, String> sinkAttributes) {
+ this.sinkAttributes = sinkAttributes;
}
@Override
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/statement/sys/pipe/PipeStatementTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/statement/sys/pipe/PipeStatementTest.java
index ab885ddb557..04fccc19560 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/statement/sys/pipe/PipeStatementTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/statement/sys/pipe/PipeStatementTest.java
@@ -42,14 +42,14 @@ public class PipeStatementTest {
CreatePipeStatement statement = new
CreatePipeStatement(StatementType.CREATE_PIPE);
statement.setPipeName("test");
- statement.setExtractorAttributes(extractorAttributes);
+ statement.setSourceAttributes(extractorAttributes);
statement.setProcessorAttributes(processorAttributes);
- statement.setConnectorAttributes(connectorAttributes);
+ statement.setSinkAttributes(connectorAttributes);
Assert.assertEquals("test", statement.getPipeName());
- Assert.assertEquals(extractorAttributes,
statement.getExtractorAttributes());
+ Assert.assertEquals(extractorAttributes, statement.getSourceAttributes());
Assert.assertEquals(processorAttributes,
statement.getProcessorAttributes());
- Assert.assertEquals(connectorAttributes,
statement.getConnectorAttributes());
+ Assert.assertEquals(connectorAttributes, statement.getSinkAttributes());
Assert.assertEquals(QueryType.WRITE, statement.getQueryType());
}