This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch client-opc
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/client-opc by this push:
     new b8719251b4b framework
b8719251b4b is described below

commit b8719251b4b10f1734bb64e94889c23b27804894
Author: Caideyipi <[email protected]>
AuthorDate: Mon Dec 22 17:45:21 2025 +0800

    framework
---
 .../protocol/opcua/client/IoTDBOpcUaClient.java    | 127 ++++++++++++++++++++-
 .../sink/protocol/opcua/server/OpcUaNameSpace.java |  10 +-
 2 files changed, 127 insertions(+), 10 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
index d7f02d1ddd0..6d9c6a71a7f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
@@ -20,14 +20,18 @@
 package org.apache.iotdb.db.pipe.sink.protocol.opcua.client;
 
 import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
 import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
 import org.eclipse.milo.opcua.sdk.core.AccessLevel;
 import org.eclipse.milo.opcua.sdk.core.ValueRanks;
 import org.eclipse.milo.opcua.stack.core.Identifiers;
-import org.eclipse.milo.opcua.stack.core.UaException;
+import org.eclipse.milo.opcua.stack.core.StatusCodes;
 import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
@@ -42,6 +46,7 @@ import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
 import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
 import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesItem;
 import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResponse;
+import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResult;
 import org.eclipse.milo.opcua.stack.core.types.structured.DeleteNodesItem;
 import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
 import org.eclipse.milo.opcua.stack.core.types.structured.ObjectAttributes;
@@ -49,17 +54,19 @@ import org.eclipse.milo.opcua.stack.core.types.structured.VariableAttributes;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.List;
+import java.util.Objects;
 import java.util.function.Predicate;
 
