This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch client-opc
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/client-opc by this push:
     new f78e6d78b39 fix
f78e6d78b39 is described below

commit f78e6d78b3908c9aa3dffd0c26f9c63d3417f1d9
Author: Caideyipi <[email protected]>
AuthorDate: Mon Dec 22 16:56:33 2025 +0800

    fix
---
 .../db/pipe/sink/protocol/opcua/OpcUaSink.java     |  16 +++
 .../protocol/opcua/client/IoTDBOpcUaClient.java    | 147 ++++++++++++++++++++-
 2 files changed, 162 insertions(+), 1 deletion(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index 6928385eca9..c2ca0341920 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -154,6 +154,22 @@ public class OpcUaSink implements PipeConnector {
             Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY),
             Arrays.asList(CONNECTOR_IOTDB_USERNAME_KEY, 
SINK_IOTDB_USERNAME_KEY),
             false);
+
+    final PipeParameters parameters = validator.getParameters();
+    if (validator
+        .getParameters()
+        .hasAnyAttributes(CONNECTOR_OPC_UA_NODE_URL_KEY, 
SINK_OPC_UA_NODE_URL_KEY)) {
+      validator.validate(
+          CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE::equals,
+          String.format(
+              "When the OPC UA sink points to an outer server or specifies 
'with-quality', the %s or %s must be %s.",
+              CONNECTOR_OPC_UA_MODEL_KEY,
+              SINK_OPC_UA_MODEL_KEY,
+              CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE),
+          parameters.getStringOrDefault(
+              Arrays.asList(CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY),
+              CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE));
+    }
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
index b760785684b..cc6651338da 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
@@ -24,10 +24,31 @@ import 
org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
 import org.apache.tsfile.write.record.Tablet;
 import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
 import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
+import org.eclipse.milo.opcua.sdk.core.AccessLevel;
+import org.eclipse.milo.opcua.sdk.core.ValueRanks;
+import org.eclipse.milo.opcua.stack.core.Identifiers;
 import org.eclipse.milo.opcua.stack.core.UaException;
 import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
+import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
+import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
+import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesItem;
+import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResponse;
+import org.eclipse.milo.opcua.stack.core.types.structured.DeleteNodesItem;
 import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
+import org.eclipse.milo.opcua.stack.core.types.structured.ObjectAttributes;
+import org.eclipse.milo.opcua.stack.core.types.structured.VariableAttributes;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
 
@@ -54,7 +75,10 @@ public class IoTDBOpcUaClient {
     client.connect().get();
   }
 
-  public void transfer(final Tablet tablet, final OpcUaSink sink) throws 
UaException {}
+  // Only supports the tree model in client-server mode. NOTE(review): the body is still empty, so the tablet is not sent anywhere yet.
+  public void transfer(final Tablet tablet, final OpcUaSink sink) throws 
UaException {
+
+  }
 
   /////////////////////////////// Getter ///////////////////////////////
 
