This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 72d4d2b3918 [To dev/1.3] Pipe: Optimized the OPC UA logic to avoid 
potential bugs (#17310)
72d4d2b3918 is described below

commit 72d4d2b39183f9dda3eaddac022e2a2e21778858
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 18 17:30:09 2026 +0800

    [To dev/1.3] Pipe: Optimized the OPC UA logic to avoid potential bugs 
(#17310)
    
    * fix
    
    * fix
---
 .../task/subtask/sink/PipeSinkSubtaskManager.java  | 45 +++++++++++--------
 .../pipe/sink/protocol/opcua/OpcUaNameSpace.java   | 50 +++++++++++++---------
 .../pipe/config/constant/PipeSinkConstant.java     |  8 ++++
 3 files changed, 64 insertions(+), 39 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index 69ec2d00295..caff425f790 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -62,10 +62,10 @@ public class PipeSinkSubtaskManager {
 
   public synchronized String register(
       final Supplier<? extends PipeSinkSubtaskExecutor> executorSupplier,
-      final PipeParameters pipeConnectorParameters,
+      final PipeParameters pipeSinkParameters,
       final PipeTaskSinkRuntimeEnvironment environment) {
     final String connectorKey =
-        pipeConnectorParameters
+        pipeSinkParameters
             .getStringOrDefault(
                 Arrays.asList(PipeSinkConstant.CONNECTOR_KEY, 
PipeSinkConstant.SINK_KEY),
                 BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
@@ -79,23 +79,31 @@ public class PipeSinkSubtaskManager {
             environment.getRegionId(),
             connectorKey);
 
-    final boolean isDataRegionConnector =
+    final boolean isDataSinkConnector =
         StorageEngine.getInstance()
             .getAllDataRegionIds()
             .contains(new DataRegionId(environment.getRegionId()));
 
-    final int connectorNum;
+    final int sinkNum;
     boolean realTimeFirst = false;
-    String attributeSortedString = 
generateAttributeSortedString(pipeConnectorParameters);
-    if (isDataRegionConnector) {
-      connectorNum =
-          pipeConnectorParameters.getIntOrDefault(
+    String attributeSortedString = 
generateAttributeSortedString(pipeSinkParameters);
+    if (isDataSinkConnector) {
+      sinkNum =
+          pipeSinkParameters.getIntOrDefault(
               Arrays.asList(
                   PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
                   PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
-              PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
+              PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(
+                      pipeSinkParameters
+                          .getStringOrDefault(
+                              Arrays.asList(
+                                  PipeSinkConstant.CONNECTOR_KEY, 
PipeSinkConstant.SINK_KEY),
+                              
BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName())
+                          .toLowerCase())
+                  ? 1
+                  : 
PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
       realTimeFirst =
-          pipeConnectorParameters.getBooleanOrDefault(
+          pipeSinkParameters.getBooleanOrDefault(
               Arrays.asList(
                   PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
                   PipeSinkConstant.SINK_REALTIME_FIRST_KEY),
@@ -104,7 +112,7 @@ public class PipeSinkSubtaskManager {
     } else {
       // Do not allow parallel tasks for schema region connectors
       // to avoid the potential disorder of the schema region data transfer
-      connectorNum = 1;
+      sinkNum = 1;
       attributeSortedString = "schema_" + attributeSortedString;
     }
     environment.setAttributeSortedString(attributeSortedString);
@@ -112,8 +120,7 @@ public class PipeSinkSubtaskManager {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       final PipeSinkSubtaskExecutor executor = executorSupplier.get();
 
-      final List<PipeSinkSubtaskLifeCycle> pipeSinkSubtaskLifeCycleList =
-          new ArrayList<>(connectorNum);
+      final List<PipeSinkSubtaskLifeCycle> pipeSinkSubtaskLifeCycleList = new 
ArrayList<>(sinkNum);
 
       AtomicInteger counter = new AtomicInteger(0);
       // Shared pending queue for all subtasks
@@ -126,20 +133,20 @@ public class PipeSinkSubtaskManager {
         ((PipeRealtimePriorityBlockingQueue) 
pendingQueue).setOfferTsFileCounter(counter);
       }
 
-      for (int connectorIndex = 0; connectorIndex < connectorNum; 
connectorIndex++) {
+      for (int connectorIndex = 0; connectorIndex < sinkNum; connectorIndex++) 
{
         final PipeConnector pipeConnector =
-            isDataRegionConnector
-                ? 
PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeConnectorParameters)
-                : 
PipeDataNodeAgent.plugin().schemaRegion().reflectSink(pipeConnectorParameters);
+            isDataSinkConnector
+                ? 
PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeSinkParameters)
+                : 
PipeDataNodeAgent.plugin().schemaRegion().reflectSink(pipeSinkParameters);
         // 1. Construct, validate and customize PipeConnector, and then 
handshake (create
         // connection) with the target
         try {
           if (pipeConnector instanceof IoTDBDataRegionAsyncSink) {
             ((IoTDBDataRegionAsyncSink) 
pipeConnector).setTransferTsFileCounter(counter);
           }
-          pipeConnector.validate(new 
PipeParameterValidator(pipeConnectorParameters));
+          pipeConnector.validate(new 
PipeParameterValidator(pipeSinkParameters));
           pipeConnector.customize(
-              pipeConnectorParameters, new 
PipeTaskRuntimeConfiguration(environment));
+              pipeSinkParameters, new 
PipeTaskRuntimeConfiguration(environment));
           pipeConnector.handshake();
         } catch (final Exception e) {
           try {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java
index f9fbb2d2ad4..a8edc941bb4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java
@@ -156,6 +156,26 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
       final TSDataType type = measurementSchema.getType();
       final NodeId nodeId = newNodeId(currentFolder + name);
       final UaVariableNode measurementNode;
+
+      int lastNonnullIndex = -1;
+      for (int j = tablet.rowSize - 1; j >= 0; --j) {
+        if (!tablet.bitMaps[i].isMarked(j)) {
+          lastNonnullIndex = j;
+          break;
+        }
+      }
+
+      if (lastNonnullIndex == -1) {
+        continue;
+      }
+
+      final long utcTimestamp = 
timestampToUtc(tablet.timestamps[lastNonnullIndex]);
+      final DataValue value =
+          new DataValue(
+              new Variant(getTabletObjectValue4Opc(tablet.values[i], 
lastNonnullIndex, type)),
+              StatusCode.GOOD,
+              new DateTime(utcTimestamp),
+              new DateTime());
       if (!getNodeManager().containsNode(nodeId)) {
         measurementNode =
             new UaVariableNode.UaVariableNodeBuilder(getNodeContext())
@@ -166,6 +186,7 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
                 .setDisplayName(LocalizedText.english(name))
                 .setDataType(convertToOpcDataType(type))
                 .setTypeDefinition(Identifiers.BaseDataVariableType)
+                .setValue(value)
                 .build();
         getNodeManager().addNode(measurementNode);
         folderNode.addOrganizes(measurementNode);
@@ -181,26 +202,15 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
                                 String.format("The Node %s does not exist.", 
nodeId)));
       }
 
-      int lastNonnullIndex = -1;
-      for (int j = tablet.rowSize - 1; j >= 0; --j) {
-        if (!tablet.bitMaps[i].isMarked(j)) {
-          lastNonnullIndex = j;
-          break;
-        }
-      }
-
-      if (lastNonnullIndex != -1) {
-        final long utcTimestamp = 
timestampToUtc(tablet.timestamps[lastNonnullIndex]);
-        if (Objects.isNull(measurementNode.getValue())
-            || 
Objects.requireNonNull(measurementNode.getValue().getSourceTime()).getUtcTime()
-                < utcTimestamp) {
-          measurementNode.setValue(
-              new DataValue(
-                  new Variant(getTabletObjectValue4Opc(tablet.values[i], 
lastNonnullIndex, type)),
-                  StatusCode.GOOD,
-                  new DateTime(utcTimestamp),
-                  new DateTime()));
-        }
+      if (Objects.isNull(measurementNode.getValue())
+          || Objects.isNull(measurementNode.getValue().getSourceTime())
+          || measurementNode.getValue().getSourceTime().getUtcTime() < 
utcTimestamp) {
+        measurementNode.setValue(
+            new DataValue(
+                new Variant(getTabletObjectValue4Opc(tablet.values[i], 
lastNonnullIndex, type)),
+                StatusCode.GOOD,
+                new DateTime(utcTimestamp),
+                new DateTime()));
       }
     }
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
index 451cd965268..1cd9cfb33c7 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.commons.pipe.config.constant;
 
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 
 import com.github.luben.zstd.Zstd;
@@ -54,6 +55,13 @@ public class PipeSinkConstant {
   public static final String SINK_IOTDB_PARALLEL_TASKS_KEY = 
"sink.parallel.tasks";
   public static final int CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE =
       PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum();
+  public static final Set<String> SINGLE_THREAD_DEFAULT_SINK =
+      new HashSet<>(
+          Arrays.asList(
+              BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName(),
+              BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(),
+              BuiltinPipePlugin.OPC_DA_SINK.getPipePluginName(),
+              BuiltinPipePlugin.OPC_DA_CONNECTOR.getPipePluginName()));
 
   public static final String CONNECTOR_REALTIME_FIRST_KEY = 
"connector.realtime-first";
   public static final String SINK_REALTIME_FIRST_KEY = "sink.realtime-first";

Reply via email to