This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 12f655ff763 Pipe: Optimized the OPC UA logic to avoid potential bugs (#17309)
12f655ff763 is described below

commit 12f655ff763de846d9985844b0d52881eab97c52
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 18 17:31:49 2026 +0800

    Pipe: Optimized the OPC UA logic to avoid potential bugs (#17309)
    
    * fix
    
    * fix
---
 .../agent/task/subtask/sink/PipeSinkSubtask.java   |  12 +-
 .../task/subtask/sink/PipeSinkSubtaskManager.java  |  65 +++---
 .../metric/sink/PipeDataRegionSinkMetrics.java     | 241 ++++++++++-----------
 .../sink/protocol/opcua/server/OpcUaNameSpace.java |  25 ++-
 .../pipe/config/constant/PipeSinkConstant.java     |   8 +
 5 files changed, 183 insertions(+), 168 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 91bccc6cbda..c6ab2946005 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -56,7 +56,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
 
   // Record these variables to provide corresponding value to tag key of 
monitoring metrics
   private final String attributeSortedString;
-  private final int connectorIndex;
+  private final int sinkIndex;
 
   // Now parallel connectors run the same time, thus the heartbeat events are 
not sure
   // to trigger the general event transfer function, causing potentially such 
as
@@ -68,12 +68,12 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
       final String taskID,
       final long creationTime,
       final String attributeSortedString,
-      final int connectorIndex,
+      final int sinkIndex,
       final UnboundedBlockingPendingQueue<Event> inputPendingQueue,
       final PipeConnector outputPipeConnector) {
     super(taskID, creationTime, outputPipeConnector);
     this.attributeSortedString = attributeSortedString;
-    this.connectorIndex = connectorIndex;
+    this.sinkIndex = sinkIndex;
     this.inputPendingQueue = inputPendingQueue;
 
     if (!attributeSortedString.startsWith("schema_")) {
@@ -256,8 +256,8 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
     return attributeSortedString;
   }
 
-  public int getConnectorIndex() {
-    return connectorIndex;
+  public int getSinkIndex() {
+    return sinkIndex;
   }
 
   public int getTsFileInsertionEventCount() {
@@ -275,7 +275,7 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
         + (lastEvent instanceof PipeHeartbeatEvent ? 1 : 0);
   }
 
-  public int getAsyncConnectorRetryEventQueueSize() {
+  public int getAsyncSinkRetryEventQueueSize() {
     return outputPipeSink instanceof IoTDBDataRegionAsyncSink
         ? ((IoTDBDataRegionAsyncSink) outputPipeSink).getRetryEventQueueSize()
         : 0;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index 3a461fe0395..9138a075918 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -64,10 +64,10 @@ public class PipeSinkSubtaskManager {
 
   public synchronized String register(
       final Supplier<? extends PipeSinkSubtaskExecutor> executorSupplier,
-      final PipeParameters pipeConnectorParameters,
+      final PipeParameters pipeSinkParameters,
       final PipeTaskSinkRuntimeEnvironment environment) {
     final String connectorKey =
-        pipeConnectorParameters
+        pipeSinkParameters
             .getStringOrDefault(
                 Arrays.asList(PipeSinkConstant.CONNECTOR_KEY, 
PipeSinkConstant.SINK_KEY),
                 BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
@@ -81,24 +81,32 @@ public class PipeSinkSubtaskManager {
             environment.getRegionId(),
             connectorKey);
 
-    final boolean isDataRegionConnector =
+    final boolean isDataRegionSink =
         StorageEngine.getInstance()
                 .getAllDataRegionIds()
                 .contains(new DataRegionId(environment.getRegionId()))
             || PipeRuntimeMeta.isSourceExternal(environment.getRegionId());
 
-    final int connectorNum;
+    final int sinkNum;
     boolean realTimeFirst = false;
-    String attributeSortedString = 
generateAttributeSortedString(pipeConnectorParameters);
-    if (isDataRegionConnector) {
-      connectorNum =
-          pipeConnectorParameters.getIntOrDefault(
+    String attributeSortedString = 
generateAttributeSortedString(pipeSinkParameters);
+    if (isDataRegionSink) {
+      sinkNum =
+          pipeSinkParameters.getIntOrDefault(
               Arrays.asList(
                   PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
                   PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
-              PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
+              PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(
+                      pipeSinkParameters
+                          .getStringOrDefault(
+                              Arrays.asList(
+                                  PipeSinkConstant.CONNECTOR_KEY, 
PipeSinkConstant.SINK_KEY),
+                              
BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName())
+                          .toLowerCase())
+                  ? 1
+                  : 
PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
       realTimeFirst =
-          pipeConnectorParameters.getBooleanOrDefault(
+          pipeSinkParameters.getBooleanOrDefault(
               Arrays.asList(
                   PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
                   PipeSinkConstant.SINK_REALTIME_FIRST_KEY),
@@ -107,15 +115,14 @@ public class PipeSinkSubtaskManager {
     } else {
       // Do not allow parallel tasks for schema region connectors
       // to avoid the potential disorder of the schema region data transfer
-      connectorNum = 1;
+      sinkNum = 1;
       attributeSortedString = "schema_" + attributeSortedString;
     }
     environment.setAttributeSortedString(attributeSortedString);
 
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       final PipeSinkSubtaskExecutor executor = executorSupplier.get();
-      final List<PipeSinkSubtaskLifeCycle> pipeSinkSubtaskLifeCycleList =
-          new ArrayList<>(connectorNum);
+      final List<PipeSinkSubtaskLifeCycle> pipeSinkSubtaskLifeCycleList = new 
ArrayList<>(sinkNum);
 
       AtomicInteger counter = new AtomicInteger(0);
       // Shared pending queue for all subtasks
@@ -128,24 +135,23 @@ public class PipeSinkSubtaskManager {
         ((PipeRealtimePriorityBlockingQueue) 
pendingQueue).setOfferTsFileCounter(counter);
       }
 
-      for (int connectorIndex = 0; connectorIndex < connectorNum; 
connectorIndex++) {
-        final PipeConnector pipeConnector =
-            isDataRegionConnector
-                ? 
PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeConnectorParameters)
-                : 
PipeDataNodeAgent.plugin().schemaRegion().reflectSink(pipeConnectorParameters);
+      for (int sinkIndex = 0; sinkIndex < sinkNum; sinkIndex++) {
+        final PipeConnector pipeSink =
+            isDataRegionSink
+                ? 
PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeSinkParameters)
+                : 
PipeDataNodeAgent.plugin().schemaRegion().reflectSink(pipeSinkParameters);
         // 1. Construct, validate and customize PipeConnector, and then 
handshake (create
         // connection) with the target
         try {
-          if (pipeConnector instanceof IoTDBDataRegionAsyncSink) {
-            ((IoTDBDataRegionAsyncSink) 
pipeConnector).setTransferTsFileCounter(counter);
+          if (pipeSink instanceof IoTDBDataRegionAsyncSink) {
+            ((IoTDBDataRegionAsyncSink) 
pipeSink).setTransferTsFileCounter(counter);
           }
-          pipeConnector.validate(new 
PipeParameterValidator(pipeConnectorParameters));
-          pipeConnector.customize(
-              pipeConnectorParameters, new 
PipeTaskRuntimeConfiguration(environment));
-          pipeConnector.handshake();
+          pipeSink.validate(new PipeParameterValidator(pipeSinkParameters));
+          pipeSink.customize(pipeSinkParameters, new 
PipeTaskRuntimeConfiguration(environment));
+          pipeSink.handshake();
         } catch (final Exception e) {
           try {
-            pipeConnector.close();
+            pipeSink.close();
           } catch (final Exception closeException) {
             LOGGER.warn(
                 "Failed to close connector after failed to initialize 
connector. "
@@ -160,20 +166,19 @@ public class PipeSinkSubtaskManager {
         final PipeSinkSubtask pipeSinkSubtask =
             new PipeSinkSubtask(
                 String.format(
-                    "%s_%s_%s",
-                    attributeSortedString, environment.getCreationTime(), 
connectorIndex),
+                    "%s_%s_%s", attributeSortedString, 
environment.getCreationTime(), sinkIndex),
                 environment.getCreationTime(),
                 attributeSortedString,
-                connectorIndex,
+                sinkIndex,
                 pendingQueue,
-                pipeConnector);
+                pipeSink);
         final PipeSinkSubtaskLifeCycle pipeSinkSubtaskLifeCycle =
             new PipeSinkSubtaskLifeCycle(executor, pipeSinkSubtask, 
pendingQueue);
         pipeSinkSubtaskLifeCycleList.add(pipeSinkSubtaskLifeCycle);
       }
 
       LOGGER.info(
-          "Pipe connector subtasks with attributes {} is bounded with 
connectorExecutor {} and callbackExecutor {}.",
+          "Pipe sink subtasks with attributes {} is bounded with sinkExecutor 
{} and callbackExecutor {}.",
           attributeSortedString,
           executor.getWorkingThreadName(),
           executor.getCallbackThreadName());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
index 7eef4a03aa5..2502c385e5c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
@@ -46,7 +46,7 @@ public class PipeDataRegionSinkMetrics implements IMetricSet {
   @SuppressWarnings("java:S3077")
   private volatile AbstractMetricService metricService;
 
-  private final Map<String, PipeSinkSubtask> connectorMap = new HashMap<>();
+  private final Map<String, PipeSinkSubtask> sinkMap = new HashMap<>();
 
   private final Map<String, Rate> tabletRateMap = new ConcurrentHashMap<>();
 
@@ -61,7 +61,7 @@ public class PipeDataRegionSinkMetrics implements IMetricSet {
   @Override
   public void bindTo(final AbstractMetricService metricService) {
     this.metricService = metricService;
-    final ImmutableSet<String> taskIDs = 
ImmutableSet.copyOf(connectorMap.keySet());
+    final ImmutableSet<String> taskIDs = ImmutableSet.copyOf(sinkMap.keySet());
     for (String taskID : taskIDs) {
       createMetrics(taskID);
     }
@@ -75,91 +75,91 @@ public class PipeDataRegionSinkMetrics implements 
IMetricSet {
   }
 
   private void createAutoGauge(final String taskID) {
-    final PipeSinkSubtask connector = connectorMap.get(taskID);
+    final PipeSinkSubtask sink = sinkMap.get(taskID);
     // Pending event count
     metricService.createAutoGauge(
         Metric.UNTRANSFERRED_TABLET_COUNT.toString(),
         MetricLevel.IMPORTANT,
-        connector,
+        sink,
         PipeSinkSubtask::getTabletInsertionEventCount,
         Tag.NAME.toString(),
-        connector.getAttributeSortedString(),
+        sink.getAttributeSortedString(),
         Tag.INDEX.toString(),
-        String.valueOf(connector.getConnectorIndex()),
+        String.valueOf(sink.getSinkIndex()),
         Tag.CREATION_TIME.toString(),
-        String.valueOf(connector.getCreationTime()));
+        String.valueOf(sink.getCreationTime()));
     metricService.createAutoGauge(
         Metric.UNTRANSFERRED_TSFILE_COUNT.toString(),
         MetricLevel.IMPORTANT,
-        connector,
+        sink,
         PipeSinkSubtask::getTsFileInsertionEventCount,
         Tag.NAME.toString(),
-        connector.getAttributeSortedString(),
+        sink.getAttributeSortedString(),
         Tag.INDEX.toString(),
-        String.valueOf(connector.getConnectorIndex()),
+        String.valueOf(sink.getSinkIndex()),
         Tag.CREATION_TIME.toString(),
-        String.valueOf(connector.getCreationTime()));
+        String.valueOf(sink.getCreationTime()));
     metricService.createAutoGauge(
         Metric.UNTRANSFERRED_HEARTBEAT_COUNT.toString(),
         MetricLevel.IMPORTANT,
-        connector,
+        sink,
         PipeSinkSubtask::getPipeHeartbeatEventCount,
         Tag.NAME.toString(),
-        connector.getAttributeSortedString(),
+        sink.getAttributeSortedString(),
         Tag.INDEX.toString(),
-        String.valueOf(connector.getConnectorIndex()),
+        String.valueOf(sink.getSinkIndex()),
         Tag.CREATION_TIME.toString(),
-        String.valueOf(connector.getCreationTime()));
-    // Metrics related to IoTDBThriftAsyncConnector
+        String.valueOf(sink.getCreationTime()));
+    // Metrics related to IoTDBThriftAsyncSink
     metricService.createAutoGauge(
         Metric.PIPE_ASYNC_CONNECTOR_RETRY_EVENT_QUEUE_SIZE.toString(),
         MetricLevel.IMPORTANT,
-        connector,
-        PipeSinkSubtask::getAsyncConnectorRetryEventQueueSize,
+        sink,
+        PipeSinkSubtask::getAsyncSinkRetryEventQueueSize,
         Tag.NAME.toString(),
-        connector.getAttributeSortedString(),
+        sink.getAttributeSortedString(),
         Tag.INDEX.toString(),
-        String.valueOf(connector.getConnectorIndex()),
+        String.valueOf(sink.getSinkIndex()),
         Tag.CREATION_TIME.toString(),
-        String.valueOf(connector.getCreationTime()));
+        String.valueOf(sink.getCreationTime()));
     metricService.createAutoGauge(
         Metric.PIPE_PENDING_HANDLERS_SIZE.toString(),
         MetricLevel.IMPORTANT,
-        connector,
+        sink,
         PipeSinkSubtask::getPendingHandlersSize,
         Tag.NAME.toString(),
-        connector.getAttributeSortedString(),
+        sink.getAttributeSortedString(),
         Tag.INDEX.toString(),
-        String.valueOf(connector.getConnectorIndex()),
+        String.valueOf(sink.getSinkIndex()),
         Tag.CREATION_TIME.toString(),
-        String.valueOf(connector.getCreationTime()));
-    // Metrics related to IoTDB connector
+        String.valueOf(sink.getCreationTime()));
+    // Metrics related to IoTDB sink
     metricService.createAutoGauge(
         Metric.PIPE_TOTAL_UNCOMPRESSED_SIZE.toString(),
         MetricLevel.IMPORTANT,
-        connector,
+        sink,
         PipeSinkSubtask::getTotalUncompressedSize,
         Tag.NAME.toString(),
-        connector.getAttributeSortedString(),
+        sink.getAttributeSortedString(),
         Tag.INDEX.toString(),
-        String.valueOf(connector.getConnectorIndex()),
+        String.valueOf(sink.getSinkIndex()),
         Tag.CREATION_TIME.toString(),
-        String.valueOf(connector.getCreationTime()));
+        String.valueOf(sink.getCreationTime()));
     metricService.createAutoGauge(
         Metric.PIPE_TOTAL_COMPRESSED_SIZE.toString(),
         MetricLevel.IMPORTANT,
-        connector,
+        sink,
         PipeSinkSubtask::getTotalCompressedSize,
         Tag.NAME.toString(),
-        connector.getAttributeSortedString(),
+        sink.getAttributeSortedString(),
         Tag.INDEX.toString(),
-        String.valueOf(connector.getConnectorIndex()),
+        String.valueOf(sink.getSinkIndex()),
         Tag.CREATION_TIME.toString(),
-        String.valueOf(connector.getCreationTime()));
+        String.valueOf(sink.getCreationTime()));
   }
 
   private void createRate(final String taskID) {
-    final PipeSinkSubtask connector = connectorMap.get(taskID);
+    final PipeSinkSubtask sink = sinkMap.get(taskID);
     // Transfer event rate
     tabletRateMap.put(
         taskID,
@@ -167,109 +167,108 @@ public class PipeDataRegionSinkMetrics implements 
IMetricSet {
             Metric.PIPE_CONNECTOR_TABLET_TRANSFER.toString(),
             MetricLevel.IMPORTANT,
             Tag.NAME.toString(),
-            connector.getAttributeSortedString(),
+            sink.getAttributeSortedString(),
             Tag.INDEX.toString(),
-            String.valueOf(connector.getConnectorIndex()),
+            String.valueOf(sink.getSinkIndex()),
             Tag.CREATION_TIME.toString(),
-            String.valueOf(connector.getCreationTime())));
+            String.valueOf(sink.getCreationTime())));
     tsFileRateMap.put(
         taskID,
         metricService.getOrCreateRate(
             Metric.PIPE_CONNECTOR_TSFILE_TRANSFER.toString(),
             MetricLevel.IMPORTANT,
             Tag.NAME.toString(),
-            connector.getAttributeSortedString(),
+            sink.getAttributeSortedString(),
             Tag.INDEX.toString(),
-            String.valueOf(connector.getConnectorIndex()),
+            String.valueOf(sink.getSinkIndex()),
             Tag.CREATION_TIME.toString(),
-            String.valueOf(connector.getCreationTime())));
+            String.valueOf(sink.getCreationTime())));
     pipeHeartbeatRateMap.put(
         taskID,
         metricService.getOrCreateRate(
             Metric.PIPE_CONNECTOR_HEARTBEAT_TRANSFER.toString(),
             MetricLevel.IMPORTANT,
             Tag.NAME.toString(),
-            connector.getAttributeSortedString(),
+            sink.getAttributeSortedString(),
             Tag.INDEX.toString(),
-            String.valueOf(connector.getConnectorIndex()),
+            String.valueOf(sink.getSinkIndex()),
             Tag.CREATION_TIME.toString(),
-            String.valueOf(connector.getCreationTime())));
+            String.valueOf(sink.getCreationTime())));
   }
 
   private void createTimer(final String taskID) {
-    final PipeSinkSubtask connector = connectorMap.get(taskID);
+    final PipeSinkSubtask sink = sinkMap.get(taskID);
     compressionTimerMap.putIfAbsent(
-        connector.getAttributeSortedString(),
+        sink.getAttributeSortedString(),
         metricService.getOrCreateTimer(
             Metric.PIPE_COMPRESSION_TIME.toString(),
             MetricLevel.IMPORTANT,
             Tag.NAME.toString(),
-            connector.getAttributeSortedString(),
+            sink.getAttributeSortedString(),
             Tag.CREATION_TIME.toString(),
-            String.valueOf(connector.getCreationTime())));
+            String.valueOf(sink.getCreationTime())));
   }
 
   private void createHistogram(final String taskID) {
-    final PipeSinkSubtask connector = connectorMap.get(taskID);
+    final PipeSinkSubtask sink = sinkMap.get(taskID);
 
     final Histogram tabletBatchSizeHistogram =
         metricService.getOrCreateHistogram(
             Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString(),
             MetricLevel.IMPORTANT,
             Tag.NAME.toString(),
-            connector.getAttributeSortedString(),
+            sink.getAttributeSortedString(),
             Tag.CREATION_TIME.toString(),
-            String.valueOf(connector.getCreationTime()));
-    connector.setTabletBatchSizeHistogram(tabletBatchSizeHistogram);
+            String.valueOf(sink.getCreationTime()));
+    sink.setTabletBatchSizeHistogram(tabletBatchSizeHistogram);
 
     final Histogram tsFileBatchSizeHistogram =
         metricService.getOrCreateHistogram(
             Metric.PIPE_TSFILE_BATCH_SIZE.toString(),
             MetricLevel.IMPORTANT,
             Tag.NAME.toString(),
-            connector.getAttributeSortedString(),
+            sink.getAttributeSortedString(),
             Tag.CREATION_TIME.toString(),
-            String.valueOf(connector.getCreationTime()));
-    connector.setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram);
+            String.valueOf(sink.getCreationTime()));
+    sink.setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram);
 
     final Histogram tabletBatchTimeIntervalHistogram =
         metricService.getOrCreateHistogram(
             Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString(),
             MetricLevel.IMPORTANT,
             Tag.NAME.toString(),
-            connector.getAttributeSortedString(),
+            sink.getAttributeSortedString(),
             Tag.CREATION_TIME.toString(),
-            String.valueOf(connector.getCreationTime()));
-    
connector.setTabletBatchTimeIntervalHistogram(tabletBatchTimeIntervalHistogram);
+            String.valueOf(sink.getCreationTime()));
+    sink.setTabletBatchTimeIntervalHistogram(tabletBatchTimeIntervalHistogram);
 
     final Histogram tsFileBatchTimeIntervalHistogram =
         metricService.getOrCreateHistogram(
             Metric.PIPE_TSFILE_BATCH_TIME_COST.toString(),
             MetricLevel.IMPORTANT,
             Tag.NAME.toString(),
-            connector.getAttributeSortedString(),
+            sink.getAttributeSortedString(),
             Tag.CREATION_TIME.toString(),
-            String.valueOf(connector.getCreationTime()));
-    
connector.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
+            String.valueOf(sink.getCreationTime()));
+    sink.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
 
     Histogram eventSizeHistogram =
         metricService.getOrCreateHistogram(
             Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
             MetricLevel.IMPORTANT,
             Tag.NAME.toString(),
-            connector.getAttributeSortedString());
-    connector.setEventSizeHistogram(eventSizeHistogram);
+            sink.getAttributeSortedString());
+    sink.setEventSizeHistogram(eventSizeHistogram);
   }
 
   @Override
   public void unbindFrom(final AbstractMetricService metricService) {
-    final ImmutableSet<String> taskIDs = 
ImmutableSet.copyOf(connectorMap.keySet());
+    final ImmutableSet<String> taskIDs = ImmutableSet.copyOf(sinkMap.keySet());
     for (final String taskID : taskIDs) {
       deregister(taskID);
     }
-    if (!connectorMap.isEmpty()) {
-      LOGGER.warn(
-          "Failed to unbind from pipe data region connector metrics, connector 
map not empty");
+    if (!sinkMap.isEmpty()) {
+      LOGGER.warn("Failed to unbind from pipe data region sink metrics, sink 
map not empty");
     }
   }
 
@@ -281,181 +280,181 @@ public class PipeDataRegionSinkMetrics implements 
IMetricSet {
   }
 
   private void removeAutoGauge(final String taskID) {
-    final PipeSinkSubtask connector = connectorMap.get(taskID);
+    final PipeSinkSubtask sink = sinkMap.get(taskID);
     // Pending event count
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.UNTRANSFERRED_TABLET_COUNT.toString(),
         Tag.NAME.toString(),
-        connector.getAttributeSortedString(),
+        sink.getAttributeSortedString(),
         Tag.INDEX.toString(),
-        String.valueOf(connector.getConnectorIndex()),
+        String.valueOf(sink.getSinkIndex()),
         Tag.CREATION_TIME.toString(),
-        String.valueOf(connector.getCreationTime()));
+        String.valueOf(sink.getCreationTime()));
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.UNTRANSFERRED_TSFILE_COUNT.toString(),
         Tag.NAME.toString(),
-        connector.getAttributeSortedString(),
+        sink.getAttributeSortedString(),
         Tag.INDEX.toString(),
-        String.valueOf(connector.getConnectorIndex()),
+        String.valueOf(sink.getSinkIndex()),
         Tag.CREATION_TIME.toString(),
-        String.valueOf(connector.getCreationTime()));
+        String.valueOf(sink.getCreationTime()));
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.UNTRANSFERRED_HEARTBEAT_COUNT.toString(),
         Tag.NAME.toString(),
