This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch client-opc
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/client-opc by this push:
new b8719251b4b framework
b8719251b4b is described below
commit b8719251b4b10f1734bb64e94889c23b27804894
Author: Caideyipi <[email protected]>
AuthorDate: Mon Dec 22 17:45:21 2025 +0800
framework
---
.../protocol/opcua/client/IoTDBOpcUaClient.java | 127 ++++++++++++++++++++-
.../sink/protocol/opcua/server/OpcUaNameSpace.java | 10 +-
2 files changed, 127 insertions(+), 10 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
index d7f02d1ddd0..6d9c6a71a7f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
@@ -20,14 +20,18 @@
package org.apache.iotdb.db.pipe.sink.protocol.opcua.client;
import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
import org.eclipse.milo.opcua.sdk.core.AccessLevel;
import org.eclipse.milo.opcua.sdk.core.ValueRanks;
import org.eclipse.milo.opcua.stack.core.Identifiers;
-import org.eclipse.milo.opcua.stack.core.UaException;
+import org.eclipse.milo.opcua.stack.core.StatusCodes;
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
@@ -42,6 +46,7 @@ import
org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesItem;
import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResponse;
+import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResult;
import org.eclipse.milo.opcua.stack.core.types.structured.DeleteNodesItem;
import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
import org.eclipse.milo.opcua.stack.core.types.structured.ObjectAttributes;
@@ -49,17 +54,19 @@ import
org.eclipse.milo.opcua.stack.core.types.structured.VariableAttributes;
import java.util.Arrays;
import java.util.Collections;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.List;
+import java.util.Objects;
import java.util.function.Predicate;
+import static
org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc;
+
public class IoTDBOpcUaClient {
private final String nodeUrl;
private final SecurityPolicy securityPolicy;
private final IdentityProvider identityProvider;
-
- private final AtomicLong clientHandles = new AtomicLong(1L);
+ private OpcUaClient client;
public IoTDBOpcUaClient(
final String nodeUrl,
@@ -70,13 +77,121 @@ public class IoTDBOpcUaClient {
this.identityProvider = identityProvider;
}
- public void run(OpcUaClient client) throws Exception {
+ public void run(final OpcUaClient client) throws Exception {
// synchronous connect
+ this.client = client;
client.connect().get();
}
// Only support tree model & client-server
- public void transfer(final Tablet tablet, final OpcUaSink sink) throws
UaException {}
+ public void transfer(final Tablet tablet, final OpcUaSink sink) throws
Exception {
+ OpcUaNameSpace.transferTabletForClientServerModel(
+ tablet, false, sink, this::transferTabletRowForClientServerModel);
+ }
+
+ private void transferTabletRowForClientServerModel(
+ final String[] segments,
+ final List<IMeasurementSchema> measurementSchemas,
+ final List<Long> timestamps,
+ final List<Object> values,
+ final OpcUaSink sink)
+ throws Exception {
+ StatusCode currentQuality =
+ Objects.isNull(sink.getValueName()) ? StatusCode.GOOD :
StatusCode.UNCERTAIN;
+ Object value = null;
+ long timestamp = 0;
+ NodeId nodeId = null;
+
+ for (int i = 0; i < measurementSchemas.size(); ++i) {
+ if (Objects.isNull(values.get(i))) {
+ continue;
+ }
+ final String name = measurementSchemas.get(i).getMeasurementName();
+ final TSDataType type = measurementSchemas.get(i).getType();
+ if (Objects.nonNull(sink.getQualityName()) &&
sink.getQualityName().equals(name)) {
+ if (!type.equals(TSDataType.BOOLEAN)) {
+ throw new UnsupportedOperationException(
+ "The quality value only supports boolean type, while true ==
GOOD and false == BAD.");
+ }
+ currentQuality = values.get(i) == Boolean.TRUE ? StatusCode.GOOD :
StatusCode.BAD;
+ continue;
+ }
+ if (Objects.nonNull(sink.getValueName()) &&
!sink.getValueName().equals(name)) {
+ throw new UnsupportedOperationException(
+ "When the 'with-quality' mode is enabled, the measurement must be
either \"value-name\" or \"quality-name\"");
+ }
+ nodeId = new NodeId(2, String.join("/", segments));
+
+ final long utcTimestamp =
timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
+ value = values.get(i);
+ timestamp = utcTimestamp;
+ }
+ final DataValue dataValue =
+ new DataValue(new Variant(value), currentQuality, new
DateTime(timestamp), new DateTime());
+ StatusCode writeStatus = client.writeValue(nodeId, dataValue).get();
+
+ if (writeStatus.getValue() == StatusCodes.Bad_NodeIdUnknown) {
+ final AddNodesResponse addStatus =
+ client
+ .addNodes(
+ Arrays.asList(
+ new AddNodesItem(
+ Identifiers.ObjectsFolder.expanded(),
+ Identifiers.Organizes,
+ new NodeId(2, "root").expanded(),
+ new QualifiedName(2, "root"),
+ NodeClass.Object,
+ ExtensionObject.encode(
+ client.getStaticSerializationContext(),
createFolder0Attributes()),
+ Identifiers.FolderType.expanded()),
+ new AddNodesItem(
+ new NodeId(2, "root").expanded(),
+ Identifiers.Organizes,
+ new NodeId(2, "root/sg").expanded(),
+ new QualifiedName(2, "sg"),
+ NodeClass.Object,
+ ExtensionObject.encode(
+ client.getStaticSerializationContext(),
createFolder1Attributes()),
+ Identifiers.FolderType.expanded()),
+ new AddNodesItem(
+ new NodeId(2, "root/sg").expanded(),
+ Identifiers.Organizes,
+ new NodeId(2, "root/sg/d1").expanded(),
+ new QualifiedName(2, "d2"),
+ NodeClass.Object,
+ ExtensionObject.encode(
+ client.getStaticSerializationContext(),
createFolder2Attributes()),
+ Identifiers.FolderType.expanded()),
+ new AddNodesItem(
+ new NodeId(2, "root/sg/d1").expanded(),
+ Identifiers.Organizes,
+ new NodeId(2, "root/sg/d1/s2").expanded(),
+ new QualifiedName(2, "s2"),
+ NodeClass.Variable,
+ ExtensionObject.encode(
+ client.getStaticSerializationContext(),
+ createPressureSensorAttributes()),
+ Identifiers.BaseDataVariableType.expanded())))
+ .get();
+ for (final AddNodesResult result : addStatus.getResults()) {
+ if (!result.getStatusCode().equals(StatusCode.GOOD)
+ && !(result.getStatusCode().getValue() ==
StatusCodes.Bad_NodeIdExists)) {
+ throw new PipeException(
+ "Failed to create nodes after transfer data value, write status:
"
+ + writeStatus
+ + ", creation status: "
+ + addStatus);
+ }
+ }
+ writeStatus = client.writeValue(nodeId, dataValue).get();
+ if (writeStatus.getValue() != StatusCode.GOOD.getValue()) {
+ throw new PipeException(
+ "Failed to transfer dataValue after successfully created nodes,
error: " + writeStatus);
+ }
+ } else {
+ throw new PipeException("Failed to transfer dataValue, error: " +
writeStatus);
+ }
+ }
/////////////////////////////// Getter ///////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index c60680a1f1e..ddd94f84c0a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -96,7 +96,7 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
}
public void transfer(final Tablet tablet, final boolean isTableModel, final
OpcUaSink sink)
- throws UaException {
+ throws Exception {
if (sink.isClientServerModel()) {
transferTabletForClientServerModel(
tablet, isTableModel, sink,
this::transferTabletRowForClientServerModel);
@@ -109,7 +109,8 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
final Tablet tablet,
final boolean isTableModel,
final OpcUaSink sink,
- final TabletRowConsumer consumer) {
+ final TabletRowConsumer consumer)
+ throws Exception {
final List<IMeasurementSchema> schemas = tablet.getSchemas();
final List<IMeasurementSchema> newSchemas = new ArrayList<>();
if (!isTableModel) {
@@ -177,7 +178,8 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
final List<IMeasurementSchema> measurementSchemas,
final List<Long> timestamps,
final List<Object> values,
- final OpcUaSink sink);
+ final OpcUaSink sink)
+ throws Exception;
}
private void transferTabletRowForClientServerModel(
@@ -357,7 +359,7 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
}
}
- private static long timestampToUtc(final long timeStamp) {
+ public static long timestampToUtc(final long timeStamp) {
return TimestampPrecisionUtils.currPrecision.toNanos(timeStamp) / 100L +
116444736000000000L;
}