This is an automated email from the ASF dual-hosted git repository. justinchen pushed a commit to branch opc-13 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 14dd1de1c02a997552bbb7cdbd1653bbb34875df Author: Caideyipi <[email protected]> AuthorDate: Tue Mar 17 19:05:50 2026 +0800 fix --- .../task/subtask/sink/PipeSinkSubtaskManager.java | 45 +++++++++++++--------- .../pipe/config/constant/PipeSinkConstant.java | 8 ++++ 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java index 69ec2d00295..caff425f790 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java @@ -62,10 +62,10 @@ public class PipeSinkSubtaskManager { public synchronized String register( final Supplier<? extends PipeSinkSubtaskExecutor> executorSupplier, - final PipeParameters pipeConnectorParameters, + final PipeParameters pipeSinkParameters, final PipeTaskSinkRuntimeEnvironment environment) { final String connectorKey = - pipeConnectorParameters + pipeSinkParameters .getStringOrDefault( Arrays.asList(PipeSinkConstant.CONNECTOR_KEY, PipeSinkConstant.SINK_KEY), BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName()) @@ -79,23 +79,31 @@ public class PipeSinkSubtaskManager { environment.getRegionId(), connectorKey); - final boolean isDataRegionConnector = + final boolean isDataSinkConnector = StorageEngine.getInstance() .getAllDataRegionIds() .contains(new DataRegionId(environment.getRegionId())); - final int connectorNum; + final int sinkNum; boolean realTimeFirst = false; - String attributeSortedString = generateAttributeSortedString(pipeConnectorParameters); - if (isDataRegionConnector) { - connectorNum = - pipeConnectorParameters.getIntOrDefault( + String attributeSortedString = generateAttributeSortedString(pipeSinkParameters); + if (isDataSinkConnector) { + sinkNum = + pipeSinkParameters.getIntOrDefault( Arrays.asList( PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY, PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY), - PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE); + PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains( + pipeSinkParameters + .getStringOrDefault( + Arrays.asList( + PipeSinkConstant.CONNECTOR_KEY, PipeSinkConstant.SINK_KEY), + BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName()) + .toLowerCase()) + ? 1 + : PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE); realTimeFirst = - pipeConnectorParameters.getBooleanOrDefault( + pipeSinkParameters.getBooleanOrDefault( Arrays.asList( PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, PipeSinkConstant.SINK_REALTIME_FIRST_KEY), @@ -104,7 +112,7 @@ public class PipeSinkSubtaskManager { } else { // Do not allow parallel tasks for schema region connectors // to avoid the potential disorder of the schema region data transfer - connectorNum = 1; + sinkNum = 1; attributeSortedString = "schema_" + attributeSortedString; } environment.setAttributeSortedString(attributeSortedString); @@ -112,8 +120,7 @@ public class PipeSinkSubtaskManager { if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) { final PipeSinkSubtaskExecutor executor = executorSupplier.get(); - final List<PipeSinkSubtaskLifeCycle> pipeSinkSubtaskLifeCycleList = - new ArrayList<>(connectorNum); + final List<PipeSinkSubtaskLifeCycle> pipeSinkSubtaskLifeCycleList = new ArrayList<>(sinkNum); AtomicInteger counter = new AtomicInteger(0); // Shared pending queue for all subtasks @@ -126,20 +133,20 @@ public class PipeSinkSubtaskManager { ((PipeRealtimePriorityBlockingQueue) pendingQueue).setOfferTsFileCounter(counter); } - for (int connectorIndex = 0; connectorIndex < connectorNum; connectorIndex++) { + for (int connectorIndex = 0; connectorIndex < sinkNum; connectorIndex++) { final PipeConnector pipeConnector = - isDataRegionConnector - ? PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeConnectorParameters) - : PipeDataNodeAgent.plugin().schemaRegion().reflectSink(pipeConnectorParameters); + isDataSinkConnector + ? PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeSinkParameters) + : PipeDataNodeAgent.plugin().schemaRegion().reflectSink(pipeSinkParameters); // 1. Construct, validate and customize PipeConnector, and then handshake (create // connection) with the target try { if (pipeConnector instanceof IoTDBDataRegionAsyncSink) { ((IoTDBDataRegionAsyncSink) pipeConnector).setTransferTsFileCounter(counter); } - pipeConnector.validate(new PipeParameterValidator(pipeConnectorParameters)); + pipeConnector.validate(new PipeParameterValidator(pipeSinkParameters)); pipeConnector.customize( - pipeConnectorParameters, new PipeTaskRuntimeConfiguration(environment)); + pipeSinkParameters, new PipeTaskRuntimeConfiguration(environment)); pipeConnector.handshake(); } catch (final Exception e) { try { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java index 451cd965268..1cd9cfb33c7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java @@ -20,6 +20,7 @@ package org.apache.iotdb.commons.pipe.config.constant; import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.config.PipeConfig; import com.github.luben.zstd.Zstd; @@ -54,6 +55,13 @@ public class PipeSinkConstant { public static final String SINK_IOTDB_PARALLEL_TASKS_KEY = "sink.parallel.tasks"; public static final int CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE = PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum(); + public static final Set<String> SINGLE_THREAD_DEFAULT_SINK = + new HashSet<>( + Arrays.asList( + BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName(), + BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(), + BuiltinPipePlugin.OPC_DA_SINK.getPipePluginName(), + BuiltinPipePlugin.OPC_DA_CONNECTOR.getPipePluginName())); public static final String CONNECTOR_REALTIME_FIRST_KEY = "connector.realtime-first"; public static final String SINK_REALTIME_FIRST_KEY = "sink.realtime-first";