+import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc;
+
 public class IoTDBOpcUaClient {
 
   private final String nodeUrl;
 
   private final SecurityPolicy securityPolicy;
   private final IdentityProvider identityProvider;
-
-  private final AtomicLong clientHandles = new AtomicLong(1L);
+  private OpcUaClient client;
 
   public IoTDBOpcUaClient(
       final String nodeUrl,
@@ -70,13 +77,121 @@ public class IoTDBOpcUaClient {
     this.identityProvider = identityProvider;
   }
 
-  public void run(OpcUaClient client) throws Exception {
+  public void run(final OpcUaClient client) throws Exception {
     // synchronous connect
+    this.client = client;
     client.connect().get();
   }
 
   // Only support tree model & client-server
-  public void transfer(final Tablet tablet, final OpcUaSink sink) throws UaException {}
+  public void transfer(final Tablet tablet, final OpcUaSink sink) throws Exception {
+    OpcUaNameSpace.transferTabletForClientServerModel(
+        tablet, false, sink, this::transferTabletRowForClientServerModel);
+  }
+
+  private void transferTabletRowForClientServerModel(
+      final String[] segments,
+      final List<IMeasurementSchema> measurementSchemas,
+      final List<Long> timestamps,
+      final List<Object> values,
+      final OpcUaSink sink)
+      throws Exception {
+    StatusCode currentQuality =
+        Objects.isNull(sink.getValueName()) ? StatusCode.GOOD : StatusCode.UNCERTAIN;
+    Object value = null;
+    long timestamp = 0;
+    NodeId nodeId = null;
+
+    for (int i = 0; i < measurementSchemas.size(); ++i) {
+      if (Objects.isNull(values.get(i))) {
+        continue;
+      }
+      final String name = measurementSchemas.get(i).getMeasurementName();
+      final TSDataType type = measurementSchemas.get(i).getType();
+      if (Objects.nonNull(sink.getQualityName()) && sink.getQualityName().equals(name)) {
+        if (!type.equals(TSDataType.BOOLEAN)) {
+          throw new UnsupportedOperationException(
+              "The quality value only supports boolean type, while true == GOOD and false == BAD.");
+        }
+        currentQuality = values.get(i) == Boolean.TRUE ? StatusCode.GOOD : StatusCode.BAD;
+        continue;
+      }
+      if (Objects.nonNull(sink.getValueName()) && !sink.getValueName().equals(name)) {
+        throw new UnsupportedOperationException(
+            "When the 'with-quality' mode is enabled, the measurement must be either \"value-name\" or \"quality-name\"");
+      }
+      nodeId = new NodeId(2, String.join("/", segments));
+
+      final long utcTimestamp = timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
+      value = values.get(i);
+      timestamp = utcTimestamp;
+    }
+    final DataValue dataValue =
+        new DataValue(new Variant(value), currentQuality, new DateTime(timestamp), new DateTime());
+    StatusCode writeStatus = client.writeValue(nodeId, dataValue).get();
+
+    if (writeStatus.getValue() == StatusCodes.Bad_NodeIdUnknown) {
+      final AddNodesResponse addStatus =
+          client
+              .addNodes(
+                  Arrays.asList(
+                      new AddNodesItem(
+                          Identifiers.ObjectsFolder.expanded(),
+                          Identifiers.Organizes,
+                          new NodeId(2, "root").expanded(),
+                          new QualifiedName(2, "root"),
+                          NodeClass.Object,
+                          ExtensionObject.encode(
+                              client.getStaticSerializationContext(), createFolder0Attributes()),
+                          Identifiers.FolderType.expanded()),
+                      new AddNodesItem(
+                          new NodeId(2, "root").expanded(),
+                          Identifiers.Organizes,
+                          new NodeId(2, "root/sg").expanded(),
+                          new QualifiedName(2, "sg"),
+                          NodeClass.Object,
+                          ExtensionObject.encode(
+                              client.getStaticSerializationContext(), createFolder1Attributes()),
+                          Identifiers.FolderType.expanded()),
+                      new AddNodesItem(
+                          new NodeId(2, "root/sg").expanded(),
+                          Identifiers.Organizes,
+                          new NodeId(2, "root/sg/d1").expanded(),
+                          new QualifiedName(2, "d2"),
+                          NodeClass.Object,
+                          ExtensionObject.encode(
+                              client.getStaticSerializationContext(), createFolder2Attributes()),
+                          Identifiers.FolderType.expanded()),
+                      new AddNodesItem(
+                          new NodeId(2, "root/sg/d1").expanded(),
+                          Identifiers.Organizes,
+                          new NodeId(2, "root/sg/d1/s2").expanded(),
+                          new QualifiedName(2, "s2"),
+                          NodeClass.Variable,
+                          ExtensionObject.encode(
+                              client.getStaticSerializationContext(),
+                              createPressureSensorAttributes()),
+                          Identifiers.BaseDataVariableType.expanded())))
+              .get();
+      for (final AddNodesResult result : addStatus.getResults()) {
+        if (!result.getStatusCode().equals(StatusCode.GOOD)
+            && !(result.getStatusCode().getValue() == StatusCodes.Bad_NodeIdExists)) {
+          throw new PipeException(
+              "Failed to create nodes after transfer data value, write status: "
+                  + writeStatus
+                  + ", creation status: "
+                  + addStatus);
+        }
+      }
+      writeStatus = client.writeValue(nodeId, dataValue).get();
+      if (writeStatus.getValue() != StatusCode.GOOD.getValue()) {
+        throw new PipeException(
+            "Failed to transfer dataValue after successfully created nodes, error: " + writeStatus);
+      }
+    } else {
+      throw new PipeException("Failed to transfer dataValue, error: " + writeStatus);
+    }
+  }
 
   /////////////////////////////// Getter ///////////////////////////////
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index c60680a1f1e..ddd94f84c0a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -96,7 +96,7 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
   }
 
   public void transfer(final Tablet tablet, final boolean isTableModel, final OpcUaSink sink)
-      throws UaException {
+      throws Exception {
     if (sink.isClientServerModel()) {
       transferTabletForClientServerModel(
           tablet, isTableModel, sink, this::transferTabletRowForClientServerModel);
@@ -109,7 +109,8 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
       final Tablet tablet,
       final boolean isTableModel,
       final OpcUaSink sink,
-      final TabletRowConsumer consumer) {
+      final TabletRowConsumer consumer)
+      throws Exception {
     final List<IMeasurementSchema> schemas = tablet.getSchemas();
     final List<IMeasurementSchema> newSchemas = new ArrayList<>();
     if (!isTableModel) {
@@ -177,7 +178,8 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
         final List<IMeasurementSchema> measurementSchemas,
         final List<Long> timestamps,
         final List<Object> values,
-        final OpcUaSink sink);
+        final OpcUaSink sink)
+        throws Exception;
   }
 
   private void transferTabletRowForClientServerModel(
@@ -357,7 +359,7 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
     }
   }
 
-  private static long timestampToUtc(final long timeStamp) {
+  public static long timestampToUtc(final long timeStamp) {
     return TimestampPrecisionUtils.currPrecision.toNanos(timeStamp) / 100L + 116444736000000000L;
   }
 

Reply via email to