This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 13f6e7e61d3 [IOTDB-6234] Pipe: Added client example for 
opc-ua-connector (#11460)
13f6e7e61d3 is described below

commit 13f6e7e61d368a9c4ea664f855205a8d4f7f64e7
Author: Caideyipi <[email protected]>
AuthorDate: Thu Nov 2 20:48:53 2023 +0800

    [IOTDB-6234] Pipe: Added client example for opc-ua-connector (#11460)
---
 example/opc-ua-sink/pom.xml                        |  45 +++++++
 .../java/org/apache/iotdb/opcua/ClientExample.java |  50 +++++++
 .../apache/iotdb/opcua/ClientExampleRunner.java    | 144 +++++++++++++++++++++
 .../java/org/apache/iotdb/opcua/ClientTest.java    | 129 ++++++++++++++++++
 .../iotdb/opcua/IoTDBKeyStoreLoaderClient.java     | 128 ++++++++++++++++++
 example/pom.xml                                    |   1 +
 6 files changed, 497 insertions(+)

diff --git a/example/opc-ua-sink/pom.xml b/example/opc-ua-sink/pom.xml
new file mode 100644
index 00000000000..5c19435e8b1
--- /dev/null
+++ b/example/opc-ua-sink/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+    <parent>
+        <groupId>org.apache.iotdb</groupId>
+        <artifactId>iotdb-examples</artifactId>
+        <version>1.3.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>opc-ua-sink-example</artifactId>
+    <name>IoTDB: Example: OPCUA Sink</name>
+    <dependencies>
+        <dependency>
+            <groupId>org.eclipse.milo</groupId>
+            <artifactId>sdk-client</artifactId>
+            <version>${milo.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.milo</groupId>
+            <artifactId>sdk-server</artifactId>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git 
a/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientExample.java 
b/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientExample.java
new file mode 100644
index 00000000000..6b7f6997763
--- /dev/null
+++ 
b/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientExample.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.opcua;
+
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
+
+public interface ClientExample {
+
+  default String getEndpointUrl() {
+    return "opc.tcp://127.0.0.1:12686/iotdb";
+  }
+
+  default Predicate<EndpointDescription> endpointFilter() {
+    return e -> getSecurityPolicy().getUri().equals(e.getSecurityPolicyUri());
+  }
+
+  default SecurityPolicy getSecurityPolicy() {
+    return SecurityPolicy.Basic256Sha256;
+  }
+
+  default IdentityProvider getIdentityProvider() {
+    return new AnonymousProvider();
+  }
+
+  void run(OpcUaClient client, CompletableFuture<OpcUaClient> future) throws 
Exception;
+}
diff --git 
a/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientExampleRunner.java
 
b/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientExampleRunner.java
new file mode 100644
index 00000000000..b9a90c2fee9
--- /dev/null
+++ 
b/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientExampleRunner.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.opcua;
+
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import 
org.eclipse.milo.opcua.stack.client.security.DefaultClientCertificateValidator;
+import org.eclipse.milo.opcua.stack.core.Stack;
+import org.eclipse.milo.opcua.stack.core.security.DefaultTrustListManager;
+import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
+
+public class ClientExampleRunner {
+
+  static {
+    // Required for SecurityPolicy.Aes256_Sha256_RsaPss
+    Security.addProvider(new BouncyCastleProvider());
+  }
+
+  private final CompletableFuture<OpcUaClient> future = new 
CompletableFuture<>();
+
+  private final ClientExample clientExample;
+
+  public ClientExampleRunner(ClientExample clientExample) {
+    this.clientExample = clientExample;
+  }
+
+  private OpcUaClient createClient() throws Exception {
+    Path securityTempDir = Paths.get(System.getProperty("java.io.tmpdir"), 
"client", "security");
+    Files.createDirectories(securityTempDir);
+    if (!Files.exists(securityTempDir)) {
+      throw new Exception("unable to create security dir: " + securityTempDir);
+    }
+
+    File pkiDir = securityTempDir.resolve("pki").toFile();
+
+    System.out.println("security dir: " + securityTempDir.toAbsolutePath());
+    LoggerFactory.getLogger(getClass()).info("security pki dir: {}", 
pkiDir.getAbsolutePath());
+
+    IoTDBKeyStoreLoaderClient loader = new 
IoTDBKeyStoreLoaderClient().load(securityTempDir);
+
+    DefaultTrustListManager trustListManager = new 
DefaultTrustListManager(pkiDir);
+
+    DefaultClientCertificateValidator certificateValidator =
+        new DefaultClientCertificateValidator(trustListManager);
+
+    return OpcUaClient.create(
+        clientExample.getEndpointUrl(),
+        endpoints -> 
endpoints.stream().filter(clientExample.endpointFilter()).findFirst(),
+        configBuilder ->
+            configBuilder
+                .setApplicationName(LocalizedText.english("eclipse milo opc-ua 
client"))
+                .setApplicationUri("urn:eclipse:milo:examples:client")
+                .setKeyPair(loader.getClientKeyPair())
+                .setCertificate(loader.getClientCertificate())
+                .setCertificateChain(loader.getClientCertificateChain())
+                .setCertificateValidator(certificateValidator)
+                .setIdentityProvider(clientExample.getIdentityProvider())
+                .setRequestTimeout(uint(5000))
+                .build());
+  }
+
+  public void run() {
+    try {
+      OpcUaClient client = createClient();
+
+      future.whenCompleteAsync(
+          (c, ex) -> {
+            if (ex != null) {
+              System.out.println("Error running example: " + ex.getMessage());
+            }
+
+            try {
+              client.disconnect().get();
+              Stack.releaseSharedResources();
+            } catch (InterruptedException | ExecutionException e) {
+              Thread.currentThread().interrupt();
+              System.out.println("Error disconnecting: {}" + e.getMessage());
+            }
+
+            try {
+              Thread.sleep(1000);
+              System.exit(0);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              e.printStackTrace();
+            }
+          });
+
+      try {
+        clientExample.run(client, future);
+        future.get(100000, TimeUnit.SECONDS);
+      } catch (Throwable t) {
+        System.out.println("Error running client example: " + t.getMessage() + 
t);
+        future.completeExceptionally(t);
+      }
+    } catch (Throwable t) {
+      System.out.println("Error getting client: {}" + t.getMessage());
+
+      future.completeExceptionally(t);
+
+      try {
+        Thread.sleep(1000);
+        System.exit(0);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    try {
+      Thread.sleep(999_999_999);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+}
diff --git 
a/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientTest.java 
b/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientTest.java
new file mode 100644
index 00000000000..515de3ba24b
--- /dev/null
+++ b/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/ClientTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.opcua;
+
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
+import org.eclipse.milo.opcua.stack.core.AttributeId;
+import org.eclipse.milo.opcua.stack.core.Identifiers;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
+import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
+import org.eclipse.milo.opcua.stack.core.types.structured.ContentFilter;
+import org.eclipse.milo.opcua.stack.core.types.structured.EventFilter;
+import 
org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
+import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
+import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
+import 
org.eclipse.milo.opcua.stack.core.types.structured.SimpleAttributeOperand;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static 
org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
+
+public class ClientTest implements ClientExample {
+
+  public static void main(String[] args) {
+    ClientTest example = new ClientTest();
+
+    new ClientExampleRunner(example).run();
+  }
+
+  private final AtomicLong clientHandles = new AtomicLong(1L);
+
+  @Override
+  public void run(OpcUaClient client, CompletableFuture<OpcUaClient> future) 
throws Exception {
+    // synchronous connect
+    client.connect().get();
+
+    // create a subscription and a monitored item
+    UaSubscription subscription = 
client.getSubscriptionManager().createSubscription(200.0).get();
+
+    ReadValueId readValueId =
+        new ReadValueId(
+            Identifiers.Server, AttributeId.EventNotifier.uid(), null, 
QualifiedName.NULL_VALUE);
+
+    // client handle must be unique per item
+    UInteger clientHandle = uint(clientHandles.getAndIncrement());
+
+    EventFilter eventFilter =
+        new EventFilter(
+            new SimpleAttributeOperand[] {
+              new SimpleAttributeOperand(
+                  Identifiers.BaseEventType,
+                  new QualifiedName[] {new QualifiedName(0, "Time")},
+                  AttributeId.Value.uid(),
+                  null),
+              new SimpleAttributeOperand(
+                  Identifiers.BaseEventType,
+                  new QualifiedName[] {new QualifiedName(0, "Message")},
+                  AttributeId.Value.uid(),
+                  null),
+              new SimpleAttributeOperand(
+                  Identifiers.BaseEventType,
+                  new QualifiedName[] {new QualifiedName(0, "SourceName")},
+                  AttributeId.Value.uid(),
+                  null),
+              new SimpleAttributeOperand(
+                  Identifiers.BaseEventType,
+                  new QualifiedName[] {new QualifiedName(0, "SourceNode")},
+                  AttributeId.Value.uid(),
+                  null)
+            },
+            new ContentFilter(null));
+
+    MonitoringParameters parameters =
+        new MonitoringParameters(
+            clientHandle,
+            0.0,
+            ExtensionObject.encode(client.getStaticSerializationContext(), 
eventFilter),
+            uint(10000),
+            true);
+
+    MonitoredItemCreateRequest request =
+        new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, 
parameters);
+
+    List<UaMonitoredItem> items =
+        subscription
+            .createMonitoredItems(TimestampsToReturn.Both, 
Collections.singletonList(request))
+            .get();
+
+    // do something with the value updates
+    UaMonitoredItem monitoredItem = items.get(0);
+
+    final AtomicInteger eventCount = new AtomicInteger(0);
+
+    monitoredItem.setEventConsumer(
+        (item, vs) -> {
+          eventCount.incrementAndGet();
+          System.out.println("Event Received from " + 
item.getReadValueId().getNodeId());
+
+          for (int i = 0; i < vs.length; i++) {
+            System.out.println(("\tvariant[" + i + "]: " + vs[i].getValue()));
+          }
+        });
+  }
+}
diff --git 
a/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/IoTDBKeyStoreLoaderClient.java
 
b/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/IoTDBKeyStoreLoaderClient.java
new file mode 100644
index 00000000000..0b15fe93b31
--- /dev/null
+++ 
b/example/opc-ua-sink/src/main/java/org/apache/iotdb/opcua/IoTDBKeyStoreLoaderClient.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.opcua;
+
+import org.eclipse.milo.opcua.sdk.server.util.HostnameUtil;
+import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateBuilder;
+import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.Key;
+import java.security.KeyPair;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+class IoTDBKeyStoreLoaderClient {
+
+  private static final Pattern IP_ADDR_PATTERN =
+      
Pattern.compile("^(([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.){3}([01]?\\d\\d?|2[0-4]\\d|25[0-5])$");
+
+  private static final String CLIENT_ALIAS = "client-ai";
+  private static final char[] PASSWORD = "root".toCharArray();
+
+  private final Logger logger = LoggerFactory.getLogger(getClass());
+
+  private X509Certificate[] clientCertificateChain;
+  private X509Certificate clientCertificate;
+  private KeyPair clientKeyPair;
+
+  IoTDBKeyStoreLoaderClient load(Path baseDir) throws Exception {
+    KeyStore keyStore = KeyStore.getInstance("PKCS12");
+
+    Path serverKeyStore = baseDir.resolve("example-client.pfx");
+
+    System.out.println("Loading KeyStore at " + serverKeyStore);
+
+    if (!Files.exists(serverKeyStore)) {
+      keyStore.load(null, PASSWORD);
+
+      KeyPair keyPair = 
SelfSignedCertificateGenerator.generateRsaKeyPair(2048);
+
+      SelfSignedCertificateBuilder builder =
+          new SelfSignedCertificateBuilder(keyPair)
+              .setCommonName("Eclipse Milo Example Client")
+              .setOrganization("digitalpetri")
+              .setOrganizationalUnit("dev")
+              .setLocalityName("Folsom")
+              .setStateName("CA")
+              .setCountryCode("US")
+              .setApplicationUri("urn:eclipse:milo:examples:client")
+              .addDnsName("localhost")
+              .addIpAddress("127.0.0.1");
+
+      // Get as many hostnames and IP addresses as we can listed in the 
certificate.
+      for (String hostname : HostnameUtil.getHostnames("0.0.0.0")) {
+        if (IP_ADDR_PATTERN.matcher(hostname).matches()) {
+          builder.addIpAddress(hostname);
+        } else {
+          builder.addDnsName(hostname);
+        }
+      }
+
+      X509Certificate certificate = builder.build();
+
+      keyStore.setKeyEntry(
+          CLIENT_ALIAS, keyPair.getPrivate(), PASSWORD, new X509Certificate[] 
{certificate});
+      try (OutputStream out = Files.newOutputStream(serverKeyStore)) {
+        keyStore.store(out, PASSWORD);
+      }
+    } else {
+      try (InputStream in = Files.newInputStream(serverKeyStore)) {
+        keyStore.load(in, PASSWORD);
+      }
+    }
+
+    Key clientPrivateKey = keyStore.getKey(CLIENT_ALIAS, PASSWORD);
+    if (clientPrivateKey instanceof PrivateKey) {
+      clientCertificate = (X509Certificate) 
keyStore.getCertificate(CLIENT_ALIAS);
+
+      clientCertificateChain =
+          Arrays.stream(keyStore.getCertificateChain(CLIENT_ALIAS))
+              .map(X509Certificate.class::cast)
+              .toArray(X509Certificate[]::new);
+
+      PublicKey serverPublicKey = clientCertificate.getPublicKey();
+      clientKeyPair = new KeyPair(serverPublicKey, (PrivateKey) 
clientPrivateKey);
+    }
+
+    return this;
+  }
+
+  X509Certificate getClientCertificate() {
+    return clientCertificate;
+  }
+
+  public X509Certificate[] getClientCertificateChain() {
+    return clientCertificateChain;
+  }
+
+  KeyPair getClientKeyPair() {
+    return clientKeyPair;
+  }
+}
diff --git a/example/pom.xml b/example/pom.xml
index 2680c44805c..68e27f7c009 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -39,6 +39,7 @@
         <module>flink</module>
         <module>mqtt</module>
         <module>mqtt-customize</module>
+        <module>opc-ua-sink</module>
         <module>pulsar</module>
         <module>udf</module>
         <module>trigger</module>

Reply via email to