This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d09785eaaba Support sink config key for pipe request slicing (#17858)
d09785eaaba is described below
commit d09785eaaba9e19d9a544c38e1312068d1087a85
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 9 11:40:17 2026 +0800
Support sink config key for pipe request slicing (#17858)
* Support sink config key for pipe request slicing
* Support processor output series aliases
---
.../it/env/cluster/config/MppCommonConfig.java | 2 +-
.../twostage/plugin/TwoStageCountProcessor.java | 9 ++-
.../plugin/TwoStageCountProcessorTest.java | 44 +++++++++++++++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 8 ++-
.../commons/pipe/config/PipeDescriptorTest.java | 65 ++++++++++++++++++++++
5 files changed, 123 insertions(+), 5 deletions(-)
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 4852b9d116e..3bba46f798c 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -538,7 +538,7 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig {
public CommonConfig setPipeConnectorRequestSliceThresholdBytes(
int pipeConnectorRequestSliceThresholdBytes) {
setProperty(
- "pipe_connector_request_slice_threshold_bytes",
+ "pipe_sink_request_slice_threshold_bytes",
String.valueOf(pipeConnectorRequestSliceThresholdBytes));
return this;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
index d9e2ed8bc17..e9f40b69fa6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
@@ -146,7 +147,7 @@ public class TwoStageCountProcessor implements PipeProcessor {
isTableModel = PathUtils.isTableModelDatabase(dataBaseName);
}
- outputSeries = new PartialPath(parameters.getString(_PROCESSOR_OUTPUT_SERIES_KEY));
+ outputSeries = parseOutputSeries(parameters);
if (Objects.nonNull(pipeTaskMeta) && Objects.nonNull(pipeTaskMeta.getProgressIndex())) {
if (pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex) {
@@ -178,6 +179,12 @@ public class TwoStageCountProcessor implements PipeProcessor {
twoStageAggregateSender = new TwoStageAggregateSender(pipeName, creationTime);
}
+ static PartialPath parseOutputSeries(final PipeParameters parameters)
+ throws IllegalPathException {
+ return new PartialPath(
+ parameters.getStringByKeys(PROCESSOR_OUTPUT_SERIES_KEY,
_PROCESSOR_OUTPUT_SERIES_KEY));
+ }
+
@Override
public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
throws Exception {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessorTest.java
new file mode 100644
index 00000000000..66db9ccdde8
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.plugin;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+
+public class TwoStageCountProcessorTest {
+
+ @Test
+ public void testOutputSeriesSupportsNewAndLegacyKeys() throws Exception {
+ Assert.assertEquals(
+ "root.db.d.s1", parseOutputSeries("processor.output.series", "root.db.d.s1").getFullPath());
+ Assert.assertEquals(
+ "root.db.d.s2", parseOutputSeries("processor.output-series", "root.db.d.s2").getFullPath());
+ }
+
+ private PartialPath parseOutputSeries(final String key, final String value) throws Exception {
+ return TwoStageCountProcessor.parseOutputSeries(
+ new PipeParameters(Collections.singletonMap(key, value)));
+ }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index fb312fd4652..94181010520 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -437,9 +437,11 @@ public class PipeDescriptor {
config.setPipeSinkRequestSliceThresholdBytes(
Integer.parseInt(
- properties.getProperty(
- "pipe_connector_request_slice_threshold_bytes",
- String.valueOf(config.getPipeSinkRequestSliceThresholdBytes()))));
+ Optional.ofNullable(properties.getProperty("pipe_sink_request_slice_threshold_bytes"))
+ .orElse(
+ properties.getProperty(
+ "pipe_connector_request_slice_threshold_bytes",
+ String.valueOf(config.getPipeSinkRequestSliceThresholdBytes())))));
config.setPipeReceiverLoginPeriodicVerificationIntervalMs(
Long.parseLong(
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/config/PipeDescriptorTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/config/PipeDescriptorTest.java
new file mode 100644
index 00000000000..00d98212271
--- /dev/null
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/config/PipeDescriptorTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.config;
+
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.TrimProperties;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PipeDescriptorTest {
+
+ private final CommonConfig config = CommonDescriptor.getInstance().getConfig();
+
+ private int originalRequestSliceThresholdBytes;
+
+ @Before
+ public void setUp() {
+ originalRequestSliceThresholdBytes = config.getPipeSinkRequestSliceThresholdBytes();
+ }
+
+ @After
+ public void tearDown() {
+ config.setPipeSinkRequestSliceThresholdBytes(originalRequestSliceThresholdBytes);
+ }
+
+ @Test
+ public void testPipeRequestSliceThresholdSupportsSinkAndConnectorKeys() {
+ final TrimProperties connectorProperties = new TrimProperties();
+ connectorProperties.setProperty("pipe_connector_request_slice_threshold_bytes", "123");
+ PipeDescriptor.loadPipeInternalConfig(config, connectorProperties);
+ Assert.assertEquals(123, config.getPipeSinkRequestSliceThresholdBytes());
+
+ final TrimProperties sinkProperties = new TrimProperties();
+ sinkProperties.setProperty("pipe_sink_request_slice_threshold_bytes", "456");
+ PipeDescriptor.loadPipeInternalConfig(config, sinkProperties);
+ Assert.assertEquals(456, config.getPipeSinkRequestSliceThresholdBytes());
+
+ final TrimProperties bothProperties = new TrimProperties();
+ bothProperties.setProperty("pipe_connector_request_slice_threshold_bytes", "123");
+ bothProperties.setProperty("pipe_sink_request_slice_threshold_bytes", "456");
+ PipeDescriptor.loadPipeInternalConfig(config, bothProperties);
+ Assert.assertEquals(456, config.getPipeSinkRequestSliceThresholdBytes());
+ }
+}