This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch client-opc
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/client-opc by this push:
     new efa0fe4077c bone
efa0fe4077c is described below

commit efa0fe4077c4c3021eb1727dff95037f17acc385
Author: Caideyipi <[email protected]>
AuthorDate: Mon Dec 22 16:20:30 2025 +0800

    bone
---
 iotdb-core/datanode/pom.xml                        |   5 +
 .../db/pipe/sink/protocol/opcua/OpcUaSink.java     |  77 +++++++++-
 .../sink/protocol/opcua/client/ClientRunner.java   | 147 +++++++++++++++++++
 .../opcua/client/IoTDBKeyStoreLoaderClient.java    | 124 ++++++++++++++++
 .../protocol/opcua/client/IoTDBOpcUaClient.java    | 158 +++++++++++++++++++++
 .../pipe/config/constant/PipeSinkConstant.java     |  17 +++
 6 files changed, 526 insertions(+), 2 deletions(-)

diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index 284eb679672..d7d2628f36d 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -191,6 +191,11 @@
             <groupId>org.eclipse.milo</groupId>
             <artifactId>stack-server</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.eclipse.milo</groupId>
+            <artifactId>sdk-client</artifactId>
+            <version>${milo.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.eclipse.jetty</groupId>
             <artifactId>jetty-http</artifactId>
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index 9a505ce90f0..2c1ebf9a834 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
 import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
 import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaServerBuilder;
 import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -40,7 +41,11 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.record.Tablet;
+import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
 import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,12 +71,20 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_PUB_SUB_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_NODE_URL_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_PLACEHOLDER_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_PLACEHOLDER_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_128_RSA_15_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_256_SHA_256_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_256_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_NONE_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE;
@@ -84,9 +97,11 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SIN
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HTTPS_BIND_PORT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_MODEL_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_NODE_URL_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_PLACEHOLDER_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_QUALITY_NAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_DIR_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_POLICY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_TCP_BIND_PORT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_VALUE_NAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_WITH_QUALITY_KEY;
@@ -114,7 +129,12 @@ public class OpcUaSink implements PipeConnector {
   String placeHolder;
   @Nullable String valueName;
   @Nullable String qualityName;
-  private OpcUaNameSpace nameSpace;
+
+  // Inner server
+  private @Nullable OpcUaNameSpace nameSpace;
+
+  // Outer server
+  private @Nullable IoTDBOpcUaClient client;
 
   @Override
   public void validate(final PipeParameterValidator validator) throws 
Exception {
@@ -139,6 +159,17 @@ public class OpcUaSink implements PipeConnector {
   public void customize(
       final PipeParameters parameters, final PipeConnectorRuntimeConfiguration 
 configuration)
       throws Exception {
+    // The node-url parameter selects the operating mode of this sink.
+    final String nodeUrl =
+        parameters.getStringByKeys(CONNECTOR_OPC_UA_NODE_URL_KEY, 
SINK_OPC_UA_NODE_URL_KEY);
+    if (Objects.isNull(nodeUrl)) {
+      // No node URL available: take the server configuration path.
+      customizeServer(parameters, configuration);
+    } else {
+      // A node URL is available: configure a client for that URL instead.
+      customizeClient(nodeUrl, parameters);
+    }
+  }
+
+  private void customizeServer(
+      final PipeParameters parameters, final PipeConnectorRuntimeConfiguration 
configuration) {
     final int tcpBindPort =
         parameters.getIntOrDefault(
             Arrays.asList(CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY, 
SINK_OPC_UA_TCP_BIND_PORT_KEY),
@@ -250,6 +281,48 @@ public class OpcUaSink implements PipeConnector {
     }
   }
 
+  /**
+   * Configures this sink as an OPC UA client of the external server at {@code nodeUrl}.
+   *
+   * <p>The security policy is taken from the security-policy parameter (falling back to
+   * BASIC256SHA256) and matched case-insensitively. When a user name is configured the client
+   * authenticates with user name and password, otherwise it connects anonymously.
+   *
+   * @param nodeUrl endpoint URL of the external OPC UA server
+   * @param parameters pipe parameters holding the security policy and the credentials
+   * @throws PipeException if the configured security policy is not one of the supported values
+   */
+  private void customizeClient(final String nodeUrl, final PipeParameters parameters) {
+    // Locale.ROOT keeps the upper-casing independent of the JVM default locale (e.g. Turkish
+    // dotted capital I), so the value always matches the ASCII constants below.
+    final String policyName =
+        parameters
+            .getStringOrDefault(
+                Arrays.asList(
+                    CONNECTOR_OPC_UA_SECURITY_POLICY_KEY, SINK_OPC_UA_SECURITY_POLICY_KEY),
+                CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_256_SHA_256_VALUE)
+            .toUpperCase(java.util.Locale.ROOT);
+
+    final SecurityPolicy policy;
+    switch (policyName) {
+      case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_NONE_VALUE:
+        policy = SecurityPolicy.None;
+        break;
+      case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_128_RSA_15_VALUE:
+        policy = SecurityPolicy.Basic128Rsa15;
+        break;
+      case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_256_VALUE:
+        policy = SecurityPolicy.Basic256;
+        break;
+      case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_256_SHA_256_VALUE:
+        policy = SecurityPolicy.Basic256Sha256;
+        break;
+      case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE:
+        policy = SecurityPolicy.Aes128_Sha256_RsaOaep;
+        break;
+      case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE:
+        policy = SecurityPolicy.Aes256_Sha256_RsaPss;
+        break;
+      default:
+        // Fail fast with a readable message instead of handing a null policy to the client,
+        // where it would only surface later as a NullPointerException.
+        throw new PipeException("Unsupported OPC UA security policy: " + policyName);
+    }
+
+    final String userName =
+        parameters.getStringByKeys(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY);
+    final IdentityProvider provider =
+        Objects.nonNull(userName)
+            ? new UsernameProvider(
+                userName,
+                parameters.getStringByKeys(CONNECTOR_IOTDB_PASSWORD_KEY, SINK_IOTDB_PASSWORD_KEY))
+            : new AnonymousProvider();
+    client = new IoTDBOpcUaClient(nodeUrl, policy, provider);
+  }
+
   @Override
   public void handshake() throws Exception {
     // Server side, do nothing
@@ -359,7 +432,7 @@ public class OpcUaSink implements PipeConnector {
     }
   }
 
-  // Getter
+  /////////////////////////////// Getter ///////////////////////////////
 
   public boolean isClientServerModel() {
     return isClientServerModel;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
new file mode 100644
index 00000000000..b0bc8709098
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.client;
+
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import 
org.eclipse.milo.opcua.stack.client.security.DefaultClientCertificateValidator;
+import org.eclipse.milo.opcua.stack.core.Stack;
+import org.eclipse.milo.opcua.stack.core.security.DefaultTrustListManager;
+import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
+
+/**
+ * Builds an Eclipse Milo {@link OpcUaClient} for an {@link IoTDBOpcUaClient}, runs it, and
+ * disconnects it again once the run has finished or failed.
+ *
+ * <p>The client key store and the PKI trust directory are kept under {@code
+ * <java.io.tmpdir>/client/security}.
+ */
+public class ClientRunner {
+
+  // Fully qualified because this file only imports LoggerFactory.
+  private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(ClientRunner.class);
+
+  static {
+    // Required for SecurityPolicy.Aes256_Sha256_RsaPss
+    Security.addProvider(new BouncyCastleProvider());
+  }
+
+  // Completing this future (normally or exceptionally) triggers the disconnect callback
+  // registered in run().
+  private final CompletableFuture<OpcUaClient> future = new CompletableFuture<>();
+
+  private final IoTDBOpcUaClient configurableUaClient;
+
+  public ClientRunner(IoTDBOpcUaClient configurableUaClient) {
+    this.configurableUaClient = configurableUaClient;
+  }
+
+  /**
+   * Creates the Milo client from the node URL, endpoint filter and identity provider of the
+   * wrapped {@link IoTDBOpcUaClient}.
+   *
+   * @return the configured, not yet connected client
+   * @throws Exception if the security directory, the key store or the client cannot be created
+   */
+  private OpcUaClient createClient() throws Exception {
+    final Path securityTempDir =
+        Paths.get(System.getProperty("java.io.tmpdir"), "client", "security");
+    Files.createDirectories(securityTempDir);
+    if (!Files.exists(securityTempDir)) {
+      throw new Exception("unable to create security dir: " + securityTempDir);
+    }
+
+    final File pkiDir = securityTempDir.resolve("pki").toFile();
+
+    LOGGER.info("security dir: {}", securityTempDir.toAbsolutePath());
+    LOGGER.info("security pki dir: {}", pkiDir.getAbsolutePath());
+
+    final IoTDBKeyStoreLoaderClient loader = new IoTDBKeyStoreLoaderClient().load(securityTempDir);
+
+    final DefaultTrustListManager trustListManager = new DefaultTrustListManager(pkiDir);
+
+    final DefaultClientCertificateValidator certificateValidator =
+        new DefaultClientCertificateValidator(trustListManager);
+
+    return OpcUaClient.create(
+        configurableUaClient.getNodeUrl(),
+        endpoints -> endpoints.stream().filter(configurableUaClient.endpointFilter()).findFirst(),
+        configBuilder ->
+            configBuilder
+                .setApplicationName(LocalizedText.english("eclipse milo opc-ua client"))
+                .setApplicationUri("urn:eclipse:milo:examples:client")
+                .setKeyPair(loader.getClientKeyPair())
+                .setCertificate(loader.getClientCertificate())
+                .setCertificateChain(loader.getClientCertificateChain())
+                .setCertificateValidator(certificateValidator)
+                .setIdentityProvider(configurableUaClient.getIdentityProvider())
+                .setRequestTimeout(uint(5000))
+                .build());
+  }
+
+  /**
+   * Creates the client, runs the wrapped {@link IoTDBOpcUaClient} on it and then blocks until the
+   * internal future completes or 100000 seconds have elapsed. Any failure (including the timeout)
+   * completes the future exceptionally, which disconnects the client.
+   *
+   * <p>Failures are logged and never terminate the JVM: this class lives inside a long-running
+   * server process, so a broken OPC UA connection must not take the whole process down.
+   */
+  public void run() {
+    final OpcUaClient client;
+    try {
+      client = createClient();
+    } catch (Exception e) {
+      LOGGER.warn("Error getting client: {}", e.getMessage(), e);
+      future.completeExceptionally(e);
+      return;
+    }
+
+    future.whenCompleteAsync(
+        (c, ex) -> {
+          if (ex != null) {
+            LOGGER.warn("Error running client: {}", ex.getMessage(), ex);
+          }
+          disconnect(client);
+        });
+
+    try {
+      configurableUaClient.run(client);
+      future.get(100000, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      // Restore the flag so that the caller can observe the interruption.
+      Thread.currentThread().interrupt();
+      future.completeExceptionally(e);
+    } catch (Throwable t) {
+      // Catch everything so that the disconnect callback above always fires.
+      future.completeExceptionally(t);
+    }
+  }
+
+  /** Disconnects the client and releases Milo's shared resources; failures are only logged. */
+  private void disconnect(final OpcUaClient client) {
+    try {
+      client.disconnect().get();
+      // NOTE(review): Stack.releaseSharedResources() looks process-wide - confirm it is safe
+      // when other Milo clients or servers are still active in the same JVM.
+      Stack.releaseSharedResources();
+    } catch (InterruptedException e) {
+      // Only an actual interruption may set the interrupt flag.
+      Thread.currentThread().interrupt();
+      LOGGER.warn("Interrupted while disconnecting", e);
+    } catch (ExecutionException e) {
+      LOGGER.warn("Error disconnecting: {}", e.getMessage(), e);
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBKeyStoreLoaderClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBKeyStoreLoaderClient.java
new file mode 100644
index 00000000000..fc524a60c71
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBKeyStoreLoaderClient.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.client;
+
+import org.eclipse.milo.opcua.sdk.server.util.HostnameUtil;
+import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateBuilder;
+import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateGenerator;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.Key;
+import java.security.KeyPair;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+/**
+ * Provides the client key pair and certificate (chain) held in a PKCS12 key store file below a
+ * given directory. If the key store file does not exist yet, it is created with a newly generated
+ * RSA key pair and a self-signed certificate.
+ */
+class IoTDBKeyStoreLoaderClient {
+
+  private static final Pattern IP_ADDR_PATTERN =
+      Pattern.compile(
+          "^(([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.){3}([01]?\\d\\d?|2[0-4]\\d|25[0-5])$");
+
+  private static final String CLIENT_ALIAS = "client-ai";
+  // NOTE(review): hard-coded key store password - confirm whether it should be configurable.
+  private static final char[] PASSWORD = "root".toCharArray();
+
+  private X509Certificate[] clientCertificateChain;
+  private X509Certificate clientCertificate;
+  private KeyPair clientKeyPair;
+
+  /**
+   * Loads {@code example-client.pfx} from {@code baseDir}, creating it first when it is missing,
+   * and extracts the entry stored under the client alias.
+   *
+   * @param baseDir directory that holds (or will hold) the key store file
+   * @return this loader; its getters return {@code null} when the alias holds no private key
+   * @throws Exception if the key store cannot be read, written or populated
+   */
+  IoTDBKeyStoreLoaderClient load(Path baseDir) throws Exception {
+    final KeyStore store = KeyStore.getInstance("PKCS12");
+    final Path storeFile = baseDir.resolve("example-client.pfx");
+
+    System.out.println("Loading KeyStore at " + storeFile);
+
+    if (Files.exists(storeFile)) {
+      try (InputStream in = Files.newInputStream(storeFile)) {
+        store.load(in, PASSWORD);
+      }
+    } else {
+      // First use: start from an empty key store and persist a self-signed identity.
+      store.load(null, PASSWORD);
+
+      final KeyPair generated = SelfSignedCertificateGenerator.generateRsaKeyPair(2048);
+      final X509Certificate selfSigned = buildSelfSignedCertificate(generated);
+
+      store.setKeyEntry(
+          CLIENT_ALIAS, generated.getPrivate(), PASSWORD, new X509Certificate[] {selfSigned});
+      try (OutputStream out = Files.newOutputStream(storeFile)) {
+        store.store(out, PASSWORD);
+      }
+    }
+
+    final Key key = store.getKey(CLIENT_ALIAS, PASSWORD);
+    if (!(key instanceof PrivateKey)) {
+      // No private key under the alias: leave certificate, chain and key pair unset.
+      return this;
+    }
+
+    clientCertificate = (X509Certificate) store.getCertificate(CLIENT_ALIAS);
+    clientCertificateChain =
+        Arrays.stream(store.getCertificateChain(CLIENT_ALIAS))
+            .map(X509Certificate.class::cast)
+            .toArray(X509Certificate[]::new);
+    clientKeyPair = new KeyPair(clientCertificate.getPublicKey(), (PrivateKey) key);
+
+    return this;
+  }
+
+  /** Builds the self-signed client certificate, listing every local host name and IP address. */
+  private static X509Certificate buildSelfSignedCertificate(final KeyPair keyPair)
+      throws Exception {
+    final SelfSignedCertificateBuilder certificateBuilder =
+        new SelfSignedCertificateBuilder(keyPair)
+            .setCommonName("Eclipse Milo Example Client")
+            .setOrganization("digitalpetri")
+            .setOrganizationalUnit("dev")
+            .setLocalityName("Folsom")
+            .setStateName("CA")
+            .setCountryCode("US")
+            .setApplicationUri("urn:eclipse:milo:examples:client")
+            .addDnsName("localhost")
+            .addIpAddress("127.0.0.1");
+
+    // Get as many hostnames and IP addresses as we can listed in the certificate.
+    for (final String name : HostnameUtil.getHostnames("0.0.0.0")) {
+      final boolean isIpAddress = IP_ADDR_PATTERN.matcher(name).matches();
+      if (isIpAddress) {
+        certificateBuilder.addIpAddress(name);
+      } else {
+        certificateBuilder.addDnsName(name);
+      }
+    }
+
+    return certificateBuilder.build();
+  }
+
+  X509Certificate getClientCertificate() {
+    return clientCertificate;
+  }
+
+  public X509Certificate[] getClientCertificateChain() {
+    return clientCertificateChain;
+  }
+
+  KeyPair getClientKeyPair() {
+    return clientKeyPair;
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
new file mode 100644
index 00000000000..a46f2153e95
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.client;
+
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
+import org.eclipse.milo.opcua.stack.core.AttributeId;
+import org.eclipse.milo.opcua.stack.core.Identifiers;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
+import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
+import org.eclipse.milo.opcua.stack.core.types.structured.ContentFilter;
+import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
+import org.eclipse.milo.opcua.stack.core.types.structured.EventFilter;
+import 
org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
+import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
+import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
+import 
org.eclipse.milo.opcua.stack.core.types.structured.SimpleAttributeOperand;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+
+import static 
org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
+
+/**
+ * Connection settings (node URL, security policy, identity) for an external OPC UA server plus
+ * the logic that is run on a connected {@link OpcUaClient}.
+ */
+public class IoTDBOpcUaClient {
+
+  private final String nodeUrl;
+
+  private final SecurityPolicy securityPolicy;
+  private final IdentityProvider identityProvider;
+
+  // Source of client handles; a handle must be unique per monitored item.
+  private final AtomicLong clientHandles = new AtomicLong(1L);
+
+  /**
+   * @param nodeUrl endpoint URL of the OPC UA server to connect to
+   * @param securityPolicy security policy an endpoint must offer to be selected; not null
+   * @param identityProvider identity used when the session is activated
+   * @throws IllegalArgumentException if {@code securityPolicy} is null
+   */
+  public IoTDBOpcUaClient(
+      final String nodeUrl,
+      final SecurityPolicy securityPolicy,
+      final IdentityProvider identityProvider) {
+    // Reject a missing policy here; otherwise endpointFilter() fails with a
+    // NullPointerException only once endpoints are being filtered.
+    if (securityPolicy == null) {
+      throw new IllegalArgumentException("securityPolicy must not be null");
+    }
+    this.nodeUrl = nodeUrl;
+    this.securityPolicy = securityPolicy;
+    this.identityProvider = identityProvider;
+  }
+
+  /**
+   * Connects the client, creates a subscription and monitors the event notifier of the Server
+   * node, printing the selected fields of every received event.
+   *
+   * @param client the client to connect and subscribe with
+   * @throws Exception if connecting, creating the subscription or creating the item fails
+   */
+  public void run(OpcUaClient client) throws Exception {
+    // synchronous connect
+    client.connect().get();
+
+    // create a subscription and a monitored item
+    final UaSubscription subscription =
+        client.getSubscriptionManager().createSubscription(200.0).get();
+
+    final ReadValueId readValueId =
+        new ReadValueId(
+            Identifiers.Server, AttributeId.EventNotifier.uid(), null, QualifiedName.NULL_VALUE);
+
+    // client handle must be unique per item
+    final UInteger clientHandle = uint(clientHandles.getAndIncrement());
+
+    // Select clauses in this order: Time, Message, SourceName, SourceNode. No where-clause.
+    final EventFilter eventFilter =
+        new EventFilter(
+            new SimpleAttributeOperand[] {
+              selectBaseEventField("Time"),
+              selectBaseEventField("Message"),
+              selectBaseEventField("SourceName"),
+              selectBaseEventField("SourceNode")
+            },
+            new ContentFilter(null));
+
+    final MonitoringParameters parameters =
+        new MonitoringParameters(
+            clientHandle,
+            0.0,
+            ExtensionObject.encode(client.getStaticSerializationContext(), eventFilter),
+            uint(10000),
+            true);
+
+    final MonitoredItemCreateRequest request =
+        new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters);
+
+    final List<UaMonitoredItem> items =
+        subscription
+            .createMonitoredItems(TimestampsToReturn.Both, Collections.singletonList(request))
+            .get();
+
+    // Exactly one request was sent, so the first item is the one created for it.
+    final UaMonitoredItem monitoredItem = items.get(0);
+
+    monitoredItem.setEventConsumer(
+        (item, vs) -> {
+          System.out.println("Event Received from " + item.getReadValueId().getNodeId());
+
+          for (int i = 0; i < vs.length; i++) {
+            System.out.println(("\tvariant[" + i + "]: " + vs[i].getValue()));
+          }
+        });
+  }
+
+  /** Builds a select clause for the Value attribute of a namespace-0 BaseEventType field. */
+  private static SimpleAttributeOperand selectBaseEventField(final String browseName) {
+    return new SimpleAttributeOperand(
+        Identifiers.BaseEventType,
+        new QualifiedName[] {new QualifiedName(0, browseName)},
+        AttributeId.Value.uid(),
+        null);
+  }
+
+  /////////////////////////////// Getter ///////////////////////////////
+
+  String getNodeUrl() {
+    return nodeUrl;
+  }
+
+  /** Matches the endpoints whose security policy URI equals the configured policy's URI. */
+  Predicate<EndpointDescription> endpointFilter() {
+    return e -> getSecurityPolicy().getUri().equals(e.getSecurityPolicyUri());
+  }
+
+  SecurityPolicy getSecurityPolicy() {
+    return securityPolicy;
+  }
+
+  IdentityProvider getIdentityProvider() {
+    return identityProvider;
+  }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
index ecdc01237e9..6fd228db7ec 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
@@ -194,6 +194,23 @@ public class PipeSinkConstant {
   public static final String SINK_OPC_UA_QUALITY_NAME_KEY = 
"sink.opcua.quality-name";
   public static final String CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE = 
"quality";
 
+  // Parameter keys for the URL of an external OPC UA node.
+  public static final String CONNECTOR_OPC_UA_NODE_URL_KEY = 
"connector.opcua.node-url";
+  public static final String SINK_OPC_UA_NODE_URL_KEY = "sink.opcua.node-url";
+
+  // Parameter keys for the OPC UA security policy, followed by the accepted (upper-case) values.
+  // NOTE(review): the "QUALITY" segment in the value constant names below looks like a
+  // copy-paste leftover from the quality-name constants - confirm before relying on the names.
+  public static final String CONNECTOR_OPC_UA_SECURITY_POLICY_KEY =
+      "connector.opcua.security-policy";
+  public static final String SINK_OPC_UA_SECURITY_POLICY_KEY = 
"sink.opcua.security-policy";
+  public static final String 
CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_NONE_VALUE = "NONE";
+  public static final String 
CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_128_RSA_15_VALUE =
+      "BASIC128RSA15";
+  public static final String 
CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_256_VALUE = "BASIC256";
+  public static final String 
CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_256_SHA_256_VALUE =
+      "BASIC256SHA256";
+  public static final String 
CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE =
+      "AES128_SHA256_RSAOAEP";
+  public static final String 
CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE =
+      "AES256_SHA256_RSAPSS";
+
   public static final String CONNECTOR_LEADER_CACHE_ENABLE_KEY = 
"connector.leader-cache.enable";
   public static final String SINK_LEADER_CACHE_ENABLE_KEY = 
"sink.leader-cache.enable";
   public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE = 
true;

Reply via email to