This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch client-opc
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/client-opc by this push:
     new 97897c39cf2 fix
97897c39cf2 is described below

commit 97897c39cf2a314094e115aba831eb5134419557
Author: Caideyipi <[email protected]>
AuthorDate: Mon Dec 22 18:18:34 2025 +0800

    fix
---
 .../db/pipe/sink/protocol/opcua/OpcUaSink.java     |   8 +-
 .../protocol/opcua/client/IoTDBOpcUaClient.java    | 201 ++++++++-------------
 .../sink/protocol/opcua/server/OpcUaNameSpace.java |   2 +-
 3 files changed, 81 insertions(+), 130 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index c2ca0341920..d3788f0d505 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -361,7 +361,13 @@ public class OpcUaSink implements PipeConnector {
     transferByTablet(
         tabletInsertionEvent,
         LOGGER,
-        (tablet, isTableModel) -> nameSpace.transfer(tablet, isTableModel, 
this));
+        (tablet, isTableModel) -> {
+          if (Objects.nonNull(nameSpace)) {
+            nameSpace.transfer(tablet, isTableModel, this);
+          } else if (Objects.nonNull(client)) {
+            client.transfer(tablet, this);
+          }
+        });
   }
 
   public static void transferByTablet(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
index 6d9c6a71a7f..db9fd171dad 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
@@ -35,6 +35,7 @@ import org.eclipse.milo.opcua.stack.core.StatusCodes;
 import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ExpandedNodeId;
 import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
 import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
 import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
@@ -43,25 +44,23 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
 import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
 import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned;
 import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
-import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
 import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesItem;
 import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResponse;
 import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResult;
-import org.eclipse.milo.opcua.stack.core.types.structured.DeleteNodesItem;
 import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
 import org.eclipse.milo.opcua.stack.core.types.structured.ObjectAttributes;
 import org.eclipse.milo.opcua.stack.core.types.structured.VariableAttributes;
 
-import java.util.Arrays;
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.Predicate;
 
+import static 
org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.convertToOpcDataType;
 import static 
org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc;
 
 public class IoTDBOpcUaClient {
-
+  private static final int NAME_SPACE_INDEX = 2;
   private final String nodeUrl;
 
   private final SecurityPolicy securityPolicy;
@@ -101,6 +100,7 @@ public class IoTDBOpcUaClient {
     Object value = null;
     long timestamp = 0;
     NodeId nodeId = null;
+    NodeId opcDataType = null;
 
     for (int i = 0; i < measurementSchemas.size(); ++i) {
       if (Objects.isNull(values.get(i))) {
@@ -120,59 +120,21 @@ public class IoTDBOpcUaClient {
         throw new UnsupportedOperationException(
             "When the 'with-quality' mode is enabled, the measurement must be 
either \"value-name\" or \"quality-name\"");
       }
-      nodeId = new NodeId(2, String.join("/", segments));
+      nodeId = new NodeId(NAME_SPACE_INDEX, String.join("/", segments));
 
       final long utcTimestamp = 
timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
       value = values.get(i);
       timestamp = utcTimestamp;
+      opcDataType = convertToOpcDataType(type);
     }
+    final Variant variant = new Variant(value);
     final DataValue dataValue =
-        new DataValue(new Variant(value), currentQuality, new 
DateTime(timestamp), new DateTime());
+        new DataValue(variant, currentQuality, new DateTime(timestamp), new 
DateTime());
     StatusCode writeStatus = client.writeValue(nodeId, dataValue).get();
 
     if (writeStatus.getValue() == StatusCodes.Bad_NodeIdUnknown) {
       final AddNodesResponse addStatus =
-          client
-              .addNodes(
-                  Arrays.asList(
-                      new AddNodesItem(
-                          Identifiers.ObjectsFolder.expanded(),
-                          Identifiers.Organizes,
-                          new NodeId(2, "root").expanded(),
-                          new QualifiedName(2, "root"),
-                          NodeClass.Object,
-                          ExtensionObject.encode(
-                              client.getStaticSerializationContext(), 
createFolder0Attributes()),
-                          Identifiers.FolderType.expanded()),
-                      new AddNodesItem(
-                          new NodeId(2, "root").expanded(),
-                          Identifiers.Organizes,
-                          new NodeId(2, "root/sg").expanded(),
-                          new QualifiedName(2, "sg"),
-                          NodeClass.Object,
-                          ExtensionObject.encode(
-                              client.getStaticSerializationContext(), 
createFolder1Attributes()),
-                          Identifiers.FolderType.expanded()),
-                      new AddNodesItem(
-                          new NodeId(2, "root/sg").expanded(),
-                          Identifiers.Organizes,
-                          new NodeId(2, "root/sg/d1").expanded(),
-                          new QualifiedName(2, "d2"),
-                          NodeClass.Object,
-                          ExtensionObject.encode(
-                              client.getStaticSerializationContext(), 
createFolder2Attributes()),
-                          Identifiers.FolderType.expanded()),
-                      new AddNodesItem(
-                          new NodeId(2, "root/sg/d1").expanded(),
-                          Identifiers.Organizes,
-                          new NodeId(2, "root/sg/d1/s2").expanded(),
-                          new QualifiedName(2, "s2"),
-                          NodeClass.Variable,
-                          ExtensionObject.encode(
-                              client.getStaticSerializationContext(),
-                              createPressureSensorAttributes()),
-                          Identifiers.BaseDataVariableType.expanded())))
-              .get();
+          client.addNodes(getNodesToAdd(segments, opcDataType, variant)).get();
       for (final AddNodesResult result : addStatus.getResults()) {
         if (!result.getStatusCode().equals(StatusCode.GOOD)
             && !(result.getStatusCode().getValue() == 
StatusCodes.Bad_NodeIdExists)) {
@@ -193,6 +155,56 @@ public class IoTDBOpcUaClient {
     }
   }
 
+  public List<AddNodesItem> getNodesToAdd(
+      final String[] segments, final NodeId opcDataType, final Variant 
initialValue) {
+    final List<AddNodesItem> addNodesItems = new ArrayList<>();
+    final StringBuilder sb = new StringBuilder(segments[0]);
+    ExpandedNodeId curNodeId = new NodeId(NAME_SPACE_INDEX, 
segments[0]).expanded();
+    addNodesItems.add(
+        new AddNodesItem(
+            Identifiers.ObjectsFolder.expanded(),
+            Identifiers.Organizes,
+            curNodeId,
+            new QualifiedName(NAME_SPACE_INDEX, segments[0]),
+            NodeClass.Object,
+            ExtensionObject.encode(
+                client.getStaticSerializationContext(), 
createFolderAttributes(segments[0])),
+            Identifiers.FolderType.expanded()));
+
+    // segments.length >= 3
+    for (int i = 1; i < segments.length - 1; ++i) {
+      sb.append("/").append(segments[i]);
+      final ExpandedNodeId nextId = new NodeId(NAME_SPACE_INDEX, 
sb.toString()).expanded();
+      addNodesItems.add(
+          new AddNodesItem(
+              curNodeId,
+              Identifiers.Organizes,
+              nextId,
+              new QualifiedName(NAME_SPACE_INDEX, segments[i]),
+              NodeClass.Object,
+              ExtensionObject.encode(
+                  client.getStaticSerializationContext(), 
createFolderAttributes(segments[i])),
+              Identifiers.FolderType.expanded()));
+      curNodeId = nextId;
+    }
+
+    final String measurementName = segments[segments.length - 1];
+    sb.append("/").append(measurementName);
+    addNodesItems.add(
+        new AddNodesItem(
+            curNodeId,
+            Identifiers.Organizes,
+            new NodeId(NAME_SPACE_INDEX, sb.toString()).expanded(),
+            new QualifiedName(NAME_SPACE_INDEX, measurementName),
+            NodeClass.Variable,
+            ExtensionObject.encode(
+                client.getStaticSerializationContext(),
+                createMeasurementAttributes(measurementName, opcDataType, 
initialValue)),
+            Identifiers.BaseDataVariableType.expanded()));
+
+    return addNodesItems;
+  }
+
   /////////////////////////////// Getter ///////////////////////////////
 
   String getNodeUrl() {
@@ -211,99 +223,32 @@ public class IoTDBOpcUaClient {
     return identityProvider;
   }
 
-  public void runA(OpcUaClient client) throws Exception {
-    // synchronous connect
-    client.connect().get();
-    System.out.println("✅ 连接成功");
-
-    // 读取标签值c
-    NodeId nodeId = new NodeId(2, "root/sg/d1/s2");
-
-    // 1. 先读取当前值确认节点可访问
-    DataValue readValue = client.readValue(0, TimestampsToReturn.Both, 
nodeId).get();
-    System.out.println("读取当前值: " + readValue.getValue().getValue());
-    System.out.println("读取状态: " + readValue.getStatusCode());
+  /////////////////////////////// Attribute creator 
///////////////////////////////
 
-    // 2. 尝试写入新值
-    Variant newValue = new Variant(42.0f);
-    DataValue writeValue = new DataValue(newValue, StatusCode.GOOD, new 
DateTime(), new DateTime());
-
-    System.out.println("尝试写入值: " + newValue.getValue());
-
-    StatusCode writeStatus = client.writeValue(nodeId, writeValue).get();
-    System.out.println("写入状态: " + writeStatus);
-
-    client.deleteNodes(Collections.singletonList(new DeleteNodesItem(nodeId, 
true)));
-
-    AddNodesResponse addStatus =
-        client
-            .addNodes(
-                Arrays.asList(
-                    new AddNodesItem(
-                        Identifiers.ObjectsFolder.expanded(),
-                        Identifiers.Organizes,
-                        new NodeId(2, "root").expanded(),
-                        new QualifiedName(2, "root"),
-                        NodeClass.Object,
-                        ExtensionObject.encode(
-                            client.getStaticSerializationContext(), 
createFolder0Attributes()),
-                        Identifiers.FolderType.expanded()),
-                    new AddNodesItem(
-                        new NodeId(2, "root").expanded(),
-                        Identifiers.Organizes,
-                        new NodeId(2, "root/sg").expanded(),
-                        new QualifiedName(2, "sg"),
-                        NodeClass.Object,
-                        ExtensionObject.encode(
-                            client.getStaticSerializationContext(), 
createFolder1Attributes()),
-                        Identifiers.FolderType.expanded()),
-                    new AddNodesItem(
-                        new NodeId(2, "root/sg").expanded(),
-                        Identifiers.Organizes,
-                        new NodeId(2, "root/sg/d1").expanded(),
-                        new QualifiedName(2, "d2"),
-                        NodeClass.Object,
-                        ExtensionObject.encode(
-                            client.getStaticSerializationContext(), 
createFolder2Attributes()),
-                        Identifiers.FolderType.expanded()),
-                    new AddNodesItem(
-                        new NodeId(2, "root/sg/d1").expanded(),
-                        Identifiers.Organizes,
-                        new NodeId(2, "root/sg/d1/s2").expanded(),
-                        new QualifiedName(2, "s2"),
-                        NodeClass.Variable,
-                        ExtensionObject.encode(
-                            client.getStaticSerializationContext(),
-                            createPressureSensorAttributes()),
-                        Identifiers.BaseDataVariableType.expanded())))
-            .get();
-    System.out.println("新增节点状态: " + addStatus);
-    client.disconnect().get();
-  }
-
-  public static VariableAttributes createPressureSensorAttributes() {
+  public static VariableAttributes createMeasurementAttributes(
+      final String name, final NodeId objectType, final Variant initialValue) {
     return new VariableAttributes(
         Unsigned.uint(0xFFFF), // specifiedAttributes
-        LocalizedText.english("s2"),
-        LocalizedText.english("反应釜压力传感器"),
+        LocalizedText.english(name),
+        LocalizedText.english(name),
         Unsigned.uint(0), // writeMask
         Unsigned.uint(0), // userWriteMask
-        new Variant(101.3f), // 初始压力值 101.3 kPa
-        Identifiers.Float, // 浮点数类型
-        ValueRanks.Scalar, // 标量
+        new Variant(initialValue),
+        objectType,
+        ValueRanks.Scalar,
         null, // arrayDimensions
         AccessLevel.toValue(AccessLevel.READ_WRITE),
         AccessLevel.toValue(AccessLevel.READ_WRITE),
-        500.0, // 500ms 采样间隔
-        false // 启用历史记录
+        500.0, // samplingInterval
+        false // historizing
         );
   }
 
-  public static ObjectAttributes createFolder0Attributes() {
+  public static ObjectAttributes createFolderAttributes(final String name) {
     return new ObjectAttributes(
         Unsigned.uint(0xFFFF), // specifiedAttributes
-        LocalizedText.english("root"),
-        LocalizedText.english("反应釜压力传感器"),
+        LocalizedText.english(name),
+        LocalizedText.english(name),
         Unsigned.uint(0), // writeMask
         Unsigned.uint(0), // userWriteMask
         null // 启用历史记录
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index ddd94f84c0a..ddb8e161d08 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -490,7 +490,7 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
     eventNode.delete();
   }
 
-  private NodeId convertToOpcDataType(final TSDataType type) {
+  public static NodeId convertToOpcDataType(final TSDataType type) {
     switch (type) {
       case BOOLEAN:
         return Identifiers.Boolean;

Reply via email to