This is an automated email from the ASF dual-hosted git repository. justinchen pushed a commit to branch chainge-13 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6f4dde83030298e7f0d7732e3bce62445ee3763a Author: Caideyipi <[email protected]> AuthorDate: Thu Mar 5 17:33:01 2026 +0800 may-comp --- .../config/executor/ClusterConfigTaskExecutor.java | 23 +++++++++++++++------- .../execution/config/sys/pipe/CreatePipeTask.java | 2 +- .../db/queryengine/plan/parser/ASTVisitor.java | 8 ++++---- .../metadata/pipe/CreatePipeStatement.java | 20 +++++++++---------- .../statement/sys/pipe/PipeStatementTest.java | 8 ++++---- 5 files changed, 35 insertions(+), 26 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index f282a3d004d..1b287840055 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -51,6 +51,7 @@ import org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginExecutableMa import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest; import org.apache.iotdb.commons.schema.cache.CacheClearOptions; @@ -1817,9 +1818,9 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { PipeDataNodeAgent.plugin() .validate( pipeName, - createPipeStatement.getExtractorAttributes(), + createPipeStatement.getSourceAttributes(), createPipeStatement.getProcessorAttributes(), - createPipeStatement.getConnectorAttributes()); + createPipeStatement.getSinkAttributes()); } catch (final Exception e) { LOGGER.info("Failed to validate create pipe statement, because {}", e.getMessage(), e); future.setException( @@ -1830,7 +1831,9 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { // Syntactic sugar: if full-sync mode is detected (i.e. not snapshot mode, or both realtime // and history are true), the pipe is split into history-only and realtime–only modes. final PipeParameters sourcePipeParameters = - new PipeParameters(createPipeStatement.getExtractorAttributes()); + new PipeParameters(createPipeStatement.getSourceAttributes()); + final PipeParameters sinkPipeParameters = + new PipeParameters(createPipeStatement.getSinkAttributes()); if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled() && PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) { try (final ConfigNodeClient configNodeClient = @@ -1854,7 +1857,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { Boolean.toString(false)))) .getAttribute()) .setProcessorAttributes(createPipeStatement.getProcessorAttributes()) - .setConnectorAttributes(createPipeStatement.getConnectorAttributes()); + .setConnectorAttributes(createPipeStatement.getSinkAttributes()); final TSStatus realtimeTsStatus = configNodeClient.createPipe(realtimeReq); // If creation fails, immediately return with exception @@ -1888,7 +1891,13 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { PipeSourceConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE))) .getAttribute()) .setProcessorAttributes(createPipeStatement.getProcessorAttributes()) - .setConnectorAttributes(createPipeStatement.getConnectorAttributes()); + .setConnectorAttributes( + sinkPipeParameters + .addOrReplaceEquivalentAttributesWithClone( + new PipeParameters( + Collections.singletonMap( + PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, "true"))) + .getAttribute()); final TSStatus historyTsStatus = configNodeClient.createPipe(historyReq); // If creation fails, immediately return with exception @@ -1912,9 +1921,9 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { new TCreatePipeReq() .setPipeName(pipeName) .setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition()) - .setExtractorAttributes(createPipeStatement.getExtractorAttributes()) + .setExtractorAttributes(createPipeStatement.getSourceAttributes()) .setProcessorAttributes(createPipeStatement.getProcessorAttributes()) - .setConnectorAttributes(createPipeStatement.getConnectorAttributes()); + .setConnectorAttributes(createPipeStatement.getSinkAttributes()); TSStatus tsStatus = configNodeClient.createPipe(req); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { LOGGER.warn( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java index 56a1c69fcd1..229e950356b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/CreatePipeTask.java @@ -37,7 +37,7 @@ public class CreatePipeTask implements IConfigTask { public CreatePipeTask(CreatePipeStatement createPipeStatement) { // support now() function - applyNowFunctionToExtractorAttributes(createPipeStatement.getExtractorAttributes()); + applyNowFunctionToExtractorAttributes(createPipeStatement.getSourceAttributes()); this.createPipeStatement = createPipeStatement; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java index 0f60d2f9ba6..77af818e124 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java @@ -3822,11 +3822,11 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { ctx.IF() != null && ctx.NOT() != null && ctx.EXISTS() != null); if (ctx.extractorAttributesClause() != null) { - createPipeStatement.setExtractorAttributes( + createPipeStatement.setSourceAttributes( parseExtractorAttributesClause( ctx.extractorAttributesClause().extractorAttributeClause())); } else { - createPipeStatement.setExtractorAttributes(new HashMap<>()); + createPipeStatement.setSourceAttributes(new HashMap<>()); } if (ctx.processorAttributesClause() != null) { createPipeStatement.setProcessorAttributes( @@ -3836,11 +3836,11 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { createPipeStatement.setProcessorAttributes(new HashMap<>()); } if (ctx.connectorAttributesClause() != null) { - createPipeStatement.setConnectorAttributes( + createPipeStatement.setSinkAttributes( parseConnectorAttributesClause( ctx.connectorAttributesClause().connectorAttributeClause())); } else { - createPipeStatement.setConnectorAttributes( + createPipeStatement.setSinkAttributes( parseConnectorAttributesClause( ctx.connectorAttributesWithoutWithSinkClause().connectorAttributeClause())); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java index a7b7471ffd0..634277093c5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java @@ -38,9 +38,9 @@ public class CreatePipeStatement extends Statement implements IConfigStatement { private String pipeName; private boolean ifNotExistsCondition; - private Map<String, String> extractorAttributes; + private Map<String, String> sourceAttributes; private Map<String, String> processorAttributes; - private Map<String, String> connectorAttributes; + private Map<String, String> sinkAttributes; public CreatePipeStatement(StatementType createPipeStatement) { this.statementType = createPipeStatement; @@ -54,16 +54,16 @@ public class CreatePipeStatement extends Statement implements IConfigStatement { return ifNotExistsCondition; } - public Map<String, String> getExtractorAttributes() { - return extractorAttributes; + public Map<String, String> getSourceAttributes() { + return sourceAttributes; } public Map<String, String> getProcessorAttributes() { return processorAttributes; } - public Map<String, String> getConnectorAttributes() { - return connectorAttributes; + public Map<String, String> getSinkAttributes() { + return sinkAttributes; } public void setPipeName(String pipeName) { @@ -74,16 +74,16 @@ public class CreatePipeStatement extends Statement implements IConfigStatement { this.ifNotExistsCondition = ifNotExistsCondition; } - public void setExtractorAttributes(Map<String, String> extractorAttributes) { - this.extractorAttributes = extractorAttributes; + public void setSourceAttributes(Map<String, String> sourceAttributes) { + this.sourceAttributes = sourceAttributes; } public void setProcessorAttributes(Map<String, String> processorAttributes) { this.processorAttributes = processorAttributes; } - public void setConnectorAttributes(Map<String, String> connectorAttributes) { - this.connectorAttributes = connectorAttributes; + public void setSinkAttributes(Map<String, String> sinkAttributes) { + this.sinkAttributes = sinkAttributes; } @Override diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/statement/sys/pipe/PipeStatementTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/statement/sys/pipe/PipeStatementTest.java index ab885ddb557..04fccc19560 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/statement/sys/pipe/PipeStatementTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/statement/sys/pipe/PipeStatementTest.java @@ -42,14 +42,14 @@ public class PipeStatementTest { CreatePipeStatement statement = new CreatePipeStatement(StatementType.CREATE_PIPE); statement.setPipeName("test"); - statement.setExtractorAttributes(extractorAttributes); + statement.setSourceAttributes(extractorAttributes); statement.setProcessorAttributes(processorAttributes); - statement.setConnectorAttributes(connectorAttributes); + statement.setSinkAttributes(connectorAttributes); Assert.assertEquals("test", statement.getPipeName()); - Assert.assertEquals(extractorAttributes, statement.getExtractorAttributes()); + Assert.assertEquals(extractorAttributes, statement.getSourceAttributes()); Assert.assertEquals(processorAttributes, statement.getProcessorAttributes()); - Assert.assertEquals(connectorAttributes, statement.getConnectorAttributes()); + Assert.assertEquals(connectorAttributes, statement.getSinkAttributes()); Assert.assertEquals(QueryType.WRITE, statement.getQueryType()); }
