This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch client-opc
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/client-opc by this push:
new 97897c39cf2 fix
97897c39cf2 is described below
commit 97897c39cf2a314094e115aba831eb5134419557
Author: Caideyipi <[email protected]>
AuthorDate: Mon Dec 22 18:18:34 2025 +0800
fix
---
.../db/pipe/sink/protocol/opcua/OpcUaSink.java | 8 +-
.../protocol/opcua/client/IoTDBOpcUaClient.java | 201 ++++++++-------------
.../sink/protocol/opcua/server/OpcUaNameSpace.java | 2 +-
3 files changed, 81 insertions(+), 130 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index c2ca0341920..d3788f0d505 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -361,7 +361,13 @@ public class OpcUaSink implements PipeConnector {
transferByTablet(
tabletInsertionEvent,
LOGGER,
- (tablet, isTableModel) -> nameSpace.transfer(tablet, isTableModel,
this));
+ (tablet, isTableModel) -> {
+ if (Objects.nonNull(nameSpace)) {
+ nameSpace.transfer(tablet, isTableModel, this);
+ } else if (Objects.nonNull(client)) {
+ client.transfer(tablet, this);
+ }
+ });
}
public static void transferByTablet(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
index 6d9c6a71a7f..db9fd171dad 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
@@ -35,6 +35,7 @@ import org.eclipse.milo.opcua.stack.core.StatusCodes;
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ExpandedNodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
@@ -43,25 +44,23 @@ import
org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned;
import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
-import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesItem;
import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResponse;
import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResult;
-import org.eclipse.milo.opcua.stack.core.types.structured.DeleteNodesItem;
import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
import org.eclipse.milo.opcua.stack.core.types.structured.ObjectAttributes;
import org.eclipse.milo.opcua.stack.core.types.structured.VariableAttributes;
-import java.util.Arrays;
-import java.util.Collections;
+import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
+import static
org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.convertToOpcDataType;
import static
org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc;
public class IoTDBOpcUaClient {
-
+ private static final int NAME_SPACE_INDEX = 2;
private final String nodeUrl;
private final SecurityPolicy securityPolicy;
@@ -101,6 +100,7 @@ public class IoTDBOpcUaClient {
Object value = null;
long timestamp = 0;
NodeId nodeId = null;
+ NodeId opcDataType = null;
for (int i = 0; i < measurementSchemas.size(); ++i) {
if (Objects.isNull(values.get(i))) {
@@ -120,59 +120,21 @@ public class IoTDBOpcUaClient {
throw new UnsupportedOperationException(
"When the 'with-quality' mode is enabled, the measurement must be
either \"value-name\" or \"quality-name\"");
}
- nodeId = new NodeId(2, String.join("/", segments));
+ nodeId = new NodeId(NAME_SPACE_INDEX, String.join("/", segments));
final long utcTimestamp =
timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
value = values.get(i);
timestamp = utcTimestamp;
+ opcDataType = convertToOpcDataType(type);
}
+ final Variant variant = new Variant(value);
final DataValue dataValue =
- new DataValue(new Variant(value), currentQuality, new
DateTime(timestamp), new DateTime());
+ new DataValue(variant, currentQuality, new DateTime(timestamp), new
DateTime());
StatusCode writeStatus = client.writeValue(nodeId, dataValue).get();
if (writeStatus.getValue() == StatusCodes.Bad_NodeIdUnknown) {
final AddNodesResponse addStatus =
- client
- .addNodes(
- Arrays.asList(
- new AddNodesItem(
- Identifiers.ObjectsFolder.expanded(),
- Identifiers.Organizes,
- new NodeId(2, "root").expanded(),
- new QualifiedName(2, "root"),
- NodeClass.Object,
- ExtensionObject.encode(
- client.getStaticSerializationContext(),
createFolder0Attributes()),
- Identifiers.FolderType.expanded()),
- new AddNodesItem(
- new NodeId(2, "root").expanded(),
- Identifiers.Organizes,
- new NodeId(2, "root/sg").expanded(),
- new QualifiedName(2, "sg"),
- NodeClass.Object,
- ExtensionObject.encode(
- client.getStaticSerializationContext(),
createFolder1Attributes()),
- Identifiers.FolderType.expanded()),
- new AddNodesItem(
- new NodeId(2, "root/sg").expanded(),
- Identifiers.Organizes,
- new NodeId(2, "root/sg/d1").expanded(),
- new QualifiedName(2, "d2"),
- NodeClass.Object,
- ExtensionObject.encode(
- client.getStaticSerializationContext(),
createFolder2Attributes()),
- Identifiers.FolderType.expanded()),
- new AddNodesItem(
- new NodeId(2, "root/sg/d1").expanded(),
- Identifiers.Organizes,
- new NodeId(2, "root/sg/d1/s2").expanded(),
- new QualifiedName(2, "s2"),
- NodeClass.Variable,
- ExtensionObject.encode(
- client.getStaticSerializationContext(),
- createPressureSensorAttributes()),
- Identifiers.BaseDataVariableType.expanded())))
- .get();
+ client.addNodes(getNodesToAdd(segments, opcDataType, variant)).get();
for (final AddNodesResult result : addStatus.getResults()) {
if (!result.getStatusCode().equals(StatusCode.GOOD)
&& !(result.getStatusCode().getValue() ==
StatusCodes.Bad_NodeIdExists)) {
@@ -193,6 +155,56 @@ public class IoTDBOpcUaClient {
}
}
+ public List<AddNodesItem> getNodesToAdd(
+ final String[] segments, final NodeId opcDataType, final Variant
initialValue) {
+ final List<AddNodesItem> addNodesItems = new ArrayList<>();
+ final StringBuilder sb = new StringBuilder(segments[0]);
+ ExpandedNodeId curNodeId = new NodeId(NAME_SPACE_INDEX,
segments[0]).expanded();
+ addNodesItems.add(
+ new AddNodesItem(
+ Identifiers.ObjectsFolder.expanded(),
+ Identifiers.Organizes,
+ curNodeId,
+ new QualifiedName(NAME_SPACE_INDEX, segments[0]),
+ NodeClass.Object,
+ ExtensionObject.encode(
+ client.getStaticSerializationContext(),
createFolderAttributes(segments[0])),
+ Identifiers.FolderType.expanded()));
+
+ // segments.length >= 3
+ for (int i = 1; i < segments.length - 1; ++i) {
+ sb.append("/").append(segments[i]);
+ final ExpandedNodeId nextId = new NodeId(NAME_SPACE_INDEX,
sb.toString()).expanded();
+ addNodesItems.add(
+ new AddNodesItem(
+ curNodeId,
+ Identifiers.Organizes,
+ nextId,
+ new QualifiedName(NAME_SPACE_INDEX, segments[i]),
+ NodeClass.Object,
+ ExtensionObject.encode(
+ client.getStaticSerializationContext(),
createFolderAttributes(segments[i])),
+ Identifiers.FolderType.expanded()));
+ curNodeId = nextId;
+ }
+
+ final String measurementName = segments[segments.length - 1];
+ sb.append("/").append(measurementName);
+ addNodesItems.add(
+ new AddNodesItem(
+ curNodeId,
+ Identifiers.Organizes,
+ new NodeId(NAME_SPACE_INDEX, sb.toString()).expanded(),
+ new QualifiedName(NAME_SPACE_INDEX, measurementName),
+ NodeClass.Variable,
+ ExtensionObject.encode(
+ client.getStaticSerializationContext(),
+ createMeasurementAttributes(measurementName, opcDataType,
initialValue)),
+ Identifiers.BaseDataVariableType.expanded()));
+
+ return addNodesItems;
+ }
+
/////////////////////////////// Getter ///////////////////////////////
String getNodeUrl() {
@@ -211,99 +223,32 @@ public class IoTDBOpcUaClient {
return identityProvider;
}
- public void runA(OpcUaClient client) throws Exception {
- // synchronous connect
- client.connect().get();
- System.out.println("✅ 连接成功");
-
- // 读取标签值c
- NodeId nodeId = new NodeId(2, "root/sg/d1/s2");
-
- // 1. 先读取当前值确认节点可访问
- DataValue readValue = client.readValue(0, TimestampsToReturn.Both,
nodeId).get();
- System.out.println("读取当前值: " + readValue.getValue().getValue());
- System.out.println("读取状态: " + readValue.getStatusCode());
+ /////////////////////////////// Attribute creator
///////////////////////////////
- // 2. 尝试写入新值
- Variant newValue = new Variant(42.0f);
- DataValue writeValue = new DataValue(newValue, StatusCode.GOOD, new
DateTime(), new DateTime());
-
- System.out.println("尝试写入值: " + newValue.getValue());
-
- StatusCode writeStatus = client.writeValue(nodeId, writeValue).get();
- System.out.println("写入状态: " + writeStatus);
-
- client.deleteNodes(Collections.singletonList(new DeleteNodesItem(nodeId,
true)));
-
- AddNodesResponse addStatus =
- client
- .addNodes(
- Arrays.asList(
- new AddNodesItem(
- Identifiers.ObjectsFolder.expanded(),
- Identifiers.Organizes,
- new NodeId(2, "root").expanded(),
- new QualifiedName(2, "root"),
- NodeClass.Object,
- ExtensionObject.encode(
- client.getStaticSerializationContext(),
createFolder0Attributes()),
- Identifiers.FolderType.expanded()),
- new AddNodesItem(
- new NodeId(2, "root").expanded(),
- Identifiers.Organizes,
- new NodeId(2, "root/sg").expanded(),
- new QualifiedName(2, "sg"),
- NodeClass.Object,
- ExtensionObject.encode(
- client.getStaticSerializationContext(),
createFolder1Attributes()),
- Identifiers.FolderType.expanded()),
- new AddNodesItem(
- new NodeId(2, "root/sg").expanded(),
- Identifiers.Organizes,
- new NodeId(2, "root/sg/d1").expanded(),
- new QualifiedName(2, "d2"),
- NodeClass.Object,
- ExtensionObject.encode(
- client.getStaticSerializationContext(),
createFolder2Attributes()),
- Identifiers.FolderType.expanded()),
- new AddNodesItem(
- new NodeId(2, "root/sg/d1").expanded(),
- Identifiers.Organizes,
- new NodeId(2, "root/sg/d1/s2").expanded(),
- new QualifiedName(2, "s2"),
- NodeClass.Variable,
- ExtensionObject.encode(
- client.getStaticSerializationContext(),
- createPressureSensorAttributes()),
- Identifiers.BaseDataVariableType.expanded())))
- .get();
- System.out.println("新增节点状态: " + addStatus);
- client.disconnect().get();
- }
-
- public static VariableAttributes createPressureSensorAttributes() {
+ public static VariableAttributes createMeasurementAttributes(
+ final String name, final NodeId objectType, final Variant initialValue) {
return new VariableAttributes(
Unsigned.uint(0xFFFF), // specifiedAttributes
- LocalizedText.english("s2"),
- LocalizedText.english("反应釜压力传感器"),
+ LocalizedText.english(name),
+ LocalizedText.english(name),
Unsigned.uint(0), // writeMask
Unsigned.uint(0), // userWriteMask
- new Variant(101.3f), // 初始压力值 101.3 kPa
- Identifiers.Float, // 浮点数类型
- ValueRanks.Scalar, // 标量
+ new Variant(initialValue),
+ objectType,
+ ValueRanks.Scalar,
null, // arrayDimensions
AccessLevel.toValue(AccessLevel.READ_WRITE),
AccessLevel.toValue(AccessLevel.READ_WRITE),
- 500.0, // 500ms 采样间隔
- false // 启用历史记录
+ 500.0, // samplingInterval
+ false // historizing
);
}
- public static ObjectAttributes createFolder0Attributes() {
+ public static ObjectAttributes createFolderAttributes(final String name) {
return new ObjectAttributes(
Unsigned.uint(0xFFFF), // specifiedAttributes
- LocalizedText.english("root"),
- LocalizedText.english("反应釜压力传感器"),
+ LocalizedText.english(name),
+ LocalizedText.english(name),
Unsigned.uint(0), // writeMask
Unsigned.uint(0), // userWriteMask
null // 启用历史记录
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index ddd94f84c0a..ddb8e161d08 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -490,7 +490,7 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
eventNode.delete();
}
- private NodeId convertToOpcDataType(final TSDataType type) {
+ public static NodeId convertToOpcDataType(final TSDataType type) {
switch (type) {
case BOOLEAN:
return Identifiers.Boolean;