This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 5c4853bdae8 fix (#17533)
5c4853bdae8 is described below

commit 5c4853bdae838fc8fc697ee802975b5610307dc1
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 22 17:23:44 2026 +0800

    fix (#17533)
---
 .../protocol/opcua/client/IoTDBOpcUaClient.java    | 65 +++++++++++++++++-----
 1 file changed, 50 insertions(+), 15 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
index 2019c0fe833..7595b75747e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
 import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import org.apache.tsfile.common.constant.TsFileConstant;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -56,9 +57,10 @@ import org.eclipse.milo.opcua.stack.core.types.structured.VariableAttributes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.ExecutionException;
@@ -130,7 +132,6 @@ public class IoTDBOpcUaClient {
     StatusCode currentQuality = sink.getDefaultQuality();
     Object value = null;
     long timestamp = 0;
-    NodeId nodeId = null;
     NodeId opcDataType = null;
 
     for (int i = 0; i < measurementSchemas.size(); ++i) {
@@ -153,17 +154,43 @@ public class IoTDBOpcUaClient {
             "When the 'with-quality' mode is enabled, the measurement must be 
either \"value-name\" or \"quality-name\"");
         continue;
       }
-      nodeId = new NodeId(NAME_SPACE_INDEX, String.join("/", segments));
 
       final long utcTimestamp = 
timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
-      value = values.get(i);
-      timestamp = utcTimestamp;
-      opcDataType = convertToOpcDataType(type);
+      if (Objects.isNull(sink.getValueName())) {
+        writeValue(
+            values.get(i),
+            utcTimestamp,
+            convertToOpcDataType(type),
+            currentQuality,
+            segments,
+            name);
+      } else {
+        value = values.get(i);
+        timestamp = utcTimestamp;
+        opcDataType = convertToOpcDataType(type);
+      }
     }
     if (Objects.isNull(value)) {
       return;
     }
 
+    writeValue(value, timestamp, opcDataType, currentQuality, segments, null);
+  }
+
+  private void writeValue(
+      final Object value,
+      final long timestamp,
+      final NodeId opcDataType,
+      final StatusCode currentQuality,
+      final String[] segments,
+      final @Nullable String name)
+      throws Exception {
+    final NodeId nodeId =
+        new NodeId(
+            NAME_SPACE_INDEX,
+            Objects.nonNull(name)
+                ? String.join("/", segments) + "/" + name
+                : String.join("/", segments));
     final Variant variant = new Variant(value);
     final DataValue dataValue =
         new DataValue(variant, currentQuality, new DateTime(timestamp), new 
DateTime());
@@ -171,36 +198,41 @@ public class IoTDBOpcUaClient {
 
     if (writeStatus.getValue() == StatusCodes.Bad_NodeIdUnknown) {
       final AddNodesResponse addStatus =
-          client.addNodes(getNodesToAdd(segments, opcDataType, variant)).get();
+          client.addNodes(getNodesToAdd(segments, name, opcDataType, 
variant)).get();
       for (final AddNodesResult result : addStatus.getResults()) {
         if (!result.getStatusCode().equals(StatusCode.GOOD)
             && !(result.getStatusCode().getValue() == 
StatusCodes.Bad_NodeIdExists)) {
           throw new PipeException(
               "Failed to create nodes after transfer data value, creation 
status: "
                   + addStatus
-                  + getErrorString(segments, opcDataType, value, writeStatus));
+                  + getErrorString(segments, name, opcDataType, value, 
writeStatus));
         }
       }
       writeStatus = client.writeValue(nodeId, dataValue).get();
       if (writeStatus.getValue() != StatusCode.GOOD.getValue()) {
         throw new PipeException(
             "Failed to transfer dataValue after successfully created nodes"
-                + getErrorString(segments, opcDataType, value, writeStatus));
+                + getErrorString(segments, name, opcDataType, value, 
writeStatus));
       }
     } else if (writeStatus.getValue() != StatusCode.GOOD.getValue()) {
       throw new PipeException(
           "Failed to transfer dataValue"
-              + getErrorString(segments, opcDataType, value, writeStatus));
+              + getErrorString(segments, name, opcDataType, value, 
writeStatus));
     }
   }
 
   private static String getErrorString(
       final String[] segments,
+      final @Nullable String name,
       final NodeId dataType,
       final Object value,
       final StatusCode writeStatus) {
-    return ", segments: "
-        + Arrays.toString(segments)
+    return ", measurement: "
+        + (Objects.nonNull(name)
+            ? String.join(TsFileConstant.PATH_SEPARATOR, segments)
+                + TsFileConstant.PATH_SEPARATOR
+                + name
+            : String.join(TsFileConstant.PATH_SEPARATOR, segments))
         + ", dataType: "
         + dataType
         + ", value: "
@@ -210,7 +242,10 @@ public class IoTDBOpcUaClient {
   }
 
   public List<AddNodesItem> getNodesToAdd(
-      final String[] segments, final NodeId opcDataType, final Variant 
initialValue) {
+      final String[] segments,
+      final @Nullable String name,
+      final NodeId opcDataType,
+      final Variant initialValue) {
     final List<AddNodesItem> addNodesItems = new ArrayList<>();
     final StringBuilder sb = new StringBuilder(segments[0]);
     ExpandedNodeId curNodeId = new NodeId(NAME_SPACE_INDEX, 
segments[0]).expanded();
@@ -226,7 +261,7 @@ public class IoTDBOpcUaClient {
             Identifiers.FolderType.expanded()));
 
     // segments.length >= 3
-    for (int i = 1; i < segments.length - 1; ++i) {
+    for (int i = 1; i < (Objects.nonNull(name) ? segments.length : 
segments.length - 1); ++i) {
       sb.append("/").append(segments[i]);
       final ExpandedNodeId nextId = new NodeId(NAME_SPACE_INDEX, 
sb.toString()).expanded();
       addNodesItems.add(
@@ -242,7 +277,7 @@ public class IoTDBOpcUaClient {
       curNodeId = nextId;
     }
 
-    final String measurementName = segments[segments.length - 1];
+    final String measurementName = Objects.nonNull(name) ? name : 
segments[segments.length - 1];
     sb.append("/").append(measurementName);
     addNodesItems.add(
         new AddNodesItem(

Reply via email to