This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch patch-2094 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b63ab6c64ecb82b465639b9cb8618c6875389434 Author: Caideyipi <[email protected]> AuthorDate: Wed May 6 17:31:43 2026 +0800 Pipe: Fixed the bug that the split historical pipe's enable-send-tsfile-limit cannot be configured (#17598) (cherry picked from commit c732af3d64cf4664fc422c13f838a7ec589a7b7f) --- .../treemodel/auto/basic/IoTDBPipeAutoSplitIT.java | 14 +++++++++++++- .../config/executor/ClusterConfigTaskExecutor.java | 22 ++++++++++++++-------- 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java index 5be609aba2f..53ab9181a95 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java @@ -140,9 +140,21 @@ public class IoTDBPipeAutoSplitIT extends AbstractPipeDualTreeModelAutoIT { "insert into root.test.device(time, field) values(0,1),(1,2)", "delete from root.test.device.* where time == 0", String.format( - "create pipe a2b with source ('inclusion'='all') with sink ('node-urls'='%s')", + "create pipe a2b with source ('inclusion'='all') with sink " + + "('node-urls'='%s', 'enable-send-tsfile-limit'='false')", receiverDataNode.getIpAndPortString()))); + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final List<TShowPipeInfo> showPipeResult = + client.showPipe(new TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList; + showPipeResult.removeIf(i -> i.getId().startsWith("__consensus")); + Assert.assertTrue( + showPipeResult.stream() + .filter(i -> Objects.equals(i.id, "a2b_history")) + .anyMatch(i -> i.pipeConnector.contains("enable-send-tsfile-limit=false"))); + } + TestUtils.assertDataEventuallyOnEnv( receiverEnv, "select * from root.test.device", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 5fb183e0c6f..c7d1b7ba459 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -2296,6 +2296,19 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } // 2. Send request to create the historical data synchronization pipeline + final Map<String, String> historySinkAttributes = + sinkPipeParameters.hasAnyAttributes( + PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, + PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT) + ? createPipeStatement.getSinkAttributes() + : sinkPipeParameters + .addOrReplaceEquivalentAttributesWithClone( + new PipeParameters( + Collections.singletonMap( + PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, + Boolean.TRUE.toString()))) + .getAttribute(); + final TCreatePipeReq historyReq = new TCreatePipeReq() // Append suffix to the pipeline name for historical data @@ -2328,14 +2341,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { PipeSourceConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE))) .getAttribute()) .setProcessorAttributes(createPipeStatement.getProcessorAttributes()) - .setConnectorAttributes( - sinkPipeParameters - .addOrReplaceEquivalentAttributesWithClone( - new PipeParameters( - Collections.singletonMap( - PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, - Boolean.TRUE.toString()))) - .getAttribute()); + .setConnectorAttributes(historySinkAttributes); final TSStatus historyTsStatus = configNodeClient.createPipe(historyReq); // If creation fails, immediately return with exception