@@ -73,4 +97,125 @@ public class IoTDBOpcUaClient {
   IdentityProvider getIdentityProvider() {
     return identityProvider;
   }
+
+  /**
+   * Ad-hoc smoke test against an OPC UA server; every result is printed to stdout.
+   *
+   * <p>Steps, in order: connect; read the value of node {@code ns=2;s=root/sg/d1/s2}; write
+   * {@code 42.0f} to it; delete that node; re-create the folder chain {@code root -> sg -> d1}
+   * together with the variable {@code s2}; disconnect.
+   *
+   * @param client the client to drive; it is connected on entry and disconnected on exit
+   * @throws Exception if any awaited OPC UA request fails or the wait is interrupted
+   */
+  public void runA(OpcUaClient client) throws Exception {
+    // Synchronous connect.
+    client.connect().get();
+    try {
+      System.out.println("✅ 连接成功");
+
+      // Node whose value is read and written below.
+      NodeId nodeId = new NodeId(2, "root/sg/d1/s2");
+
+      // 1. Read the current value first to confirm the node is accessible.
+      DataValue readValue = client.readValue(0, TimestampsToReturn.Both, nodeId).get();
+      System.out.println("读取当前值: " + readValue.getValue().getValue());
+      System.out.println("读取状态: " + readValue.getStatusCode());
+
+      // 2. Try to write a new value.
+      Variant newValue = new Variant(42.0f);
+      DataValue writeValue =
+          new DataValue(newValue, StatusCode.GOOD, new DateTime(), new DateTime());
+
+      System.out.println("尝试写入值: " + newValue.getValue());
+
+      StatusCode writeStatus = client.writeValue(nodeId, writeValue).get();
+      System.out.println("写入状态: " + writeStatus);
+
+      // Wait for the delete to finish; otherwise it can race with the addNodes request below.
+      client.deleteNodes(Collections.singletonList(new DeleteNodesItem(nodeId, true))).get();
+
+      // 3. Re-create the hierarchy: ObjectsFolder -> root -> sg -> d1 -> s2 (variable).
+      AddNodesResponse addStatus =
+          client
+              .addNodes(
+                  Arrays.asList(
+                      new AddNodesItem(
+                          Identifiers.ObjectsFolder.expanded(),
+                          Identifiers.Organizes,
+                          new NodeId(2, "root").expanded(),
+                          new QualifiedName(2, "root"),
+                          NodeClass.Object,
+                          ExtensionObject.encode(
+                              client.getStaticSerializationContext(), createFolder0Attributes()),
+                          Identifiers.FolderType.expanded()),
+                      new AddNodesItem(
+                          new NodeId(2, "root").expanded(),
+                          Identifiers.Organizes,
+                          new NodeId(2, "root/sg").expanded(),
+                          new QualifiedName(2, "sg"),
+                          NodeClass.Object,
+                          ExtensionObject.encode(
+                              client.getStaticSerializationContext(), createFolder1Attributes()),
+                          Identifiers.FolderType.expanded()),
+                      new AddNodesItem(
+                          new NodeId(2, "root/sg").expanded(),
+                          Identifiers.Organizes,
+                          new NodeId(2, "root/sg/d1").expanded(),
+                          // Browse name must match the node id and display name ("d1").
+                          new QualifiedName(2, "d1"),
+                          NodeClass.Object,
+                          ExtensionObject.encode(
+                              client.getStaticSerializationContext(), createFolder2Attributes()),
+                          Identifiers.FolderType.expanded()),
+                      new AddNodesItem(
+                          new NodeId(2, "root/sg/d1").expanded(),
+                          Identifiers.Organizes,
+                          new NodeId(2, "root/sg/d1/s2").expanded(),
+                          new QualifiedName(2, "s2"),
+                          NodeClass.Variable,
+                          ExtensionObject.encode(
+                              client.getStaticSerializationContext(),
+                              createPressureSensorAttributes()),
+                          Identifiers.BaseDataVariableType.expanded())))
+              .get();
+      System.out.println("新增节点状态: " + addStatus);
+    } finally {
+      // Always release the session, even when one of the steps above throws.
+      client.disconnect().get();
+    }
+  }
+
+  /**
+   * Builds the attribute set for the {@code s2} variable node: a scalar float with an initial
+   * value of {@code 101.3f}, readable and writable, with historizing turned off.
+   *
+   * @return a freshly constructed {@link VariableAttributes} instance
+   */
+  public static VariableAttributes createPressureSensorAttributes() {
+    final LocalizedText displayName = LocalizedText.english("s2");
+    final LocalizedText description = LocalizedText.english("反应釜压力传感器");
+    // Initial pressure value, 101.3 kPa.
+    final Variant initialValue = new Variant(101.3f);
+    // 500 ms sampling interval.
+    final double samplingIntervalMs = 500.0;
+
+    return new VariableAttributes(
+        Unsigned.uint(0xFFFF), // specifiedAttributes
+        displayName,
+        description,
+        Unsigned.uint(0), // writeMask
+        Unsigned.uint(0), // userWriteMask
+        initialValue,
+        Identifiers.Float, // data type: float
+        ValueRanks.Scalar, // value rank: scalar
+        null, // arrayDimensions
+        AccessLevel.toValue(AccessLevel.READ_WRITE),
+        AccessLevel.toValue(AccessLevel.READ_WRITE),
+        samplingIntervalMs,
+        false // history recording is NOT enabled
+        );
+  }
+
+  /**
+   * Builds the attribute set for the {@code root} folder node.
+   *
+   * <p>NOTE(review): the description text is the same one used for the sensor variable and
+   * looks copy-pasted — confirm whether a folder-specific description is wanted.
+   *
+   * @return a freshly constructed {@link ObjectAttributes} instance
+   */
+  public static ObjectAttributes createFolder0Attributes() {
+    final LocalizedText displayName = LocalizedText.english("root");
+    final LocalizedText description = LocalizedText.english("反应釜压力传感器");
+
+    return new ObjectAttributes(
+        Unsigned.uint(0xFFFF), // specifiedAttributes
+        displayName,
+        description,
+        Unsigned.uint(0), // writeMask
+        Unsigned.uint(0), // userWriteMask
+        null // last argument left unset (presumably eventNotifier — TODO confirm)
+        );
+  }
+
+  /**
+   * Builds the attribute set for the {@code sg} folder node.
+   *
+   * <p>NOTE(review): the description text is the same one used for the sensor variable and
+   * looks copy-pasted — confirm whether a folder-specific description is wanted.
+   *
+   * @return a freshly constructed {@link ObjectAttributes} instance
+   */
+  public static ObjectAttributes createFolder1Attributes() {
+    final LocalizedText displayName = LocalizedText.english("sg");
+    final LocalizedText description = LocalizedText.english("反应釜压力传感器");
+
+    return new ObjectAttributes(
+        Unsigned.uint(0xFFFF), // specifiedAttributes
+        displayName,
+        description,
+        Unsigned.uint(0), // writeMask
+        Unsigned.uint(0), // userWriteMask
+        null // last argument left unset (presumably eventNotifier — TODO confirm)
+        );
+  }
+
+  /**
+   * Builds the attribute set for the {@code d1} folder node.
+   *
+   * <p>NOTE(review): the description text is the same one used for the sensor variable and
+   * looks copy-pasted — confirm whether a folder-specific description is wanted.
+   *
+   * @return a freshly constructed {@link ObjectAttributes} instance
+   */
+  public static ObjectAttributes createFolder2Attributes() {
+    final LocalizedText displayName = LocalizedText.english("d1");
+    final LocalizedText description = LocalizedText.english("反应釜压力传感器");
+
+    return new ObjectAttributes(
+        Unsigned.uint(0xFFFF), // specifiedAttributes
+        displayName,
+        description,
+        Unsigned.uint(0), // writeMask
+        Unsigned.uint(0), // userWriteMask
+        null // last argument left unset (presumably eventNotifier — TODO confirm)
+        );
+  }
 }

Reply via email to