This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch TableModelV1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d0067ec94f3e75c6194fa5a64330a94b8aa5b8c9
Author: Caideyipi <[email protected]>
AuthorDate: Fri Aug 9 11:07:38 2024 +0800

    Pipe: Implemented Client-Server model in OPC UA Sink & Enable anonymous 
access settings & Corrected date time format (#13100)
    
    (cherry picked from commit 78439da4127eb42bd398dd0aa87f30f629cd501a)
---
 iotdb-core/datanode/pom.xml                        |   4 +
 .../connector/protocol/opcua/OpcUaConnector.java   | 207 +++---------
 .../connector/protocol/opcua/OpcUaNameSpace.java   | 374 +++++++++++++++++++++
 .../protocol/opcua/OpcUaServerBuilder.java         |  32 +-
 .../config/constant/PipeConnectorConstant.java     |  13 +
 pom.xml                                            |  11 +
 6 files changed, 477 insertions(+), 164 deletions(-)

diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index 7ddf561b3fd..cc6aa284a6f 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -178,6 +178,10 @@
             <groupId>org.eclipse.milo</groupId>
             <artifactId>stack-core</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.eclipse.milo</groupId>
+            <artifactId>sdk-core</artifactId>
+        </dependency>
         <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
index ba134b5dcc9..390b2fe73bb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.pipe.connector.protocol.opcua;
 
-import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.pipe.api.PipeConnector;
@@ -30,25 +29,15 @@ import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
-import org.apache.tsfile.common.constant.TsFileConstant;
-import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.record.Tablet;
 import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
-import org.eclipse.milo.opcua.sdk.server.model.nodes.objects.BaseEventTypeNode;
-import org.eclipse.milo.opcua.stack.core.Identifiers;
 import org.eclipse.milo.opcua.stack.core.UaException;
-import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
-import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
-import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.time.LocalDate;
 import java.util.Arrays;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -56,15 +45,23 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_MODEL_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_MODEL_PUB_SUB_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_SECURITY_DIR_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PASSWORD_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USER_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_HTTPS_BIND_PORT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_MODEL_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_SECURITY_DIR_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_OPC_UA_TCP_BIND_PORT_KEY;
 
@@ -77,15 +74,25 @@ public class OpcUaConnector implements PipeConnector {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(OpcUaConnector.class);
 
-  private static final Map<String, Pair<AtomicInteger, OpcUaServer>>
-      SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP = new ConcurrentHashMap<>();
+  private static final Map<String, Pair<AtomicInteger, OpcUaNameSpace>>
+      SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP = new 
ConcurrentHashMap<>();
 
   private String serverKey;
-  private OpcUaServer server;
+  private OpcUaNameSpace nameSpace;
 
   @Override
   public void validate(final PipeParameterValidator validator) throws 
Exception {
-    // All the parameters are optional
+    validator
+        .validateAttributeValueRange(
+            CONNECTOR_OPC_UA_MODEL_KEY,
+            true,
+            CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE,
+            CONNECTOR_OPC_UA_MODEL_PUB_SUB_VALUE)
+        .validateAttributeValueRange(
+            SINK_OPC_UA_MODEL_KEY,
+            true,
+            CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE,
+            CONNECTOR_OPC_UA_MODEL_PUB_SUB_VALUE);
   }
 
   @Override
@@ -113,12 +120,18 @@ public class OpcUaConnector implements PipeConnector {
         parameters.getStringOrDefault(
             Arrays.asList(CONNECTOR_OPC_UA_SECURITY_DIR_KEY, 
SINK_OPC_UA_SECURITY_DIR_KEY),
             CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE);
-
-    synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
+    final boolean enableAnonymousAccess =
+        parameters.getBooleanOrDefault(
+            Arrays.asList(
+                CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY,
+                SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY),
+            CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE);
+
+    synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP) {
       serverKey = httpsBindPort + ":" + tcpBindPort;
 
-      server =
-          SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP
+      nameSpace =
+          SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP
               .computeIfAbsent(
                   serverKey,
                   key -> {
@@ -130,15 +143,26 @@ public class OpcUaConnector implements PipeConnector {
                               .setUser(user)
                               .setPassword(password)
                               .setSecurityDir(securityDir)
+                              .setEnableAnonymousAccess(enableAnonymousAccess)
                               .build();
-                      newServer.startup();
-                      return new Pair<>(new AtomicInteger(0), newServer);
+                      nameSpace =
+                          new OpcUaNameSpace(
+                              newServer,
+                              parameters
+                                  .getStringOrDefault(
+                                      Arrays.asList(
+                                          CONNECTOR_OPC_UA_MODEL_KEY, 
SINK_OPC_UA_MODEL_KEY),
+                                      CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
+                                  
.equals(CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE));
+                      nameSpace.startup();
+                      newServer.startup().get();
+                      return new Pair<>(new AtomicInteger(0), nameSpace);
                     } catch (final Exception e) {
                       throw new PipeException("Failed to build and startup 
OpcUaServer", e);
                     }
                   })
               .getRight();
-      
SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(serverKey).getLeft().incrementAndGet();
+      
SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP.get(serverKey).getLeft().incrementAndGet();
     }
   }
 
@@ -171,14 +195,13 @@ public class OpcUaConnector implements PipeConnector {
     }
 
     if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
-      transferTabletWrapper(server, (PipeInsertNodeTabletInsertionEvent) 
tabletInsertionEvent);
+      transferTabletWrapper((PipeInsertNodeTabletInsertionEvent) 
tabletInsertionEvent);
     } else {
-      transferTabletWrapper(server, (PipeRawTabletInsertionEvent) 
tabletInsertionEvent);
+      transferTabletWrapper((PipeRawTabletInsertionEvent) 
tabletInsertionEvent);
     }
   }
 
   private void transferTabletWrapper(
-      final OpcUaServer server,
       final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent)
       throws UaException {
     try {
@@ -188,7 +211,7 @@ public class OpcUaConnector implements PipeConnector {
         return;
       }
       for (final Tablet tablet : 
pipeInsertNodeTabletInsertionEvent.convertToTablets()) {
-        transferTablet(server, tablet);
+        nameSpace.transfer(tablet);
       }
     } finally {
       pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(
@@ -196,148 +219,28 @@ public class OpcUaConnector implements PipeConnector {
     }
   }
 
-  private void transferTabletWrapper(
-      final OpcUaServer server, final PipeRawTabletInsertionEvent 
pipeRawTabletInsertionEvent)
+  private void transferTabletWrapper(final PipeRawTabletInsertionEvent 
pipeRawTabletInsertionEvent)
       throws UaException {
     try {
       // We increase the reference count for this event to determine if the 
event may be released.
       if 
(!pipeRawTabletInsertionEvent.increaseReferenceCount(OpcUaConnector.class.getName()))
 {
         return;
       }
-      transferTablet(server, pipeRawTabletInsertionEvent.convertToTablet());
+      nameSpace.transfer(pipeRawTabletInsertionEvent.convertToTablet());
     } finally {
       
pipeRawTabletInsertionEvent.decreaseReferenceCount(OpcUaConnector.class.getName(),
 false);
     }
   }
 
-  /**
-   * Transfer {@link Tablet} into eventNodes and post it on the eventBus, so 
that they will be heard
-   * at the subscribers. Notice that an eventNode is reused to reduce object 
creation costs.
-   *
-   * @param server OpcUaServer
-   * @param tablet the tablet to send
-   * @throws UaException if failed to create {@link Event}
-   */
-  private void transferTablet(final OpcUaServer server, final Tablet tablet) 
throws UaException {
-    // There is no nameSpace, so that nameSpaceIndex is always 0
-    final int pseudoNameSpaceIndex = 0;
-    final BaseEventTypeNode eventNode =
-        server
-            .getEventFactory()
-            .createEvent(
-                new NodeId(pseudoNameSpaceIndex, UUID.randomUUID()), 
Identifiers.BaseEventType);
-    // Use eventNode here because other nodes doesn't support values and times 
simultaneously
-    for (int columnIndex = 0; columnIndex < tablet.getSchemas().size(); 
++columnIndex) {
-
-      final TSDataType dataType = 
tablet.getSchemas().get(columnIndex).getType();
-
-      // Source name --> Sensor path, like root.test.d_0.s_0
-      eventNode.setSourceName(
-          tablet.getDeviceId()
-              + TsFileConstant.PATH_SEPARATOR
-              + tablet.getSchemas().get(columnIndex).getMeasurementId());
-
-      // Source node --> Sensor type, like double
-      eventNode.setSourceNode(convertToOpcDataType(dataType));
-
-      for (int rowIndex = 0; rowIndex < tablet.rowSize; ++rowIndex) {
-        // Filter null value
-        if (tablet.bitMaps[columnIndex].isMarked(rowIndex)) {
-          continue;
-        }
-
-        // Time --> TimeStamp
-        eventNode.setTime(new DateTime(tablet.timestamps[rowIndex]));
-
-        // Message --> Value
-        switch (dataType) {
-          case BOOLEAN:
-            eventNode.setMessage(
-                LocalizedText.english(
-                    Boolean.toString(((boolean[]) 
tablet.values[columnIndex])[rowIndex])));
-            break;
-          case INT32:
-            eventNode.setMessage(
-                LocalizedText.english(
-                    Integer.toString(((int[]) 
tablet.values[columnIndex])[rowIndex])));
-            break;
-          case DATE:
-            eventNode.setMessage(
-                LocalizedText.english(
-                    (((LocalDate[]) 
tablet.values[columnIndex])[rowIndex]).toString()));
-            break;
-          case INT64:
-          case TIMESTAMP:
-            eventNode.setMessage(
-                LocalizedText.english(
-                    Long.toString(((long[]) 
tablet.values[columnIndex])[rowIndex])));
-            break;
-          case FLOAT:
-            eventNode.setMessage(
-                LocalizedText.english(
-                    Float.toString(((float[]) 
tablet.values[columnIndex])[rowIndex])));
-            break;
-          case DOUBLE:
-            eventNode.setMessage(
-                LocalizedText.english(
-                    Double.toString(((double[]) 
tablet.values[columnIndex])[rowIndex])));
-            break;
-          case TEXT:
-          case BLOB:
-          case STRING:
-            eventNode.setMessage(
-                LocalizedText.english(
-                    ((Binary[]) 
tablet.values[columnIndex])[rowIndex].toString()));
-            break;
-          case VECTOR:
-          case UNKNOWN:
-          default:
-            throw new PipeRuntimeNonCriticalException(
-                "Unsupported data type: " + 
tablet.getSchemas().get(columnIndex).getType());
-        }
-
-        // Send the event
-        server.getEventBus().post(eventNode);
-      }
-    }
-    eventNode.delete();
-  }
-
-  private NodeId convertToOpcDataType(final TSDataType type) {
-    switch (type) {
-      case BOOLEAN:
-        return Identifiers.Boolean;
-      case INT32:
-        return Identifiers.Int32;
-      case DATE:
-        return Identifiers.DateTime;
-      case INT64:
-      case TIMESTAMP:
-        return Identifiers.Int64;
-      case FLOAT:
-        return Identifiers.Float;
-      case DOUBLE:
-        return Identifiers.Double;
-      case TEXT:
-      case BLOB:
-      case STRING:
-        return Identifiers.String;
-      case VECTOR:
-      case UNKNOWN:
-      default:
-        throw new PipeRuntimeNonCriticalException("Unsupported data type: " + 
type);
-    }
-  }
-
   @Override
   public void close() throws Exception {
     if (serverKey == null) {
       return;
     }
 
-    synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
-      final Pair<AtomicInteger, OpcUaServer> pair =
-          SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(serverKey);
+    synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP) {
+      final Pair<AtomicInteger, OpcUaNameSpace> pair =
+          SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP.get(serverKey);
       if (pair == null) {
         return;
       }
@@ -346,7 +249,7 @@ public class OpcUaConnector implements PipeConnector {
         try {
           pair.getRight().shutdown();
         } finally {
-          SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP.remove(serverKey);
+          SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP.remove(serverKey);
         }
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
new file mode 100644
index 00000000000..52c07d49c50
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.protocol.opcua;
+
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
+import org.apache.iotdb.db.pipe.connector.util.PipeTabletEventSorter;
+import org.apache.iotdb.db.utils.DateTimeUtils;
+import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.eclipse.milo.opcua.sdk.core.AccessLevel;
+import org.eclipse.milo.opcua.sdk.core.Reference;
+import org.eclipse.milo.opcua.sdk.server.Lifecycle;
+import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
+import org.eclipse.milo.opcua.sdk.server.api.DataItem;
+import org.eclipse.milo.opcua.sdk.server.api.ManagedNamespaceWithLifecycle;
+import org.eclipse.milo.opcua.sdk.server.api.MonitoredItem;
+import org.eclipse.milo.opcua.sdk.server.model.nodes.objects.BaseEventTypeNode;
+import org.eclipse.milo.opcua.sdk.server.nodes.UaFolderNode;
+import org.eclipse.milo.opcua.sdk.server.nodes.UaVariableNode;
+import org.eclipse.milo.opcua.sdk.server.util.SubscriptionModel;
+import org.eclipse.milo.opcua.stack.core.Identifiers;
+import org.eclipse.milo.opcua.stack.core.UaException;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
+import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
+import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+
+import java.sql.Date;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+
+public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
+  public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server";
+  private final boolean isClientServerModel;
+  private final SubscriptionModel subscriptionModel;
+
+  OpcUaNameSpace(final OpcUaServer server, final boolean isClientServerModel) {
+    super(server, NAMESPACE_URI);
+    this.isClientServerModel = isClientServerModel;
+
+    subscriptionModel = new SubscriptionModel(server, this);
+    getLifecycleManager().addLifecycle(subscriptionModel);
+    getLifecycleManager()
+        .addLifecycle(
+            new Lifecycle() {
+              @Override
+              public void startup() {
+                // Do nothing
+              }
+
+              @Override
+              public void shutdown() {
+                getServer().shutdown();
+              }
+            });
+  }
+
+  void transfer(final Tablet tablet) throws UaException {
+    if (isClientServerModel) {
+      transferTabletForClientServerModel(tablet);
+    } else {
+      transferTabletForPubSubModel(tablet);
+    }
+  }
+
+  private void transferTabletForClientServerModel(final Tablet tablet) {
+    new 
PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
+
+    final String[] segments = tablet.getDeviceId().split("\\.");
+    if (segments.length == 0) {
+      throw new PipeRuntimeCriticalException("The segments of tablets must 
exist");
+    }
+    final StringBuilder currentStr = new StringBuilder();
+    UaFolderNode folderNode = null;
+    NodeId folderNodeId;
+    for (final String segment : segments) {
+      final UaFolderNode nextFolderNode;
+
+      currentStr.append(segment);
+      folderNodeId = newNodeId(currentStr.toString());
+      currentStr.append("/");
+
+      if (!getNodeManager().containsNode(folderNodeId)) {
+        nextFolderNode =
+            new UaFolderNode(
+                getNodeContext(),
+                folderNodeId,
+                newQualifiedName(segment),
+                LocalizedText.english(segment));
+        getNodeManager().addNode(nextFolderNode);
+        if (Objects.nonNull(folderNode)) {
+          folderNode.addOrganizes(nextFolderNode);
+        } else {
+          nextFolderNode.addReference(
+              new Reference(
+                  folderNodeId,
+                  Identifiers.Organizes,
+                  Identifiers.ObjectsFolder.expanded(),
+                  false));
+        }
+        folderNode = nextFolderNode;
+      } else {
+        folderNode =
+            (UaFolderNode)
+                getNodeManager()
+                    .getNode(folderNodeId)
+                    .orElseThrow(
+                        () ->
+                            new PipeRuntimeCriticalException(
+                                String.format(
+                                    "The folder node for %s does not exist.",
+                                    tablet.getDeviceId())));
+      }
+    }
+
+    final String currentFolder = currentStr.toString();
+    for (int i = 0; i < tablet.getSchemas().size(); ++i) {
+      final IMeasurementSchema measurementSchema = tablet.getSchemas().get(i);
+      final String name = measurementSchema.getMeasurementId();
+      final TSDataType type = measurementSchema.getType();
+      final NodeId nodeId = newNodeId(currentFolder + name);
+      final UaVariableNode measurementNode;
+      if (!getNodeManager().containsNode(nodeId)) {
+        measurementNode =
+            new UaVariableNode.UaVariableNodeBuilder(getNodeContext())
+                .setNodeId(newNodeId(currentFolder + name))
+                .setAccessLevel(AccessLevel.READ_WRITE)
+                .setUserAccessLevel(AccessLevel.READ_ONLY)
+                .setBrowseName(newQualifiedName(name))
+                .setDisplayName(LocalizedText.english(name))
+                .setDataType(convertToOpcDataType(type))
+                .setTypeDefinition(Identifiers.BaseDataVariableType)
+                .build();
+        getNodeManager().addNode(measurementNode);
+        folderNode.addOrganizes(measurementNode);
+      } else {
+        // This must exist
+        measurementNode =
+            (UaVariableNode)
+                getNodeManager()
+                    .getNode(nodeId)
+                    .orElseThrow(
+                        () ->
+                            new PipeRuntimeCriticalException(
+                                String.format("The Node %s does not exist.", 
nodeId)));
+      }
+
+      int lastNonnullIndex = -1;
+      for (int j = 0; j < tablet.rowSize; ++j) {
+        if (!tablet.bitMaps[i].isMarked(j)) {
+          lastNonnullIndex = j;
+          break;
+        }
+      }
+
+      if (lastNonnullIndex != -1) {
+        final long utcTimestamp = 
timestampToUtc(tablet.timestamps[lastNonnullIndex]);
+        if (Objects.isNull(measurementNode.getValue())
+            || 
Objects.requireNonNull(measurementNode.getValue().getSourceTime()).getUtcTime()
+                < utcTimestamp) {
+          measurementNode.setValue(
+              new DataValue(
+                  new Variant(getTabletObjectValue4Opc(tablet.values[i], 
lastNonnullIndex, type)),
+                  StatusCode.GOOD,
+                  new DateTime(utcTimestamp),
+                  new DateTime()));
+        }
+      }
+    }
+  }
+
+  private static Object getTabletObjectValue4Opc(
+      final Object column, final int rowIndex, final TSDataType type) {
+    switch (type) {
+      case BOOLEAN:
+        return ((boolean[]) column)[rowIndex];
+      case INT32:
+        return ((int[]) column)[rowIndex];
+      case DATE:
+        return new DateTime(Date.valueOf(((LocalDate[]) column)[rowIndex]));
+      case INT64:
+        return ((long[]) column)[rowIndex];
+      case TIMESTAMP:
+        return new DateTime(timestampToUtc(((long[]) column)[rowIndex]));
+      case FLOAT:
+        return ((float[]) column)[rowIndex];
+      case DOUBLE:
+        return ((double[]) column)[rowIndex];
+      case TEXT:
+      case BLOB:
+      case STRING:
+        return ((Binary[]) column)[rowIndex].toString();
+      default:
+        throw new UnSupportedDataTypeException("UnSupported dataType " + type);
+    }
+  }
+
+  private static long timestampToUtc(final long timeStamp) {
+    return TimestampPrecisionUtils.currPrecision.toNanos(timeStamp) / 100L + 
116444736000000000L;
+  }
+
+  /**
+   * Transfer {@link Tablet} into eventNodes and post it on the eventBus, so 
that they will be heard
+   * at the subscribers. Notice that an eventNode is reused to reduce object 
creation costs.
+   *
+   * @param tablet the tablet to send
+   * @throws UaException if failed to create {@link Event}
+   */
+  private void transferTabletForPubSubModel(final Tablet tablet) throws 
UaException {
+    final BaseEventTypeNode eventNode =
+        getServer()
+            .getEventFactory()
+            .createEvent(
+                new NodeId(getNamespaceIndex(), UUID.randomUUID()), 
Identifiers.BaseEventType);
+    // Use eventNode here because other nodes doesn't support values and times 
simultaneously
+    for (int columnIndex = 0; columnIndex < tablet.getSchemas().size(); 
++columnIndex) {
+
+      final TSDataType dataType = 
tablet.getSchemas().get(columnIndex).getType();
+
+      // Source name --> Sensor path, like root.test.d_0.s_0
+      eventNode.setSourceName(
+          tablet.getDeviceId()
+              + TsFileConstant.PATH_SEPARATOR
+              + tablet.getSchemas().get(columnIndex).getMeasurementId());
+
+      // Source node --> Sensor type, like double
+      eventNode.setSourceNode(convertToOpcDataType(dataType));
+
+      for (int rowIndex = 0; rowIndex < tablet.rowSize; ++rowIndex) {
+        // Filter null value
+        if (tablet.bitMaps[columnIndex].isMarked(rowIndex)) {
+          continue;
+        }
+
+        // Time --> TimeStamp
+        eventNode.setTime(new 
DateTime(timestampToUtc(tablet.timestamps[rowIndex])));
+
+        // Message --> Value
+        switch (dataType) {
+          case BOOLEAN:
+            eventNode.setMessage(
+                LocalizedText.english(
+                    Boolean.toString(((boolean[]) 
tablet.values[columnIndex])[rowIndex])));
+            break;
+          case INT32:
+            eventNode.setMessage(
+                LocalizedText.english(
+                    Integer.toString(((int[]) 
tablet.values[columnIndex])[rowIndex])));
+            break;
+          case DATE:
+            eventNode.setMessage(
+                LocalizedText.english(
+                    (((LocalDate[]) tablet.values[columnIndex])[rowIndex])
+                        .atStartOfDay(ZoneId.systemDefault())
+                        .toString()));
+            break;
+          case INT64:
+            eventNode.setMessage(
+                LocalizedText.english(
+                    Long.toString(((long[]) 
tablet.values[columnIndex])[rowIndex])));
+            break;
+          case TIMESTAMP:
+            eventNode.setMessage(
+                LocalizedText.english(
+                    DateTimeUtils.convertLongToDate(
+                        ((long[]) tablet.values[columnIndex])[rowIndex])));
+            break;
+          case FLOAT:
+            eventNode.setMessage(
+                LocalizedText.english(
+                    Float.toString(((float[]) 
tablet.values[columnIndex])[rowIndex])));
+            break;
+          case DOUBLE:
+            eventNode.setMessage(
+                LocalizedText.english(
+                    Double.toString(((double[]) 
tablet.values[columnIndex])[rowIndex])));
+            break;
+          case TEXT:
+          case BLOB:
+          case STRING:
+            eventNode.setMessage(
+                LocalizedText.english(
+                    ((Binary[]) 
tablet.values[columnIndex])[rowIndex].toString()));
+            break;
+          case VECTOR:
+          case UNKNOWN:
+          default:
+            throw new PipeRuntimeNonCriticalException(
+                "Unsupported data type: " + 
tablet.getSchemas().get(columnIndex).getType());
+        }
+
+        // Send the event
+        getServer().getEventBus().post(eventNode);
+      }
+    }
+    eventNode.delete();
+  }
+
+  private NodeId convertToOpcDataType(final TSDataType type) {
+    switch (type) {
+      case BOOLEAN:
+        return Identifiers.Boolean;
+      case INT32:
+        return Identifiers.Int32;
+      case DATE:
+      case TIMESTAMP:
+        return Identifiers.DateTime;
+      case INT64:
+        return Identifiers.Int64;
+      case FLOAT:
+        return Identifiers.Float;
+      case DOUBLE:
+        return Identifiers.Double;
+      case TEXT:
+      case BLOB:
+      case STRING:
+        return Identifiers.String;
+      case VECTOR:
+      case UNKNOWN:
+      default:
+        throw new PipeRuntimeNonCriticalException("Unsupported data type: " + 
type);
+    }
+  }
+
+  @Override
+  public void onDataItemsCreated(final List<DataItem> dataItems) {
+    subscriptionModel.onDataItemsCreated(dataItems);
+  }
+
+  @Override
+  public void onDataItemsModified(final List<DataItem> dataItems) {
+    subscriptionModel.onDataItemsModified(dataItems);
+  }
+
+  @Override
+  public void onDataItemsDeleted(final List<DataItem> dataItems) {
+    subscriptionModel.onDataItemsDeleted(dataItems);
+  }
+
+  @Override
+  public void onMonitoringModeChanged(final List<MonitoredItem> 
monitoredItems) {
+    subscriptionModel.onMonitoringModeChanged(monitoredItems);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java
index 85982318404..ee7d0481ed8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaServerBuilder.java
@@ -79,6 +79,7 @@ public class OpcUaServerBuilder {
   private String user;
   private String password;
   private Path securityDir;
+  private boolean enableAnonymousAccess;
 
   public OpcUaServerBuilder() {
     tcpBindPort = 
PipeConnectorConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
@@ -86,33 +87,40 @@ public class OpcUaServerBuilder {
     user = PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
     password = PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
     securityDir = 
Paths.get(PipeConnectorConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE);
+    enableAnonymousAccess =
+        
PipeConnectorConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE;
   }
 
-  public OpcUaServerBuilder setTcpBindPort(int tcpBindPort) {
+  public OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) {
     this.tcpBindPort = tcpBindPort;
     return this;
   }
 
-  public OpcUaServerBuilder setHttpsBindPort(int httpsBindPort) {
+  public OpcUaServerBuilder setHttpsBindPort(final int httpsBindPort) {
     this.httpsBindPort = httpsBindPort;
     return this;
   }
 
-  public OpcUaServerBuilder setUser(String user) {
+  public OpcUaServerBuilder setUser(final String user) {
     this.user = user;
     return this;
   }
 
-  public OpcUaServerBuilder setPassword(String password) {
+  public OpcUaServerBuilder setPassword(final String password) {
     this.password = password;
     return this;
   }
 
-  public OpcUaServerBuilder setSecurityDir(String securityDir) {
+  public OpcUaServerBuilder setSecurityDir(final String securityDir) {
     this.securityDir = Paths.get(securityDir);
     return this;
   }
 
+  public OpcUaServerBuilder setEnableAnonymousAccess(final boolean 
enableAnonymousAccess) {
+    this.enableAnonymousAccess = enableAnonymousAccess;
+    return this;
+  }
+
   public OpcUaServer build() throws Exception {
     Files.createDirectories(securityDir);
     if (!Files.exists(securityDir)) {
@@ -133,7 +141,7 @@ public class OpcUaServerBuilder {
         new DefaultCertificateManager(loader.getServerKeyPair(), 
loader.getServerCertificate());
 
     final OpcUaServerConfig serverConfig;
-    try (DefaultTrustListManager trustListManager = new 
DefaultTrustListManager(pkiDir)) {
+    try (final DefaultTrustListManager trustListManager = new 
DefaultTrustListManager(pkiDir)) {
       LOGGER.info(
           "Certificate directory is: {}, Please move certificates from the 
reject dir to the trusted directory to allow encrypted access",
           pkiDir.getAbsolutePath());
@@ -151,7 +159,7 @@ public class OpcUaServerBuilder {
 
       final UsernameIdentityValidator identityValidator =
           new UsernameIdentityValidator(
-              true,
+              enableAnonymousAccess,
               authChallenge -> {
                 String inputUsername = authChallenge.getUsername();
                 String inputPassword = authChallenge.getPassword();
@@ -215,7 +223,7 @@ public class OpcUaServerBuilder {
   }
 
   private Set<EndpointConfiguration> createEndpointConfigurations(
-      X509Certificate certificate, int tcpBindPort, int httpsBindPort) {
+      final X509Certificate certificate, final int tcpBindPort, final int 
httpsBindPort) {
     final Set<EndpointConfiguration> endpointConfigurations = new 
LinkedHashSet<>();
 
     final List<String> bindAddresses = newArrayList();
@@ -225,8 +233,8 @@ public class OpcUaServerBuilder {
     hostnames.add(HostnameUtil.getHostname());
     hostnames.addAll(HostnameUtil.getHostnames(WILD_CARD_ADDRESS));
 
-    for (String bindAddress : bindAddresses) {
-      for (String hostname : hostnames) {
+    for (final String bindAddress : bindAddresses) {
+      for (final String hostname : hostnames) {
         final EndpointConfiguration.Builder builder =
             EndpointConfiguration.newBuilder()
                 .setBindAddress(bindAddress)
@@ -279,7 +287,7 @@ public class OpcUaServerBuilder {
   }
 
   private EndpointConfiguration buildTcpEndpoint(
-      EndpointConfiguration.Builder base, int tcpBindPort) {
+      final EndpointConfiguration.Builder base, final int tcpBindPort) {
     return base.copy()
         .setTransportProfile(TransportProfile.TCP_UASC_UABINARY)
         .setBindPort(tcpBindPort)
@@ -287,7 +295,7 @@ public class OpcUaServerBuilder {
   }
 
   private EndpointConfiguration buildHttpsEndpoint(
-      EndpointConfiguration.Builder base, int httpsBindPort) {
+      final EndpointConfiguration.Builder base, final int httpsBindPort) {
     return base.copy()
         .setTransportProfile(TransportProfile.HTTPS_UABINARY)
         .setBindPort(httpsBindPort)
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 61e773da3e4..62af58eece4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -130,6 +130,13 @@ public class PipeConnectorConstant {
   public static final String SINK_WEBSOCKET_PORT_KEY = "sink.websocket.port";
   public static final int CONNECTOR_WEBSOCKET_PORT_DEFAULT_VALUE = 8080;
 
+  public static final String CONNECTOR_OPC_UA_MODEL_KEY = 
"connector.opcua.model";
+  public static final String SINK_OPC_UA_MODEL_KEY = "sink.opcua.model";
+  public static final String CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE = 
"client-server";
+  public static final String CONNECTOR_OPC_UA_MODEL_PUB_SUB_VALUE = "pub-sub";
+  public static final String CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE =
+      CONNECTOR_OPC_UA_MODEL_PUB_SUB_VALUE;
+
   public static final String CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY = 
"connector.opcua.tcp.port";
   public static final String SINK_OPC_UA_TCP_BIND_PORT_KEY = 
"sink.opcua.tcp.port";
   public static final int CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE = 12686;
@@ -145,6 +152,12 @@ public class PipeConnectorConstant {
           ? CommonDescriptor.getInstance().getConfDir() + File.separatorChar + 
"opc_security"
           : System.getProperty("user.home") + File.separatorChar + 
"iotdb_opc_security";
 
+  public static final String CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY =
+      "connector.opcua.enable-anonymous-access";
+  public static final String SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY =
+      "sink.opcua.enable-anonymous-access";
+  public static final boolean 
CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE = true;
+
   public static final String CONNECTOR_LEADER_CACHE_ENABLE_KEY = 
"connector.leader-cache.enable";
   public static final String SINK_LEADER_CACHE_ENABLE_KEY = 
"sink.leader-cache.enable";
   public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE = 
true;
diff --git a/pom.xml b/pom.xml
index aee4489a0f6..4e2194eaf3f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -370,6 +370,17 @@
                     </exclusion>
                 </exclusions>
             </dependency>
+            <dependency>
+                <groupId>org.eclipse.milo</groupId>
+                <artifactId>sdk-core</artifactId>
+                <version>${milo.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>com.sun.activation</groupId>
+                        <artifactId>jakarta.activation</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
             <dependency>
                 <groupId>org.eclipse.milo</groupId>
                 <artifactId>stack-server</artifactId>


Reply via email to