-        connector.getAttributeSortedString(),
+        sink.getAttributeSortedString(),
         Tag.INDEX.toString(),
-        String.valueOf(connector.getConnectorIndex()),
+        String.valueOf(sink.getSinkIndex()),
         Tag.CREATION_TIME.toString(),
-        String.valueOf(connector.getCreationTime()));
-    // Metrics related to IoTDBThriftAsyncConnector
+        String.valueOf(sink.getCreationTime()));
+    // Metrics related to IoTDBThriftAsyncSink
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.PIPE_ASYNC_CONNECTOR_RETRY_EVENT_QUEUE_SIZE.toString(),
         Tag.NAME.toString(),
-        connector.getAttributeSortedString(),
+        sink.getAttributeSortedString(),
         Tag.INDEX.toString(),
-        String.valueOf(connector.getConnectorIndex()),
+        String.valueOf(sink.getSinkIndex()),
         Tag.CREATION_TIME.toString(),
-        String.valueOf(connector.getCreationTime()));
+        String.valueOf(sink.getCreationTime()));
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.PIPE_PENDING_HANDLERS_SIZE.toString(),
         Tag.NAME.toString(),
-        connector.getAttributeSortedString(),
+        sink.getAttributeSortedString(),
         Tag.INDEX.toString(),
