This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch new_opc_server
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/new_opc_server by this push:
new 44861b24770 trial
44861b24770 is described below
commit 44861b2477090f2698f26337f6959b4f78c3880c
Author: Caideyipi <[email protected]>
AuthorDate: Thu Dec 4 11:42:35 2025 +0800
trial
---
.../java/org/apache/iotdb/opcua/ClientExample.java | 2 +-
.../java/org/apache/iotdb/opcua/ClientTest.java | 70 +++++++++++++++++++---
.../pipe/sink/protocol/opcua/OpcUaNameSpace.java | 2 +-
3 files changed, 65 insertions(+), 9 deletions(-)
diff --git
a/example/pipe-opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientExample.java
b/example/pipe-opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientExample.java
index 63240815a67..8403254dad7 100644
---
a/example/pipe-opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientExample.java
+++
b/example/pipe-opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientExample.java
@@ -31,7 +31,7 @@ import java.util.function.Predicate;
public interface ClientExample {
default String getEndpointUrl() {
- return "opc.tcp://localhost:49320";
+ return "opc.tcp://localhost:12686/iotdb";
}
default Predicate<EndpointDescription> endpointFilter() {
diff --git
a/example/pipe-opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientTest.java
b/example/pipe-opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientTest.java
index 797bb80e256..b31a1a298c8 100644
---
a/example/pipe-opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientTest.java
+++
b/example/pipe-opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientTest.java
@@ -19,18 +19,28 @@
package org.apache.iotdb.opcua;
+import io.netty.buffer.ByteBuf;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.core.AccessLevel;
+import org.eclipse.milo.opcua.sdk.core.ValueRanks;
import org.eclipse.milo.opcua.stack.core.Identifiers;
+import
org.eclipse.milo.opcua.stack.core.serialization.OpcUaBinaryStreamEncoder;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ByteString;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
+import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned;
import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesItem;
import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResponse;
import org.eclipse.milo.opcua.stack.core.types.structured.DeleteNodesItem;
+import org.eclipse.milo.opcua.stack.core.types.structured.VariableAttributes;
+import org.eclipse.milo.opcua.stack.core.util.BufferUtil;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
@@ -49,8 +59,8 @@ public class ClientTest implements ClientExample {
client.connect().get();
System.out.println("✅ 连接成功");
- // 读取标签值
- NodeId nodeId = new NodeId(2, "chan2.grass.glasia");
+ // 读取标签值c
+ NodeId nodeId = new NodeId(2, "root/sg/d1/s1");
// 1. 先读取当前值确认节点可访问
DataValue readValue = client.readValue(0, TimestampsToReturn.Both,
nodeId).get();
@@ -67,21 +77,67 @@ public class ClientTest implements ClientExample {
System.out.println("写入状态: " + writeStatus);
client.deleteNodes(Collections.singletonList(new DeleteNodesItem(nodeId,
true)));
+
+ final OpcUaBinaryStreamEncoder encoder =
+ new OpcUaBinaryStreamEncoder(client.getStaticSerializationContext());
+ final ByteBuf byteBuf = BufferUtil.pooledBuffer();
+ new VariableAttributes.Codec()
+ .encode(
+ client.getStaticSerializationContext(),
+ encoder.setBuffer(byteBuf),
+ createPressureSensorAttributes());
AddNodesResponse addStatus =
client
.addNodes(
Collections.singletonList(
new AddNodesItem(
- new NodeId(2, "chen.grass").expanded(),
+ new NodeId(2, "root/sg/d1").expanded(),
Identifiers.Organizes,
- new NodeId(2, "chen.grass.bishop").expanded(),
- new QualifiedName(2, "bishop"),
+ new NodeId(2, "root/sg/d1/s2").expanded(),
+ new QualifiedName(2, "s2"),
NodeClass.Variable,
- null,
+ new ExtensionObject(
+ convertByteBufToByteString(byteBuf),
+ Identifiers.OPCBinarySchema_TypeSystem),
Identifiers.BaseDataVariableType.expanded())))
.get();
System.out.println("新增节点状态: " + addStatus);
-
+ byteBuf.clear();
client.disconnect().get();
}
+
+ public static VariableAttributes createPressureSensorAttributes() {
+ return new VariableAttributes(
+ Unsigned.uint(0xFFFF), // specifiedAttributes
+ LocalizedText.english("s2"),
+ LocalizedText.english("反应釜压力传感器"),
+ Unsigned.uint(0), // writeMask
+ Unsigned.uint(0), // userWriteMask
+ new Variant(101.3), // 初始压力值 101.3 kPa
+ Identifiers.Float, // 浮点数类型
+ ValueRanks.Scalar, // 标量
+ null, // arrayDimensions
+ AccessLevel.toValue(AccessLevel.READ_WRITE),
+ AccessLevel.toValue(AccessLevel.READ_WRITE),
+ 500.0, // 500ms 采样间隔
+ false // 启用历史记录
+ );
+ }
+
+ // 方法1:将 ByteBuf 转换为 ByteString
+ public static ByteString convertByteBufToByteString(ByteBuf byteBuf) {
+ // 确保 ByteBuf 可读
+ if (byteBuf == null || byteBuf.readableBytes() == 0) {
+ return ByteString.NULL_VALUE; // 返回空 ByteString
+ }
+
+ // 创建与 ByteBuf 可读字节数相同的字节数组
+ byte[] bytes = new byte[byteBuf.readableBytes()];
+
+ // 将 ByteBuf 数据读取到字节数组
+ byteBuf.readBytes(bytes);
+
+ // 使用 ByteString.of() 创建 ByteString
+ return ByteString.of(bytes);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java
index e9e80f3c3c7..bf05d784278 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java
@@ -243,7 +243,7 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
new UaVariableNode.UaVariableNodeBuilder(getNodeContext())
.setNodeId(newNodeId(currentFolder + name))
.setAccessLevel(AccessLevel.READ_WRITE)
- .setUserAccessLevel(AccessLevel.READ_ONLY)
+ .setUserAccessLevel(AccessLevel.READ_WRITE)
.setBrowseName(newQualifiedName(name))
.setDisplayName(LocalizedText.english(name))
.setDataType(convertToOpcDataType(type))