This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 13f6e7e61d3 [IOTDB-6234] Pipe: Added client example for
opc-ua-connector (#11460)
13f6e7e61d3 is described below
commit 13f6e7e61d368a9c4ea664f855205a8d4f7f64e7
Author: Caideyipi <[email protected]>
AuthorDate: Thu Nov 2 20:48:53 2023 +0800
[IOTDB-6234] Pipe: Added client example for opc-ua-connector (#11460)
---
example/opc-ua-sink/pom.xml | 45 +++++++
.../java/org/apache/iotdb/opcua/ClientExample.java | 50 +++++++
.../apache/iotdb/opcua/ClientExampleRunner.java | 144 +++++++++++++++++++++
.../java/org/apache/iotdb/opcua/ClientTest.java | 129 ++++++++++++++++++
.../iotdb/opcua/IoTDBKeyStoreLoaderClient.java | 128 ++++++++++++++++++
example/pom.xml | 1 +
6 files changed, 497 insertions(+)
diff --git a/example/opc-ua-sink/pom.xml b/example/opc-ua-sink/pom.xml
new file mode 100644
index 00000000000..5c19435e8b1
--- /dev/null
+++ b/example/opc-ua-sink/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-examples</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>opc-ua-sink-example</artifactId>
+ <name>IoTDB: Example: OPCUA Sink</name>
+ <dependencies>
+ <dependency>
+ <groupId>org.eclipse.milo</groupId>
+ <artifactId>sdk-client</artifactId>
+ <version>${milo.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.milo</groupId>
+ <artifactId>sdk-server</artifactId>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git
a/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientExample.java
b/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientExample.java
new file mode 100644
index 00000000000..6b7f6997763
--- /dev/null
+++
b/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientExample.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.opcua;
+
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
+
+/**
+ * Describes one OPC UA client example: where to connect, which endpoint and identity to use, and
+ * the work to perform once a client has been created.
+ *
+ * <p>All connection settings have defaults, so an implementation only has to supply {@link
+ * #run(OpcUaClient, CompletableFuture)}.
+ */
+public interface ClientExample {
+
+  /**
+   * Returns the URL of the OPC UA endpoint to connect to.
+   *
+   * @return {@code opc.tcp://127.0.0.1:12686/iotdb} unless overridden
+   */
+  default String getEndpointUrl() {
+    final String defaultEndpointUrl = "opc.tcp://127.0.0.1:12686/iotdb";
+    return defaultEndpointUrl;
+  }
+
+  /**
+   * Returns the security policy whose URI is used by the default {@link #endpointFilter()}.
+   *
+   * @return {@link SecurityPolicy#Basic256Sha256} unless overridden
+   */
+  default SecurityPolicy getSecurityPolicy() {
+    return SecurityPolicy.Basic256Sha256;
+  }
+
+  /**
+   * Returns the predicate used to pick an endpoint among those offered by the server.
+   *
+   * @return a predicate accepting endpoints whose security policy URI equals the URI of {@link
+   *     #getSecurityPolicy()}; the policy is looked up again on every evaluation
+   */
+  default Predicate<EndpointDescription> endpointFilter() {
+    return endpoint -> {
+      final String expectedPolicyUri = getSecurityPolicy().getUri();
+      return expectedPolicyUri.equals(endpoint.getSecurityPolicyUri());
+    };
+  }
+
+  /**
+   * Returns the identity the client presents to the server.
+   *
+   * @return a new {@link AnonymousProvider} unless overridden
+   */
+  default IdentityProvider getIdentityProvider() {
+    final IdentityProvider anonymous = new AnonymousProvider();
+    return anonymous;
+  }
+
+  /**
+   * Executes the example.
+   *
+   * @param client the client created for this example
+   * @param future the completion handle shared with the code that invoked the example
+   * @throws Exception if the example fails
+   */
+  void run(OpcUaClient client, CompletableFuture<OpcUaClient> future) throws Exception;
+}
diff --git
a/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientExampleRunner.java
b/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientExampleRunner.java
new file mode 100644
index 00000000000..b9a90c2fee9
--- /dev/null
+++
b/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientExampleRunner.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.opcua;
+
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import
org.eclipse.milo.opcua.stack.client.security.DefaultClientCertificateValidator;
+import org.eclipse.milo.opcua.stack.core.Stack;
+import org.eclipse.milo.opcua.stack.core.security.DefaultTrustListManager;
+import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static
org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
+
+/**
+ * Creates an {@link OpcUaClient} for a {@link ClientExample}, runs the example and shuts the JVM
+ * down once the example's future completes.
+ */
+public class ClientExampleRunner {
+
+  static {
+    // Required for SecurityPolicy.Aes256_Sha256_RsaPss
+    Security.addProvider(new BouncyCastleProvider());
+  }
+
+  /** Pause, in milliseconds, between finishing the example and terminating the JVM. */
+  private static final long EXIT_DELAY_MS = 1000L;
+
+  /** Completed (normally or exceptionally) when the example is done; triggers the shutdown. */
+  private final CompletableFuture<OpcUaClient> future = new CompletableFuture<>();
+
+  private final ClientExample clientExample;
+
+  /**
+   * @param clientExample the example supplying the connection settings and the work to run
+   */
+  public ClientExampleRunner(ClientExample clientExample) {
+    this.clientExample = clientExample;
+  }
+
+  /**
+   * Builds a client configured from {@link #clientExample}, using key material kept under {@code
+   * <java.io.tmpdir>/client/security}.
+   *
+   * @return the configured, not yet connected client
+   * @throws Exception if the security directory, the key store or the client cannot be set up
+   */
+  private OpcUaClient createClient() throws Exception {
+    Path securityTempDir = Paths.get(System.getProperty("java.io.tmpdir"), "client", "security");
+    Files.createDirectories(securityTempDir);
+    if (!Files.exists(securityTempDir)) {
+      throw new Exception("unable to create security dir: " + securityTempDir);
+    }
+
+    File pkiDir = securityTempDir.resolve("pki").toFile();
+
+    System.out.println("security dir: " + securityTempDir.toAbsolutePath());
+    LoggerFactory.getLogger(getClass()).info("security pki dir: {}", pkiDir.getAbsolutePath());
+
+    IoTDBKeyStoreLoaderClient loader = new IoTDBKeyStoreLoaderClient().load(securityTempDir);
+
+    DefaultTrustListManager trustListManager = new DefaultTrustListManager(pkiDir);
+
+    DefaultClientCertificateValidator certificateValidator =
+        new DefaultClientCertificateValidator(trustListManager);
+
+    return OpcUaClient.create(
+        clientExample.getEndpointUrl(),
+        endpoints -> endpoints.stream().filter(clientExample.endpointFilter()).findFirst(),
+        configBuilder ->
+            configBuilder
+                .setApplicationName(LocalizedText.english("eclipse milo opc-ua client"))
+                .setApplicationUri("urn:eclipse:milo:examples:client")
+                .setKeyPair(loader.getClientKeyPair())
+                .setCertificate(loader.getClientCertificate())
+                .setCertificateChain(loader.getClientCertificateChain())
+                .setCertificateValidator(certificateValidator)
+                .setIdentityProvider(clientExample.getIdentityProvider())
+                .setRequestTimeout(uint(5000))
+                .build());
+  }
+
+  /**
+   * Disconnects the client and releases the shared stack resources, reporting (not propagating)
+   * any failure so that the caller can still proceed to terminate the JVM.
+   */
+  private static void disconnect(OpcUaClient client) {
+    try {
+      client.disconnect().get();
+      Stack.releaseSharedResources();
+    } catch (InterruptedException e) {
+      // Only a real interruption may set the interrupt flag. Setting it for an
+      // ExecutionException as well made the following sleep fail and skipped System.exit.
+      Thread.currentThread().interrupt();
+      System.out.println("Error disconnecting: " + e.getMessage());
+    } catch (ExecutionException e) {
+      System.out.println("Error disconnecting: " + e.getMessage());
+    }
+  }
+
+  /**
+   * Waits {@link #EXIT_DELAY_MS} and terminates the JVM with status 0. The exit is unconditional:
+   * an interruption merely shortens the wait, otherwise the process would stay alive in the long
+   * sleep at the end of {@link #run()}.
+   */
+  private static void exitAfterGracePeriod() {
+    try {
+      Thread.sleep(EXIT_DELAY_MS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+    System.exit(0);
+  }
+
+  /**
+   * Creates the client, runs the example and blocks the calling thread until the JVM exits.
+   *
+   * <p>When the shared future completes, the client is disconnected and the JVM is terminated. If
+   * the example throws, or the future does not complete within the wait of 100000 seconds, the
+   * future is completed exceptionally, which triggers the same shutdown. If the client cannot be
+   * created the JVM is terminated directly.
+   */
+  public void run() {
+    try {
+      OpcUaClient client = createClient();
+
+      future.whenCompleteAsync(
+          (c, ex) -> {
+            if (ex != null) {
+              System.out.println("Error running example: " + ex.getMessage());
+            }
+            disconnect(client);
+            exitAfterGracePeriod();
+          });
+
+      try {
+        clientExample.run(client, future);
+        future.get(100000, TimeUnit.SECONDS);
+      } catch (Throwable t) {
+        // Throwable.toString() already carries both the exception type and its message.
+        System.out.println("Error running client example: " + t);
+        future.completeExceptionally(t);
+      }
+    } catch (Throwable t) {
+      System.out.println("Error getting client: " + t.getMessage());
+
+      future.completeExceptionally(t);
+
+      exitAfterGracePeriod();
+    }
+
+    // Keep the calling thread parked; the JVM is terminated by the completion callback.
+    try {
+      Thread.sleep(999_999_999);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      System.out.println("Interrupted while waiting for the example to finish: " + e.getMessage());
+    }
+  }
+}
diff --git
a/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientTest.java
b/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientTest.java
new file mode 100644
index 00000000000..515de3ba24b
--- /dev/null
+++ b/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.opcua;
+
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
+import org.eclipse.milo.opcua.stack.core.AttributeId;
+import org.eclipse.milo.opcua.stack.core.Identifiers;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
+import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
+import org.eclipse.milo.opcua.stack.core.types.structured.ContentFilter;
+import org.eclipse.milo.opcua.stack.core.types.structured.EventFilter;
+import
org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
+import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
+import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
+import
org.eclipse.milo.opcua.stack.core.types.structured.SimpleAttributeOperand;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static
org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
+
+/**
+ * Example client that connects to the endpoint described by {@link ClientExample}, subscribes to
+ * events on the {@code Server} node and prints every event it receives.
+ */
+public class ClientTest implements ClientExample {
+
+  /**
+   * Browse names (namespace 0) of the {@code BaseEventType} fields requested for each event. The
+   * printed variants follow this order.
+   */
+  private static final String[] EVENT_FIELDS = {"Time", "Message", "SourceName", "SourceNode"};
+
+  /** Source of client handles; a handle must be unique per monitored item. */
+  private final AtomicLong clientHandles = new AtomicLong(1L);
+
+  public static void main(String[] args) {
+    new ClientExampleRunner(new ClientTest()).run();
+  }
+
+  /**
+   * Connects, creates a subscription with one event-monitoring item on the {@code Server} node and
+   * registers a consumer that prints each event.
+   *
+   * <p>This method returns as soon as the consumer is registered and never completes {@code
+   * future} itself.
+   *
+   * @param client the client to connect and subscribe with
+   * @param future the completion handle shared with the runner; not used here
+   * @throws Exception if connecting or creating the subscription or monitored item fails
+   */
+  @Override
+  public void run(OpcUaClient client, CompletableFuture<OpcUaClient> future) throws Exception {
+    // synchronous connect
+    client.connect().get();
+
+    // create a subscription and a monitored item
+    UaSubscription subscription = client.getSubscriptionManager().createSubscription(200.0).get();
+
+    ReadValueId readValueId =
+        new ReadValueId(
+            Identifiers.Server, AttributeId.EventNotifier.uid(), null, QualifiedName.NULL_VALUE);
+
+    // client handle must be unique per item
+    UInteger clientHandle = uint(clientHandles.getAndIncrement());
+
+    // One select clause per requested event field, all read from the Value attribute.
+    SimpleAttributeOperand[] selectClauses = new SimpleAttributeOperand[EVENT_FIELDS.length];
+    for (int i = 0; i < EVENT_FIELDS.length; i++) {
+      selectClauses[i] =
+          new SimpleAttributeOperand(
+              Identifiers.BaseEventType,
+              new QualifiedName[] {new QualifiedName(0, EVENT_FIELDS[i])},
+              AttributeId.Value.uid(),
+              null);
+    }
+
+    EventFilter eventFilter = new EventFilter(selectClauses, new ContentFilter(null));
+
+    MonitoringParameters parameters =
+        new MonitoringParameters(
+            clientHandle,
+            0.0,
+            ExtensionObject.encode(client.getStaticSerializationContext(), eventFilter),
+            uint(10000),
+            true);
+
+    MonitoredItemCreateRequest request =
+        new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters);
+
+    List<UaMonitoredItem> items =
+        subscription
+            .createMonitoredItems(TimestampsToReturn.Both, Collections.singletonList(request))
+            .get();
+
+    // print every event delivered for the single monitored item
+    UaMonitoredItem monitoredItem = items.get(0);
+
+    monitoredItem.setEventConsumer(
+        (item, vs) -> {
+          System.out.println("Event Received from " + item.getReadValueId().getNodeId());
+
+          for (int i = 0; i < vs.length; i++) {
+            System.out.println(("\tvariant[" + i + "]: " + vs[i].getValue()));
+          }
+        });
+  }
+}
diff --git
a/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/IoTDBKeyStoreLoaderClient.java
b/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/IoTDBKeyStoreLoaderClient.java
new file mode 100644
index 00000000000..0b15fe93b31
--- /dev/null
+++
b/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/IoTDBKeyStoreLoaderClient.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.opcua;
+
+import org.eclipse.milo.opcua.sdk.server.util.HostnameUtil;
+import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateBuilder;
+import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.Key;
+import java.security.KeyPair;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+/**
+ * Loads the client's key pair and certificate from a PKCS12 key store, creating the key store
+ * with a fresh self-signed certificate the first time it is needed.
+ */
+class IoTDBKeyStoreLoaderClient {
+
+  /** Matches dotted-quad IPv4 addresses, to tell IP addresses from DNS names. */
+  private static final Pattern IP_ADDR_PATTERN =
+      Pattern.compile("^(([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.){3}([01]?\\d\\d?|2[0-4]\\d|25[0-5])$");
+
+  private static final String CLIENT_ALIAS = "client-ai";
+  private static final char[] PASSWORD = "root".toCharArray();
+
+  private X509Certificate[] clientCertificateChain;
+  private X509Certificate clientCertificate;
+  private KeyPair clientKeyPair;
+
+  /**
+   * Loads {@code example-client.pfx} from {@code baseDir}, generating and storing it first if the
+   * file does not exist, and extracts the client key pair, certificate and certificate chain.
+   *
+   * @param baseDir directory holding (or receiving) the key store file
+   * @return this loader, with all getters populated
+   * @throws IllegalStateException if the key store has no private key or no certificate chain
+   *     under the client alias
+   * @throws Exception if the key store cannot be read, generated or written
+   */
+  IoTDBKeyStoreLoaderClient load(Path baseDir) throws Exception {
+    KeyStore keyStore = KeyStore.getInstance("PKCS12");
+
+    Path keyStorePath = baseDir.resolve("example-client.pfx");
+
+    System.out.println("Loading KeyStore at " + keyStorePath);
+
+    if (Files.exists(keyStorePath)) {
+      try (InputStream in = Files.newInputStream(keyStorePath)) {
+        keyStore.load(in, PASSWORD);
+      }
+    } else {
+      populateWithSelfSignedEntry(keyStore);
+      try (OutputStream out = Files.newOutputStream(keyStorePath)) {
+        keyStore.store(out, PASSWORD);
+      }
+    }
+
+    Key clientPrivateKey = keyStore.getKey(CLIENT_ALIAS, PASSWORD);
+    if (!(clientPrivateKey instanceof PrivateKey)) {
+      // Fail here instead of handing null key material to the client configuration.
+      throw new IllegalStateException(
+          "KeyStore " + keyStorePath + " holds no private key under alias " + CLIENT_ALIAS);
+    }
+
+    java.security.cert.Certificate[] chain = keyStore.getCertificateChain(CLIENT_ALIAS);
+    if (chain == null) {
+      throw new IllegalStateException(
+          "KeyStore " + keyStorePath + " holds no certificate chain under alias " + CLIENT_ALIAS);
+    }
+
+    clientCertificate = (X509Certificate) keyStore.getCertificate(CLIENT_ALIAS);
+    clientCertificateChain =
+        Arrays.stream(chain).map(X509Certificate.class::cast).toArray(X509Certificate[]::new);
+
+    PublicKey clientPublicKey = clientCertificate.getPublicKey();
+    clientKeyPair = new KeyPair(clientPublicKey, (PrivateKey) clientPrivateKey);
+
+    return this;
+  }
+
+  /**
+   * Initializes {@code keyStore} as empty and adds a new 2048-bit RSA key with a self-signed
+   * certificate under the client alias.
+   */
+  private static void populateWithSelfSignedEntry(KeyStore keyStore) throws Exception {
+    keyStore.load(null, PASSWORD);
+
+    KeyPair keyPair = SelfSignedCertificateGenerator.generateRsaKeyPair(2048);
+
+    SelfSignedCertificateBuilder builder =
+        new SelfSignedCertificateBuilder(keyPair)
+            .setCommonName("Eclipse Milo Example Client")
+            .setOrganization("digitalpetri")
+            .setOrganizationalUnit("dev")
+            .setLocalityName("Folsom")
+            .setStateName("CA")
+            .setCountryCode("US")
+            .setApplicationUri("urn:eclipse:milo:examples:client")
+            .addDnsName("localhost")
+            .addIpAddress("127.0.0.1");
+
+    // Get as many hostnames and IP addresses as we can listed in the certificate.
+    for (String hostname : HostnameUtil.getHostnames("0.0.0.0")) {
+      if (IP_ADDR_PATTERN.matcher(hostname).matches()) {
+        builder.addIpAddress(hostname);
+      } else {
+        builder.addDnsName(hostname);
+      }
+    }
+
+    X509Certificate certificate = builder.build();
+
+    keyStore.setKeyEntry(
+        CLIENT_ALIAS, keyPair.getPrivate(), PASSWORD, new X509Certificate[] {certificate});
+  }
+
+  /** Returns the client certificate, or {@code null} before a successful {@code load}. */
+  X509Certificate getClientCertificate() {
+    return clientCertificate;
+  }
+
+  /** Returns the client certificate chain, or {@code null} before a successful {@code load}. */
+  public X509Certificate[] getClientCertificateChain() {
+    return clientCertificateChain;
+  }
+
+  /** Returns the client key pair, or {@code null} before a successful {@code load}. */
+  KeyPair getClientKeyPair() {
+    return clientKeyPair;
+  }
+}
diff --git a/example/pom.xml b/example/pom.xml
index 2680c44805c..68e27f7c009 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -39,6 +39,7 @@
<module>flink</module>
<module>mqtt</module>
<module>mqtt-customize</module>
+ <module>opc-ua-sink</module>
<module>pulsar</module>
<module>udf</module>
<module>trigger</module>