-        String.valueOf(connector.getConnectorIndex()),
+        String.valueOf(sink.getSinkIndex()),
         Tag.CREATION_TIME.toString(),
-        String.valueOf(connector.getCreationTime()));
-    // Metrics related to IoTDB connector
+        String.valueOf(sink.getCreationTime()));
+    // Metrics related to IoTDB sink
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.PIPE_TOTAL_UNCOMPRESSED_SIZE.toString(),
         Tag.NAME.toString(),
-        connector.getAttributeSortedString(),
+        sink.getAttributeSortedString(),
         Tag.INDEX.toString(),
-        String.valueOf(connector.getConnectorIndex()),
+        String.valueOf(sink.getSinkIndex()),
         Tag.CREATION_TIME.toString(),
-        String.valueOf(connector.getCreationTime()));
+        String.valueOf(sink.getCreationTime()));
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.PIPE_TOTAL_COMPRESSED_SIZE.toString(),
         Tag.NAME.toString(),
-        connector.getAttributeSortedString(),
+        sink.getAttributeSortedString(),
         Tag.INDEX.toString(),
-        String.valueOf(connector.getConnectorIndex()),
+        String.valueOf(sink.getSinkIndex()),
         Tag.CREATION_TIME.toString(),
-        String.valueOf(connector.getCreationTime()));
+        String.valueOf(sink.getCreationTime()));
   }
 
   private void removeRate(final String taskID) {
-    final PipeSinkSubtask connector = connectorMap.get(taskID);
+    final PipeSinkSubtask sink = sinkMap.get(taskID);
     // Transfer event rate
     metricService.remove(
         MetricType.RATE,
         Metric.PIPE_CONNECTOR_TABLET_TRANSFER.toString(),
         Tag.NAME.toString(),
-        connector.getAttributeSortedString(),
+        sink.getAttributeSortedString(),
         Tag.INDEX.toString(),
-        String.valueOf(connector.getConnectorIndex()),
+        String.valueOf(sink.getSinkIndex()),
         Tag.CREATION_TIME.toString(),
-        String.valueOf(connector.getCreationTime()));
+        String.valueOf(sink.getCreationTime()));
     metricService.remove(
         MetricType.RATE,
         Metric.PIPE_CONNECTOR_TSFILE_TRANSFER.toString(),
         Tag.NAME.toString(),
-        connector.getAttributeSortedString(),
+        sink.getAttributeSortedString(),
         Tag.INDEX.toString(),
-        String.valueOf(connector.getConnectorIndex()),
+        String.valueOf(sink.getSinkIndex()),
         Tag.CREATION_TIME.toString(),
-        String.valueOf(connector.getCreationTime()));
+        String.valueOf(sink.getCreationTime()));
     metricService.remove(
         MetricType.RATE,
         Metric.PIPE_CONNECTOR_HEARTBEAT_TRANSFER.toString(),
         Tag.NAME.toString(),
-        connector.getAttributeSortedString(),
+        sink.getAttributeSortedString(),
         Tag.INDEX.toString(),
-        String.valueOf(connector.getConnectorIndex()),
+        String.valueOf(sink.getSinkIndex()),
         Tag.CREATION_TIME.toString(),
-        String.valueOf(connector.getCreationTime()));
+        String.valueOf(sink.getCreationTime()));
     tabletRateMap.remove(taskID);
     tsFileRateMap.remove(taskID);
     pipeHeartbeatRateMap.remove(taskID);
   }
 
   private void removeTimer(final String taskID) {
-    final PipeSinkSubtask connector = connectorMap.get(taskID);
+    final PipeSinkSubtask sink = sinkMap.get(taskID);
     metricService.remove(
         MetricType.TIMER,
         Metric.PIPE_COMPRESSION_TIME.toString(),
         Tag.NAME.toString(),
-        connector.getAttributeSortedString(),
+        sink.getAttributeSortedString(),
         Tag.CREATION_TIME.toString(),
-        String.valueOf(connector.getCreationTime()));
-    compressionTimerMap.remove(connector.getAttributeSortedString());
+        String.valueOf(sink.getCreationTime()));
+    compressionTimerMap.remove(sink.getAttributeSortedString());
   }
 
   private void removeHistogram(final String taskID) {
-    final PipeSinkSubtask connector = connectorMap.get(taskID);
+    final PipeSinkSubtask sink = sinkMap.get(taskID);
     metricService.remove(
         MetricType.HISTOGRAM,
         Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString(),
         Tag.NAME.toString(),
-        connector.getAttributeSortedString(),
+        sink.getAttributeSortedString(),
         Tag.CREATION_TIME.toString(),
-        String.valueOf(connector.getCreationTime()));
+        String.valueOf(sink.getCreationTime()));
     metricService.remove(
         MetricType.HISTOGRAM,
         Metric.PIPE_TSFILE_BATCH_SIZE.toString(),
         Tag.NAME.toString(),
-        connector.getAttributeSortedString(),
+        sink.getAttributeSortedString(),
         Tag.CREATION_TIME.toString(),
-        String.valueOf(connector.getCreationTime()));
+        String.valueOf(sink.getCreationTime()));
     metricService.remove(
         MetricType.HISTOGRAM,
         Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString(),
         Tag.NAME.toString(),
-        connector.getAttributeSortedString(),
+        sink.getAttributeSortedString(),
         Tag.CREATION_TIME.toString(),
-        String.valueOf(connector.getCreationTime()));
+        String.valueOf(sink.getCreationTime()));
     metricService.remove(
         MetricType.HISTOGRAM,
         Metric.PIPE_TSFILE_BATCH_TIME_COST.toString(),
         Tag.NAME.toString(),
-        connector.getAttributeSortedString(),
+        sink.getAttributeSortedString(),
         Tag.CREATION_TIME.toString(),
-        String.valueOf(connector.getCreationTime()));
+        String.valueOf(sink.getCreationTime()));
 
     metricService.remove(
         MetricType.HISTOGRAM,
         Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
         Tag.NAME.toString(),
-        connector.getAttributeSortedString());
+        sink.getAttributeSortedString());
   }
 
   //////////////////////////// register & deregister (pipe integration) 
