This is an automated email from the ASF dual-hosted git repository. JackieTien97 pushed a commit to branch rc/2.0.10 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 60a0942cc33b0cf23c3da84901d732188a06cf91 Author: Caideyipi <[email protected]> AuthorDate: Tue Jun 9 11:40:17 2026 +0800 Support sink config key for pipe request slicing (#17858) * Support sink config key for pipe request slicing * Support processor output series aliases --- .../it/env/cluster/config/MppCommonConfig.java | 2 +- .../twostage/plugin/TwoStageCountProcessor.java | 9 ++- .../plugin/TwoStageCountProcessorTest.java | 44 +++++++++++++++ .../iotdb/commons/pipe/config/PipeDescriptor.java | 8 ++- .../commons/pipe/config/PipeDescriptorTest.java | 65 ++++++++++++++++++++++ 5 files changed, 123 insertions(+), 5 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java index 4852b9d116e..3bba46f798c 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java @@ -538,7 +538,7 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig { public CommonConfig setPipeConnectorRequestSliceThresholdBytes( int pipeConnectorRequestSliceThresholdBytes) { setProperty( - "pipe_connector_request_slice_threshold_bytes", + "pipe_sink_request_slice_threshold_bytes", String.valueOf(pipeConnectorRequestSliceThresholdBytes)); return this; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java index d9e2ed8bc17..e9f40b69fa6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex; +import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment; @@ -146,7 +147,7 @@ public class TwoStageCountProcessor implements PipeProcessor { isTableModel = PathUtils.isTableModelDatabase(dataBaseName); } - outputSeries = new PartialPath(parameters.getString(_PROCESSOR_OUTPUT_SERIES_KEY)); + outputSeries = parseOutputSeries(parameters); if (Objects.nonNull(pipeTaskMeta) && Objects.nonNull(pipeTaskMeta.getProgressIndex())) { if (pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex) { @@ -178,6 +179,12 @@ public class TwoStageCountProcessor implements PipeProcessor { twoStageAggregateSender = new TwoStageAggregateSender(pipeName, creationTime); } + static PartialPath parseOutputSeries(final PipeParameters parameters) + throws IllegalPathException { + return new PartialPath( + parameters.getStringByKeys(PROCESSOR_OUTPUT_SERIES_KEY, _PROCESSOR_OUTPUT_SERIES_KEY)); + } + @Override public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) throws Exception { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessorTest.java new file mode 100644 index 00000000000..66db9ccdde8 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessorTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.processor.twostage.plugin; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; + +public class TwoStageCountProcessorTest { + + @Test + public void testOutputSeriesSupportsNewAndLegacyKeys() throws Exception { + Assert.assertEquals( + "root.db.d.s1", parseOutputSeries("processor.output.series", "root.db.d.s1").getFullPath()); + Assert.assertEquals( + "root.db.d.s2", parseOutputSeries("processor.output-series", "root.db.d.s2").getFullPath()); + } + + private PartialPath parseOutputSeries(final String key, final String value) throws Exception { + return TwoStageCountProcessor.parseOutputSeries( + new PipeParameters(Collections.singletonMap(key, value))); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java index fb312fd4652..94181010520 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java @@ -437,9 +437,11 @@ public class PipeDescriptor { config.setPipeSinkRequestSliceThresholdBytes( Integer.parseInt( - properties.getProperty( - "pipe_connector_request_slice_threshold_bytes", - String.valueOf(config.getPipeSinkRequestSliceThresholdBytes())))); + Optional.ofNullable(properties.getProperty("pipe_sink_request_slice_threshold_bytes")) + .orElse( + properties.getProperty( + "pipe_connector_request_slice_threshold_bytes", + String.valueOf(config.getPipeSinkRequestSliceThresholdBytes()))))); config.setPipeReceiverLoginPeriodicVerificationIntervalMs( Long.parseLong( diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/config/PipeDescriptorTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/config/PipeDescriptorTest.java new file mode 100644 index 00000000000..00d98212271 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/config/PipeDescriptorTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.config; + +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.conf.TrimProperties; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class PipeDescriptorTest { + + private final CommonConfig config = CommonDescriptor.getInstance().getConfig(); + + private int originalRequestSliceThresholdBytes; + + @Before + public void setUp() { + originalRequestSliceThresholdBytes = config.getPipeSinkRequestSliceThresholdBytes(); + } + + @After + public void tearDown() { + config.setPipeSinkRequestSliceThresholdBytes(originalRequestSliceThresholdBytes); + } + + @Test + public void testPipeRequestSliceThresholdSupportsSinkAndConnectorKeys() { + final TrimProperties connectorProperties = new TrimProperties(); + connectorProperties.setProperty("pipe_connector_request_slice_threshold_bytes", "123"); + PipeDescriptor.loadPipeInternalConfig(config, connectorProperties); + Assert.assertEquals(123, config.getPipeSinkRequestSliceThresholdBytes()); + + final TrimProperties sinkProperties = new TrimProperties(); + sinkProperties.setProperty("pipe_sink_request_slice_threshold_bytes", "456"); + PipeDescriptor.loadPipeInternalConfig(config, sinkProperties); + Assert.assertEquals(456, config.getPipeSinkRequestSliceThresholdBytes()); + + final TrimProperties bothProperties = new TrimProperties(); + bothProperties.setProperty("pipe_connector_request_slice_threshold_bytes", "123"); + bothProperties.setProperty("pipe_sink_request_slice_threshold_bytes", "456"); + PipeDescriptor.loadPipeInternalConfig(config, bothProperties); + Assert.assertEquals(456, config.getPipeSinkRequestSliceThresholdBytes()); + } +}
