This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch client-opc
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/client-opc by this push:
new f78e6d78b39 fix
f78e6d78b39 is described below
commit f78e6d78b3908c9aa3dffd0c26f9c63d3417f1d9
Author: Caideyipi <[email protected]>
AuthorDate: Mon Dec 22 16:56:33 2025 +0800
fix
---
.../db/pipe/sink/protocol/opcua/OpcUaSink.java | 16 +++
.../protocol/opcua/client/IoTDBOpcUaClient.java | 147 ++++++++++++++++++++-
2 files changed, 162 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index 6928385eca9..c2ca0341920 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -154,6 +154,22 @@ public class OpcUaSink implements PipeConnector {
Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY),
Arrays.asList(CONNECTOR_IOTDB_USERNAME_KEY,
SINK_IOTDB_USERNAME_KEY),
false);
+
+ final PipeParameters parameters = validator.getParameters();
+ if (validator
+ .getParameters()
+ .hasAnyAttributes(CONNECTOR_OPC_UA_NODE_URL_KEY,
SINK_OPC_UA_NODE_URL_KEY)) {
+ validator.validate(
+ CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE::equals,
+ String.format(
+ "When the OPC UA sink points to an outer server or specifies
'with-quality', the %s or %s must be %s.",
+ CONNECTOR_OPC_UA_MODEL_KEY,
+ SINK_OPC_UA_MODEL_KEY,
+ CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE),
+ parameters.getStringOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY),
+ CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE));
+ }
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
index b760785684b..cc6651338da 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
@@ -24,10 +24,31 @@ import
org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
import org.apache.tsfile.write.record.Tablet;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
+import org.eclipse.milo.opcua.sdk.core.AccessLevel;
+import org.eclipse.milo.opcua.sdk.core.ValueRanks;
+import org.eclipse.milo.opcua.stack.core.Identifiers;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
+import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
+import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
+import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesItem;
+import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResponse;
+import org.eclipse.milo.opcua.stack.core.types.structured.DeleteNodesItem;
import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
+import org.eclipse.milo.opcua.stack.core.types.structured.ObjectAttributes;
+import org.eclipse.milo.opcua.stack.core.types.structured.VariableAttributes;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
@@ -54,7 +75,10 @@ public class IoTDBOpcUaClient {
client.connect().get();
}
- public void transfer(final Tablet tablet, final OpcUaSink sink) throws
UaException {}
+ /**
+  * Placeholder for transferring a tablet through this client on behalf of a sink.
+  *
+  * <p>Only the tree model and the client-server mode are meant to be supported here.
+  *
+  * <p>NOTE(review): the body is currently empty, so calling this method has no effect and
+  * neither parameter is read.
+  *
+  * @param tablet the tablet handed over by the caller (currently unused)
+  * @param sink the sink handed over by the caller (currently unused)
+  * @throws UaException declared by the signature; the empty body never throws it
+  */
+ public void transfer(final Tablet tablet, final OpcUaSink sink) throws
UaException {
+
+ }
/////////////////////////////// Getter ///////////////////////////////
@@ -73,4 +97,125 @@ public class IoTDBOpcUaClient {
+ /** Returns this client's {@code identityProvider}. */
IdentityProvider getIdentityProvider() {
return identityProvider;
}
+
+  /**
+   * Ad-hoc walkthrough against a fixed sample hierarchy ({@code root/sg/d1/s2} in namespace
+   * index 2) on the given client.
+   *
+   * <p>Steps, each blocking until the server answers:
+   *
+   * <ol>
+   *   <li>connect;
+   *   <li>read the current value of {@code root/sg/d1/s2} and print it with its status;
+   *   <li>write {@code 42.0f} to that node and print the write status;
+   *   <li>delete that node;
+   *   <li>add the folders {@code root}, {@code root/sg}, {@code root/sg/d1} and the variable
+   *       {@code root/sg/d1/s2} in a single AddNodes request and print the response;
+   *   <li>disconnect (also when one of the previous steps fails).
+   * </ol>
+   *
+   * @param client the OPC UA client to drive; it is connected and disconnected by this method
+   * @throws Exception if waiting on any of the client's futures fails or is interrupted
+   */
+  public void runA(OpcUaClient client) throws Exception {
+    // Synchronous connect
+    client.connect().get();
+    try {
+      System.out.println("✅ 连接成功");
+
+      // Node ids of the fixed sample hierarchy: root -> sg -> d1 -> s2
+      final NodeId rootId = new NodeId(2, "root");
+      final NodeId sgId = new NodeId(2, "root/sg");
+      final NodeId d1Id = new NodeId(2, "root/sg/d1");
+      final NodeId nodeId = new NodeId(2, "root/sg/d1/s2");
+
+      // 1. Read the current value first to confirm the node is accessible
+      DataValue readValue = client.readValue(0, TimestampsToReturn.Both, nodeId).get();
+      System.out.println("读取当前值: " + readValue.getValue().getValue());
+      System.out.println("读取状态: " + readValue.getStatusCode());
+
+      // 2. Try to write a new value
+      Variant newValue = new Variant(42.0f);
+      DataValue writeValue =
+          new DataValue(newValue, StatusCode.GOOD, new DateTime(), new DateTime());
+
+      System.out.println("尝试写入值: " + newValue.getValue());
+
+      StatusCode writeStatus = client.writeValue(nodeId, writeValue).get();
+      System.out.println("写入状态: " + writeStatus);
+
+      // 3. Delete the variable node. Wait for the result like every other call here, so that
+      // the AddNodes request below cannot race with a deletion that is still in flight.
+      client.deleteNodes(Collections.singletonList(new DeleteNodesItem(nodeId, true))).get();
+
+      // 4. Re-create the whole path in one request: three folders, then the variable
+      AddNodesResponse addStatus =
+          client
+              .addNodes(
+                  Arrays.asList(
+                      new AddNodesItem(
+                          Identifiers.ObjectsFolder.expanded(),
+                          Identifiers.Organizes,
+                          rootId.expanded(),
+                          new QualifiedName(2, "root"),
+                          NodeClass.Object,
+                          ExtensionObject.encode(
+                              client.getStaticSerializationContext(),
+                              createFolder0Attributes()),
+                          Identifiers.FolderType.expanded()),
+                      new AddNodesItem(
+                          rootId.expanded(),
+                          Identifiers.Organizes,
+                          sgId.expanded(),
+                          new QualifiedName(2, "sg"),
+                          NodeClass.Object,
+                          ExtensionObject.encode(
+                              client.getStaticSerializationContext(),
+                              createFolder1Attributes()),
+                          Identifiers.FolderType.expanded()),
+                      new AddNodesItem(
+                          sgId.expanded(),
+                          Identifiers.Organizes,
+                          d1Id.expanded(),
+                          // Browse name now matches the node id "root/sg/d1" and the display
+                          // name "d1" (it used to be "d2").
+                          new QualifiedName(2, "d1"),
+                          NodeClass.Object,
+                          ExtensionObject.encode(
+                              client.getStaticSerializationContext(),
+                              createFolder2Attributes()),
+                          Identifiers.FolderType.expanded()),
+                      new AddNodesItem(
+                          d1Id.expanded(),
+                          Identifiers.Organizes,
+                          nodeId.expanded(),
+                          new QualifiedName(2, "s2"),
+                          NodeClass.Variable,
+                          ExtensionObject.encode(
+                              client.getStaticSerializationContext(),
+                              createPressureSensorAttributes()),
+                          Identifiers.BaseDataVariableType.expanded())))
+              .get();
+      System.out.println("新增节点状态: " + addStatus);
+    } finally {
+      // Always release the session, even if a read/write/delete/add step failed
+      client.disconnect().get();
+    }
+  }
+
+  /**
+   * Builds the attribute set for the sample {@code s2} variable node.
+   *
+   * <p>The result carries a {@code Float} data type, scalar value rank, an initial value of
+   * {@code 101.3f} and read/write access levels.
+   *
+   * @return a freshly constructed {@link VariableAttributes}
+   */
+  public static VariableAttributes createPressureSensorAttributes() {
+    final LocalizedText displayName = LocalizedText.english("s2");
+    final LocalizedText description = LocalizedText.english("反应釜压力传感器");
+    final Variant initialValue = new Variant(101.3f); // initial pressure value, 101.3 kPa
+    final NodeId dataType = Identifiers.Float; // floating-point type
+
+    return new VariableAttributes(
+        Unsigned.uint(0xFFFF), // specifiedAttributes
+        displayName,
+        description,
+        Unsigned.uint(0), // writeMask
+        Unsigned.uint(0), // userWriteMask
+        initialValue,
+        dataType,
+        ValueRanks.Scalar, // scalar
+        null, // arrayDimensions
+        AccessLevel.toValue(AccessLevel.READ_WRITE),
+        AccessLevel.toValue(AccessLevel.READ_WRITE),
+        500.0, // 500 ms sampling interval
+        false // history-recording flag, passed as false
+        );
+  }
+
+  /**
+   * Builds the attribute set for the sample folder object displayed as {@code root}.
+   *
+   * @return a freshly constructed {@link ObjectAttributes}
+   */
+  public static ObjectAttributes createFolder0Attributes() {
+    final LocalizedText displayName = LocalizedText.english("root");
+    final LocalizedText description = LocalizedText.english("反应釜压力传感器");
+
+    return new ObjectAttributes(
+        Unsigned.uint(0xFFFF), // specifiedAttributes
+        displayName,
+        description,
+        Unsigned.uint(0), // writeMask
+        Unsigned.uint(0), // userWriteMask
+        // NOTE(review): last constructor argument is left null; presumably the event
+        // notifier - confirm against ObjectAttributes and that encoding tolerates null
+        null);
+  }
+
+  /**
+   * Builds the attribute set for the sample folder object displayed as {@code sg}.
+   *
+   * @return a freshly constructed {@link ObjectAttributes}
+   */
+  public static ObjectAttributes createFolder1Attributes() {
+    final LocalizedText displayName = LocalizedText.english("sg");
+    final LocalizedText description = LocalizedText.english("反应釜压力传感器");
+
+    return new ObjectAttributes(
+        Unsigned.uint(0xFFFF), // specifiedAttributes
+        displayName,
+        description,
+        Unsigned.uint(0), // writeMask
+        Unsigned.uint(0), // userWriteMask
+        // NOTE(review): last constructor argument is left null; presumably the event
+        // notifier - confirm against ObjectAttributes and that encoding tolerates null
+        null);
+  }
+
+  /**
+   * Builds the attribute set for the sample folder object displayed as {@code d1}.
+   *
+   * @return a freshly constructed {@link ObjectAttributes}
+   */
+  public static ObjectAttributes createFolder2Attributes() {
+    final LocalizedText displayName = LocalizedText.english("d1");
+    final LocalizedText description = LocalizedText.english("反应釜压力传感器");
+
+    return new ObjectAttributes(
+        Unsigned.uint(0xFFFF), // specifiedAttributes
+        displayName,
+        description,
+        Unsigned.uint(0), // writeMask
+        Unsigned.uint(0), // userWriteMask
+        // NOTE(review): last constructor argument is left null; presumably the event
+        // notifier - confirm against ObjectAttributes and that encoding tolerates null
+        null);
+  }
}