This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch client-opc
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/client-opc by this push:
new efa0fe4077c bone
efa0fe4077c is described below
commit efa0fe4077c4c3021eb1727dff95037f17acc385
Author: Caideyipi <[email protected]>
AuthorDate: Mon Dec 22 16:20:30 2025 +0800
bone
---
iotdb-core/datanode/pom.xml | 5 +
.../db/pipe/sink/protocol/opcua/OpcUaSink.java | 77 +++++++++-
.../sink/protocol/opcua/client/ClientRunner.java | 147 +++++++++++++++++++
.../opcua/client/IoTDBKeyStoreLoaderClient.java | 124 ++++++++++++++++
.../protocol/opcua/client/IoTDBOpcUaClient.java | 158 +++++++++++++++++++++
.../pipe/config/constant/PipeSinkConstant.java | 17 +++
6 files changed, 526 insertions(+), 2 deletions(-)
diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index 284eb679672..d7d2628f36d 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -191,6 +191,11 @@
<groupId>org.eclipse.milo</groupId>
<artifactId>stack-server</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.eclipse.milo</groupId>
+ <artifactId>sdk-client</artifactId>
+ <version>${milo.version}</version>
+ </dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http</artifactId>
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index 9a505ce90f0..2c1ebf9a834 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaServerBuilder;
import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -40,7 +41,11 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.record.Tablet;
+import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,12 +71,20 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_PUB_SUB_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_NODE_URL_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_PLACEHOLDER_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_PLACEHOLDER_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_128_RSA_15_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_256_SHA_256_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_256_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_NONE_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE;
@@ -84,9 +97,11 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SIN
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HTTPS_BIND_PORT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_MODEL_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_NODE_URL_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_PLACEHOLDER_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_QUALITY_NAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_DIR_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_POLICY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_TCP_BIND_PORT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_VALUE_NAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_WITH_QUALITY_KEY;
@@ -114,7 +129,12 @@ public class OpcUaSink implements PipeConnector {
String placeHolder;
@Nullable String valueName;
@Nullable String qualityName;
- private OpcUaNameSpace nameSpace;
+
+ // Inner server
+ private @Nullable OpcUaNameSpace nameSpace;
+
+ // Outer server
+ private @Nullable IoTDBOpcUaClient client;
@Override
public void validate(final PipeParameterValidator validator) throws
Exception {
@@ -139,6 +159,17 @@ public class OpcUaSink implements PipeConnector {
public void customize(
final PipeParameters parameters, final PipeConnectorRuntimeConfiguration
configuration)
throws Exception {
+ final String nodeUrl =
+ parameters.getStringByKeys(CONNECTOR_OPC_UA_NODE_URL_KEY,
SINK_OPC_UA_NODE_URL_KEY);
+ if (Objects.isNull(nodeUrl)) {
+ customizeServer(parameters, configuration);
+ } else {
+ customizeClient(nodeUrl, parameters);
+ }
+ }
+
+ private void customizeServer(
+ final PipeParameters parameters, final PipeConnectorRuntimeConfiguration
configuration) {
final int tcpBindPort =
parameters.getIntOrDefault(
Arrays.asList(CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY,
SINK_OPC_UA_TCP_BIND_PORT_KEY),
@@ -250,6 +281,48 @@ public class OpcUaSink implements PipeConnector {
}
}
+ private void customizeClient(final String nodeUrl, final PipeParameters
parameters) {
+ final SecurityPolicy policy;
+ switch (parameters
+ .getStringOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_SECURITY_POLICY_KEY,
SINK_OPC_UA_SECURITY_POLICY_KEY),
+ CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_256_SHA_256_VALUE)
+ .toUpperCase()) {
+ case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_NONE_VALUE:
+ policy = SecurityPolicy.None;
+ break;
+ case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_128_RSA_15_VALUE:
+ policy = SecurityPolicy.Basic128Rsa15;
+ break;
+ case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_256_VALUE:
+ policy = SecurityPolicy.Basic256;
+ break;
+ case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_256_SHA_256_VALUE:
+ policy = SecurityPolicy.Basic256Sha256;
+ break;
+ case
CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE:
+ policy = SecurityPolicy.Aes128_Sha256_RsaOaep;
+ break;
+ case CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE:
+ policy = SecurityPolicy.Aes256_Sha256_RsaPss;
+ break;
+ default:
+ policy = null;
+ break;
+ }
+
+ final IdentityProvider provider;
+ final String userName =
+ parameters.getStringByKeys(CONNECTOR_IOTDB_USER_KEY,
SINK_IOTDB_USER_KEY);
+ provider =
+ Objects.nonNull(userName)
+ ? new UsernameProvider(
+ userName,
+ parameters.getStringByKeys(CONNECTOR_IOTDB_PASSWORD_KEY,
SINK_IOTDB_PASSWORD_KEY))
+ : new AnonymousProvider();
+ client = new IoTDBOpcUaClient(nodeUrl, policy, provider);
+ }
+
@Override
public void handshake() throws Exception {
// Server side, do nothing
@@ -359,7 +432,7 @@ public class OpcUaSink implements PipeConnector {
}
}
- // Getter
+ /////////////////////////////// Getter ///////////////////////////////
public boolean isClientServerModel() {
return isClientServerModel;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
new file mode 100644
index 00000000000..b0bc8709098
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.client;
+
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import
org.eclipse.milo.opcua.stack.client.security.DefaultClientCertificateValidator;
+import org.eclipse.milo.opcua.stack.core.Stack;
+import org.eclipse.milo.opcua.stack.core.security.DefaultTrustListManager;
+import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static
org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
+
+public class ClientRunner {
+
+ static {
+ // Required for SecurityPolicy.Aes256_Sha256_RsaPss
+ Security.addProvider(new BouncyCastleProvider());
+ }
+
+ private final CompletableFuture<OpcUaClient> future = new
CompletableFuture<>();
+
+ private final IoTDBOpcUaClient configurableUaClient;
+
+ public ClientRunner(IoTDBOpcUaClient configurableUaClient) {
+ this.configurableUaClient = configurableUaClient;
+ }
+
+ private OpcUaClient createClient() throws Exception {
+ final Path securityTempDir =
+ Paths.get(System.getProperty("java.io.tmpdir"), "client", "security");
+ Files.createDirectories(securityTempDir);
+ if (!Files.exists(securityTempDir)) {
+ throw new Exception("unable to create security dir: " + securityTempDir);
+ }
+
+ final File pkiDir = securityTempDir.resolve("pki").toFile();
+
+ System.out.println("security dir: " + securityTempDir.toAbsolutePath());
+ LoggerFactory.getLogger(getClass()).info("security pki dir: {}",
pkiDir.getAbsolutePath());
+
+ final IoTDBKeyStoreLoaderClient loader = new
IoTDBKeyStoreLoaderClient().load(securityTempDir);
+
+ final DefaultTrustListManager trustListManager = new
DefaultTrustListManager(pkiDir);
+
+ final DefaultClientCertificateValidator certificateValidator =
+ new DefaultClientCertificateValidator(trustListManager);
+
+ return OpcUaClient.create(
+ configurableUaClient.getNodeUrl(),
+ endpoints ->
endpoints.stream().filter(configurableUaClient.endpointFilter()).findFirst(),
+ configBuilder ->
+ configBuilder
+ .setApplicationName(LocalizedText.english("eclipse milo opc-ua
client"))
+ .setApplicationUri("urn:eclipse:milo:examples:client")
+ .setKeyPair(loader.getClientKeyPair())
+ .setCertificate(loader.getClientCertificate())
+ .setCertificateChain(loader.getClientCertificateChain())
+ .setCertificateValidator(certificateValidator)
+
.setIdentityProvider(configurableUaClient.getIdentityProvider())
+ .setRequestTimeout(uint(5000))
+ .build());
+ }
+
+ public void run() {
+ try {
+ final OpcUaClient client = createClient();
+
+ future.whenCompleteAsync(
+ (c, ex) -> {
+ if (ex != null) {
+ System.out.println("Error running example: " + ex.getMessage());
+ }
+
+ try {
+ client.disconnect().get();
+ Stack.releaseSharedResources();
+ } catch (InterruptedException | ExecutionException e) {
+ Thread.currentThread().interrupt();
+ System.out.println("Error disconnecting: {}" + e.getMessage());
+ }
+
+ try {
+ Thread.sleep(1000);
+ System.exit(0);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ e.printStackTrace();
+ }
+ });
+
+ try {
+ configurableUaClient.run(client);
+ future.get(100000, TimeUnit.SECONDS);
+ } catch (Throwable t) {
+ System.out.println("Error running client example: " + t.getMessage() +
t);
+ future.completeExceptionally(t);
+ }
+ } catch (Throwable t) {
+ System.out.println("Error getting client: {}" + t.getMessage());
+
+ future.completeExceptionally(t);
+
+ try {
+ Thread.sleep(1000);
+ System.exit(0);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ e.printStackTrace();
+ }
+ }
+
+ try {
+ Thread.sleep(999_999_999);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ e.printStackTrace();
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBKeyStoreLoaderClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBKeyStoreLoaderClient.java
new file mode 100644
index 00000000000..fc524a60c71
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBKeyStoreLoaderClient.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.client;
+
+import org.eclipse.milo.opcua.sdk.server.util.HostnameUtil;
+import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateBuilder;
+import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateGenerator;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.Key;
+import java.security.KeyPair;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+class IoTDBKeyStoreLoaderClient {
+
+ private static final Pattern IP_ADDR_PATTERN =
+
Pattern.compile("^(([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.){3}([01]?\\d\\d?|2[0-4]\\d|25[0-5])$");
+
+ private static final String CLIENT_ALIAS = "client-ai";
+ private static final char[] PASSWORD = "root".toCharArray();
+
+ private X509Certificate[] clientCertificateChain;
+ private X509Certificate clientCertificate;
+ private KeyPair clientKeyPair;
+
+ IoTDBKeyStoreLoaderClient load(Path baseDir) throws Exception {
+ final KeyStore keyStore = KeyStore.getInstance("PKCS12");
+
+ final Path serverKeyStore = baseDir.resolve("example-client.pfx");
+
+ System.out.println("Loading KeyStore at " + serverKeyStore);
+
+ if (!Files.exists(serverKeyStore)) {
+ keyStore.load(null, PASSWORD);
+
+ final KeyPair keyPair =
SelfSignedCertificateGenerator.generateRsaKeyPair(2048);
+
+ final SelfSignedCertificateBuilder builder =
+ new SelfSignedCertificateBuilder(keyPair)
+ .setCommonName("Eclipse Milo Example Client")
+ .setOrganization("digitalpetri")
+ .setOrganizationalUnit("dev")
+ .setLocalityName("Folsom")
+ .setStateName("CA")
+ .setCountryCode("US")
+ .setApplicationUri("urn:eclipse:milo:examples:client")
+ .addDnsName("localhost")
+ .addIpAddress("127.0.0.1");
+
+ // Get as many hostnames and IP addresses as we can listed in the
certificate.
+ for (String hostname : HostnameUtil.getHostnames("0.0.0.0")) {
+ if (IP_ADDR_PATTERN.matcher(hostname).matches()) {
+ builder.addIpAddress(hostname);
+ } else {
+ builder.addDnsName(hostname);
+ }
+ }
+
+ final X509Certificate certificate = builder.build();
+
+ keyStore.setKeyEntry(
+ CLIENT_ALIAS, keyPair.getPrivate(), PASSWORD, new X509Certificate[]
{certificate});
+ try (OutputStream out = Files.newOutputStream(serverKeyStore)) {
+ keyStore.store(out, PASSWORD);
+ }
+ } else {
+ try (InputStream in = Files.newInputStream(serverKeyStore)) {
+ keyStore.load(in, PASSWORD);
+ }
+ }
+
+ final Key clientPrivateKey = keyStore.getKey(CLIENT_ALIAS, PASSWORD);
+ if (clientPrivateKey instanceof PrivateKey) {
+ clientCertificate = (X509Certificate)
keyStore.getCertificate(CLIENT_ALIAS);
+
+ clientCertificateChain =
+ Arrays.stream(keyStore.getCertificateChain(CLIENT_ALIAS))
+ .map(X509Certificate.class::cast)
+ .toArray(X509Certificate[]::new);
+
+ final PublicKey serverPublicKey = clientCertificate.getPublicKey();
+ clientKeyPair = new KeyPair(serverPublicKey, (PrivateKey)
clientPrivateKey);
+ }
+
+ return this;
+ }
+
+ X509Certificate getClientCertificate() {
+ return clientCertificate;
+ }
+
+ public X509Certificate[] getClientCertificateChain() {
+ return clientCertificateChain;
+ }
+
+ KeyPair getClientKeyPair() {
+ return clientKeyPair;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
new file mode 100644
index 00000000000..a46f2153e95
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.client;
+
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
+import org.eclipse.milo.opcua.stack.core.AttributeId;
+import org.eclipse.milo.opcua.stack.core.Identifiers;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
+import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
+import org.eclipse.milo.opcua.stack.core.types.structured.ContentFilter;
+import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
+import org.eclipse.milo.opcua.stack.core.types.structured.EventFilter;
+import
org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
+import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
+import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
+import
org.eclipse.milo.opcua.stack.core.types.structured.SimpleAttributeOperand;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+
+import static
org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
+
+public class IoTDBOpcUaClient {
+
+ private final String nodeUrl;
+
+ private final SecurityPolicy securityPolicy;
+ private final IdentityProvider identityProvider;
+
+ private final AtomicLong clientHandles = new AtomicLong(1L);
+
+ public IoTDBOpcUaClient(
+ final String nodeUrl,
+ final SecurityPolicy securityPolicy,
+ final IdentityProvider identityProvider) {
+ this.nodeUrl = nodeUrl;
+ this.securityPolicy = securityPolicy;
+ this.identityProvider = identityProvider;
+ }
+
+ public void run(OpcUaClient client) throws Exception {
+ // synchronous connect
+ client.connect().get();
+
+ // create a subscription and a monitored item
+ final UaSubscription subscription =
+ client.getSubscriptionManager().createSubscription(200.0).get();
+
+ final ReadValueId readValueId =
+ new ReadValueId(
+ Identifiers.Server, AttributeId.EventNotifier.uid(), null,
QualifiedName.NULL_VALUE);
+
+ // client handle must be unique per item
+ final UInteger clientHandle = uint(clientHandles.getAndIncrement());
+
+ final EventFilter eventFilter =
+ new EventFilter(
+ new SimpleAttributeOperand[] {
+ new SimpleAttributeOperand(
+ Identifiers.BaseEventType,
+ new QualifiedName[] {new QualifiedName(0, "Time")},
+ AttributeId.Value.uid(),
+ null),
+ new SimpleAttributeOperand(
+ Identifiers.BaseEventType,
+ new QualifiedName[] {new QualifiedName(0, "Message")},
+ AttributeId.Value.uid(),
+ null),
+ new SimpleAttributeOperand(
+ Identifiers.BaseEventType,
+ new QualifiedName[] {new QualifiedName(0, "SourceName")},
+ AttributeId.Value.uid(),
+ null),
+ new SimpleAttributeOperand(
+ Identifiers.BaseEventType,
+ new QualifiedName[] {new QualifiedName(0, "SourceNode")},
+ AttributeId.Value.uid(),
+ null)
+ },
+ new ContentFilter(null));
+
+ final MonitoringParameters parameters =
+ new MonitoringParameters(
+ clientHandle,
+ 0.0,
+ ExtensionObject.encode(client.getStaticSerializationContext(),
eventFilter),
+ uint(10000),
+ true);
+
+ final MonitoredItemCreateRequest request =
+ new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting,
parameters);
+
+ final List<UaMonitoredItem> items =
+ subscription
+ .createMonitoredItems(TimestampsToReturn.Both,
Collections.singletonList(request))
+ .get();
+
+ // do something with the value updates
+ final UaMonitoredItem monitoredItem = items.get(0);
+
+ final AtomicInteger eventCount = new AtomicInteger(0);
+
+ monitoredItem.setEventConsumer(
+ (item, vs) -> {
+ eventCount.incrementAndGet();
+ System.out.println("Event Received from " +
item.getReadValueId().getNodeId());
+
+ for (int i = 0; i < vs.length; i++) {
+ System.out.println(("\tvariant[" + i + "]: " + vs[i].getValue()));
+ }
+ });
+ }
+
+ /////////////////////////////// Getter ///////////////////////////////
+
+ String getNodeUrl() {
+ return nodeUrl;
+ }
+
+ Predicate<EndpointDescription> endpointFilter() {
+ return e -> getSecurityPolicy().getUri().equals(e.getSecurityPolicyUri());
+ }
+
+ SecurityPolicy getSecurityPolicy() {
+ return securityPolicy;
+ }
+
+ IdentityProvider getIdentityProvider() {
+ return identityProvider;
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
index ecdc01237e9..6fd228db7ec 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
@@ -194,6 +194,23 @@ public class PipeSinkConstant {
public static final String SINK_OPC_UA_QUALITY_NAME_KEY =
"sink.opcua.quality-name";
public static final String CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE =
"quality";
+ public static final String CONNECTOR_OPC_UA_NODE_URL_KEY =
"connector.opcua.node-url";
+ public static final String SINK_OPC_UA_NODE_URL_KEY = "sink.opcua.node-url";
+
+ public static final String CONNECTOR_OPC_UA_SECURITY_POLICY_KEY =
+ "connector.opcua.security-policy";
+ public static final String SINK_OPC_UA_SECURITY_POLICY_KEY =
"sink.opcua.security-policy";
+ public static final String
CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_NONE_VALUE = "NONE";
+ public static final String
CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_128_RSA_15_VALUE =
+ "BASIC128RSA15";
+ public static final String
CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_256_VALUE = "BASIC256";
+ public static final String
CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_BASIC_256_SHA_256_VALUE =
+ "BASIC256SHA256";
+ public static final String
CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE =
+ "AES128_SHA256_RSAOAEP";
+ public static final String
CONNECTOR_OPC_UA_QUALITY_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE =
+ "AES256_SHA256_RSAPSS";
+
public static final String CONNECTOR_LEADER_CACHE_ENABLE_KEY =
"connector.leader-cache.enable";
public static final String SINK_LEADER_CACHE_ENABLE_KEY =
"sink.leader-cache.enable";
public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE =
true;