////////////////////////////
 
   public void register(final PipeSinkSubtask pipeSinkSubtask) {
     final String taskID = pipeSinkSubtask.getTaskID();
-    connectorMap.putIfAbsent(taskID, pipeSinkSubtask);
+    sinkMap.putIfAbsent(taskID, pipeSinkSubtask);
     if (Objects.nonNull(metricService)) {
       createMetrics(taskID);
     }
   }
 
   public void deregister(final String taskID) {
-    if (!connectorMap.containsKey(taskID)) {
+    if (!sinkMap.containsKey(taskID)) {
       LOGGER.warn(
-          "Failed to deregister pipe data region connector metrics, 
PipeConnectorSubtask({}) does not exist",
+          "Failed to deregister pipe data region sink metrics, 
PipeSinkSubtask({}) does not exist",
           taskID);
       return;
     }
     if (Objects.nonNull(metricService)) {
       removeMetrics(taskID);
     }
-    connectorMap.remove(taskID);
+    sinkMap.remove(taskID);
   }
 
   public void markTabletEvent(final String taskID) {
@@ -465,7 +464,7 @@ public class PipeDataRegionSinkMetrics implements 
IMetricSet {
     final Rate rate = tabletRateMap.get(taskID);
     if (rate == null) {
       LOGGER.info(
-          "Failed to mark pipe data region connector tablet event, 
PipeConnectorSubtask({}) does not exist",
+          "Failed to mark pipe data region sink tablet event, 
PipeSinkSubtask({}) does not exist",
           taskID);
       return;
     }
@@ -479,7 +478,7 @@ public class PipeDataRegionSinkMetrics implements 
IMetricSet {
     final Rate rate = tsFileRateMap.get(taskID);
     if (rate == null) {
       LOGGER.info(
-          "Failed to mark pipe data region connector tsfile event, 
PipeConnectorSubtask({}) does not exist",
+          "Failed to mark pipe data region sink tsfile event, 
PipeSinkSubtask({}) does not exist",
           taskID);
       return;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index 56a90680185..2c2e860af7b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -278,6 +278,14 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
           Objects.isNull(sink.getValueName()) ? name : 
segments[segments.length - 1];
       final NodeId nodeId = newNodeId(currentFolder + nodeName);
       final UaVariableNode measurementNode;
+      final long utcTimestamp = 
timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
+      final DataValue dataValue =
+          new DataValue(
+              new Variant(values.get(i)),
+              currentQuality,
+              new DateTime(utcTimestamp),
+              new DateTime());
+
       if (!getNodeManager().containsNode(nodeId)) {
         measurementNode =
             new UaVariableNode.UaVariableNodeBuilder(getNodeContext())
@@ -288,6 +296,7 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
                 .setDisplayName(LocalizedText.english(nodeName))
                 .setDataType(convertToOpcDataType(type))
                 .setTypeDefinition(Identifiers.BaseDataVariableType)
+                .setValue(dataValue)
                 .build();
         getNodeManager().addNode(measurementNode);
         if (Objects.nonNull(folderNode)) {
@@ -311,17 +320,11 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
                                 String.format("The Node %s does not exist.", 
nodeId)));
       }
 
-      final long utcTimestamp = 
timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
       if (Objects.isNull(sink.getValueName())) {
         if (Objects.isNull(measurementNode.getValue())
-            || 
Objects.requireNonNull(measurementNode.getValue().getSourceTime()).getUtcTime()
-                < utcTimestamp) {
-          measurementNode.setValue(
-              new DataValue(
-                  new Variant(values.get(i)),
-                  currentQuality,
-                  new DateTime(utcTimestamp),
-                  new DateTime()));
+            || Objects.isNull(measurementNode.getValue().getSourceTime())
+            || measurementNode.getValue().getSourceTime().getUtcTime() < 
utcTimestamp) {
+          measurementNode.setValue(dataValue);
         }
       } else {
         valueNode = measurementNode;
@@ -331,8 +334,8 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
     }
     if (Objects.nonNull(valueNode)) {
       if (Objects.isNull(valueNode.getValue())
-          || 
Objects.requireNonNull(valueNode.getValue().getSourceTime()).getUtcTime()
-              < timestamp) {
+          || Objects.isNull(valueNode.getValue().getSourceTime())
+          || valueNode.getValue().getSourceTime().getUtcTime() < timestamp) {
         valueNode.setValue(
             new DataValue(
                 new Variant(value), currentQuality, new DateTime(timestamp), 
new DateTime()));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
index ec9afce7c6f..7e1e38e1490 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.commons.pipe.config.constant;
 
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 
 import com.github.luben.zstd.Zstd;
@@ -55,6 +56,13 @@ public class PipeSinkConstant {
   public static final String SINK_IOTDB_PARALLEL_TASKS_KEY = 
"sink.parallel.tasks";
   public static final int CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE =
       PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum();
+  public static final Set<String> SINGLE_THREAD_DEFAULT_SINK =
+      new HashSet<>(
+          Arrays.asList(
+              BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName(),
+              BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(),
+              BuiltinPipePlugin.OPC_DA_SINK.getPipePluginName(),
+              BuiltinPipePlugin.OPC_DA_CONNECTOR.getPipePluginName()));
 
   public static final String CONNECTOR_REALTIME_FIRST_KEY = 
"connector.realtime-first";
   public static final String SINK_REALTIME_FIRST_KEY = "sink.realtime-first";


Reply via email to