This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new cb18a95fc01 Pipe: Implemented OPC Sink for outer server & Added 
configuration for and changed the default value of the server security policies 
& Made the default quality configurable and no longer throws when a 
non-value/quality measurement is encountered (#16944)
cb18a95fc01 is described below

commit cb18a95fc014c116d3e0041a753d5493432189bb
Author: Caideyipi <[email protected]>
AuthorDate: Tue Dec 30 11:55:04 2025 +0800

    Pipe: Implemented OPC Sink for outer server & Added configuration for and 
changed the default value of the server security policies & Made the default 
quality configurable and no longer throws when a non-value/quality measurement 
is encountered (#16944)
    
    * pj
    
    * cj
    
    * bone
    
    * fix
    
    * fix
    
    * framework
    
    * fix
    
    * trilog
    
    * framework
    
    * fix
    
    * fix
    
    * yl
    
    * stack-client
    
    * fix
    
    * might
    
    * sleep-removal
    
    * cleaning
    
    * fix
    
    * sec-dir
    
    * cleaning
    
    * remove-poison
    
    * f
    
    * fix
    
    * clean-sit
    
    * sit-comp
    
    * object
    
    * many-clean
    
    * sit-sit
    
    * fix
    
    * fix
    
    * fix
    
    * ref
    
    * sit
    
    * partial
    
    * security-policies
    
    * check-equals
    
    * check-err
    
    * fix
    
    * compile-fix
    
    * adjust
    
    * ut
    
    * refactor
    
    * fix_and_IT
    
    * fix
    
    * placeholder
    
    * rollback
    
    * eliminate-fault
    
    * pw
    
    * fix
    
    * f
    
    * fix
---
 integration-test/pom.xml                           |   1 -
 .../iotdb/pipe/it/single/AbstractPipeSingleIT.java |   2 +-
 .../iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java     | 190 +++++++++++--
 iotdb-core/datanode/pom.xml                        |  12 +
 .../db/pipe/sink/protocol/opcua/OpcUaSink.java     | 305 ++++++++++++++++++---
 .../sink/protocol/opcua/client/ClientRunner.java   | 112 ++++++++
 .../opcua/client/IoTDBKeyStoreLoaderClient.java    | 127 +++++++++
 .../protocol/opcua/client/IoTDBOpcUaClient.java    | 301 ++++++++++++++++++++
 .../opcua/{ => server}/OpcUaKeyStoreLoader.java    |   2 +-
 .../opcua/{ => server}/OpcUaNameSpace.java         |  83 ++++--
 .../opcua/{ => server}/OpcUaServerBuilder.java     |  97 ++++---
 .../apache/iotdb/db/pipe/sink/PipeSinkTest.java    |   3 +-
 .../pipe/config/constant/PipeSinkConstant.java     |  37 ++-
 pom.xml                                            |  10 +
 14 files changed, 1135 insertions(+), 147 deletions(-)

diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index 150419970a0..f566c8e5995 100644
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -183,7 +183,6 @@
         <dependency>
             <groupId>org.bouncycastle</groupId>
             <artifactId>bcprov-jdk18on</artifactId>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>junit</groupId>
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
index 3bb3ecce21e..3ade13c7209 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
@@ -31,7 +31,7 @@ abstract class AbstractPipeSingleIT {
 
   @Before
   public void setUp() {
-    MultiEnvFactory.createEnv(2);
+    MultiEnvFactory.createEnv(1);
     env = MultiEnvFactory.getEnv(0);
     env.getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index ee9e5400aad..4c04b793197 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -20,68 +20,174 @@
 package org.apache.iotdb.pipe.it.single;
 
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
+import org.apache.iotdb.it.env.cluster.EnvUtils;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT1;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
+import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
+import java.io.File;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE;
+import static 
org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT1.class})
 public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
   @Test
-  public void testOPCUASink() throws Exception {
+  public void testOPCUAServerSink() throws Exception {
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) {
 
       TestUtils.executeNonQuery(env, "insert into root.db.d1(time, s1) values 
(1, 1)", null);
 
-      final Map<String, String> connectorAttributes = new HashMap<>();
-      connectorAttributes.put("sink", "opc-ua-sink");
-      connectorAttributes.put("opcua.model", "client-server");
+      final Map<String, String> sinkAttributes = new HashMap<>();
+
+      sinkAttributes.put("sink", "opc-ua-sink");
+      sinkAttributes.put("opcua.model", "client-server");
+      sinkAttributes.put("security-policy", "None");
+
+      final int[] ports = EnvUtils.searchAvailablePorts();
+      final int tcpPort = ports[0];
+      final int httpsPort = ports[1];
+      sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
+      sinkAttributes.put("https.port", Integer.toString(httpsPort));
 
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(),
           client
               .createPipe(
-                  new TCreatePipeReq("testPipe", connectorAttributes)
-                      .setExtractorAttributes(Collections.emptyMap())
+                  new TCreatePipeReq("testPipe", sinkAttributes)
+                      .setExtractorAttributes(Collections.singletonMap("user", 
"root"))
                       .setProcessorAttributes(Collections.emptyMap()))
               .getCode());
+
+      final OpcUaClient opcUaClient =
+          getOpcUaClient(
+              "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", 
SecurityPolicy.None, "root", "root");
+      DataValue value =
+          opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/d1/s1")).get();
+      Assert.assertEquals(new Variant(1.0), value.getValue());
+      Assert.assertEquals(new DateTime(timestampToUtc(1)), 
value.getSourceTime());
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          client
+              .alterPipe(
+                  new TAlterPipeReq()
+                      .setPipeName("testPipe")
+                      .setIsReplaceAllConnectorAttributes(false)
+                      
.setConnectorAttributes(Collections.singletonMap("with-quality", "true"))
+                      .setProcessorAttributes(Collections.emptyMap())
+                      .setExtractorAttributes(Collections.emptyMap()))
+              .getCode());
+
+      TestUtils.executeNonQuery(
+          env,
+          "insert into root.db.opc(time, value, quality, other) values (1, 1, 
false, 1)",
+          null);
+
+      long startTime = System.currentTimeMillis();
+      while (true) {
+        try {
+          value =
+              opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/opc")).get();
+          Assert.assertEquals(new Variant(1.0), value.getValue());
+          Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
+          Assert.assertEquals(new DateTime(timestampToUtc(1)), 
value.getSourceTime());
+          break;
+        } catch (final Throwable t) {
+          if (System.currentTimeMillis() - startTime > 10_000L) {
+            throw t;
+          }
+        }
+      }
+
+      TestUtils.executeNonQuery(
+          env, "insert into root.db.opc(time, quality) values (2, true)", 
null);
+      TestUtils.executeNonQuery(env, "insert into root.db.opc(time, value) 
values (2, 2)", null);
+
+      startTime = System.currentTimeMillis();
+      while (true) {
+        try {
+          value =
+              opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/opc")).get();
+          Assert.assertEquals(new DateTime(timestampToUtc(2)), 
value.getSourceTime());
+          Assert.assertEquals(new Variant(2.0), value.getValue());
+          Assert.assertEquals(StatusCode.UNCERTAIN, value.getStatusCode());
+          break;
+        } catch (final Throwable t) {
+          if (System.currentTimeMillis() - startTime > 10_000L) {
+            throw t;
+          }
+        }
+      }
+
+      opcUaClient.disconnect().get();
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.dropPipe("testPipe").getCode());
 
       // Test reconstruction
-      connectorAttributes.put("password123456", "test");
+      sinkAttributes.put("password", "test");
+      sinkAttributes.put("security-policy", "basic256sha256");
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(),
           client
               .createPipe(
-                  new TCreatePipeReq("testPipe", connectorAttributes)
+                  new TCreatePipeReq("testPipe", sinkAttributes)
                       .setExtractorAttributes(Collections.emptyMap())
                       .setProcessorAttributes(Collections.emptyMap()))
               .getCode());
 
+      // Banned none, only allows basic256sha256
+      Assert.assertThrows(
+          PipeException.class,
+          () ->
+              getOpcUaClient(
+                  "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb",
+                  SecurityPolicy.None,
+                  "root",
+                  "root"));
+
       // Test conflict
-      connectorAttributes.put("password123456", "conflict");
-      Assert.assertEquals(
-          TSStatusCode.PIPE_ERROR.getStatusCode(),
-          client
-              .createPipe(
-                  new TCreatePipeReq("testPipe", connectorAttributes)
-                      .setExtractorAttributes(Collections.emptyMap())
-                      .setProcessorAttributes(Collections.emptyMap()))
-              .getCode());
+      sinkAttributes.put("password", "conflict");
+      try {
+        TestUtils.executeNonQuery(
+            env, "create pipe test1 ('sink'='opc-ua-sink', 
'password'='conflict')", null);
+        Assert.fail();
+      } catch (final Exception e) {
+        Assert.assertEquals(
+            "org.apache.iotdb.jdbc.IoTDBSQLException: 1107: The existing 
server with tcp port 12686 and https port 8443's password **** conflicts to the 
new password ****, reject reusing.",
+            e.getMessage());
+      }
     }
   }
 
@@ -93,42 +199,74 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT 
{
       TableModelUtils.createDataBaseAndTable(env, "test", "test");
       TableModelUtils.insertData("test", "test", 0, 10, env);
 
-      final Map<String, String> connectorAttributes = new HashMap<>();
-      connectorAttributes.put("sink", "opc-ua-sink");
-      connectorAttributes.put("opcua.model", "client-server");
+      final Map<String, String> sourceAttributes = new HashMap<>();
+      final Map<String, String> sinkAttributes = new HashMap<>();
+      sourceAttributes.put("capture.table", "true");
+      sourceAttributes.put("user", "root");
+
+      sinkAttributes.put("sink", "opc-ua-sink");
+      sinkAttributes.put("opcua.model", "client-server");
+
+      final int[] ports = EnvUtils.searchAvailablePorts();
+      final int tcpPort = ports[0];
+      final int httpsPort = ports[1];
+      sinkAttributes.put("tcp.port", Integer.toString(tcpPort));
+      sinkAttributes.put("https.port", Integer.toString(httpsPort));
 
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(),
           client
               .createPipe(
-                  new TCreatePipeReq("testPipe", connectorAttributes)
-                      
.setExtractorAttributes(Collections.singletonMap("capture.table", "true"))
+                  new TCreatePipeReq("testPipe", sinkAttributes)
+                      .setExtractorAttributes(sourceAttributes)
                       .setProcessorAttributes(Collections.emptyMap()))
               .getCode());
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.dropPipe("testPipe").getCode());
 
       // Test reconstruction
-      connectorAttributes.put("password123456", "test");
+      sinkAttributes.put("password123456", "test");
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(),
           client
               .createPipe(
-                  new TCreatePipeReq("testPipe", connectorAttributes)
+                  new TCreatePipeReq("testPipe", sinkAttributes)
                       .setExtractorAttributes(Collections.emptyMap())
                       .setProcessorAttributes(Collections.emptyMap()))
               .getCode());
 
       // Test conflict
-      connectorAttributes.put("password123456", "conflict");
+      sinkAttributes.put("password123456", "conflict");
       Assert.assertEquals(
           TSStatusCode.PIPE_ERROR.getStatusCode(),
           client
               .createPipe(
-                  new TCreatePipeReq("testPipe", connectorAttributes)
+                  new TCreatePipeReq("testPipe", sinkAttributes)
                       .setExtractorAttributes(Collections.emptyMap())
                       .setProcessorAttributes(Collections.emptyMap()))
               .getCode());
     }
   }
+
+  private static OpcUaClient getOpcUaClient(
+      final String nodeUrl,
+      final SecurityPolicy policy,
+      final String userName,
+      final String password) {
+    final IoTDBOpcUaClient client;
+
+    final IdentityProvider provider =
+        Objects.nonNull(userName)
+            ? new UsernameProvider(userName, password)
+            : new AnonymousProvider();
+
+    final String securityDir =
+        CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE
+            + File.separatorChar
+            + 
UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET));
+
+    client = new IoTDBOpcUaClient(nodeUrl, policy, provider, false);
+    new ClientRunner(client, securityDir, password).run();
+    return client.getClient();
+  }
 }
diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index 284eb679672..85c96595381 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -191,6 +191,18 @@
             <groupId>org.eclipse.milo</groupId>
             <artifactId>stack-server</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.eclipse.milo</groupId>
+            <artifactId>stack-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.milo</groupId>
+            <artifactId>sdk-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcprov-jdk18on</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.eclipse.jetty</groupId>
             <artifactId>jetty-http</artifactId>
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index 7404a8b03bc..071db688dd8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -24,6 +24,10 @@ import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaServerBuilder;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.pipe.api.PipeConnector;
@@ -36,9 +40,15 @@ import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.record.Tablet;
+import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
 import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,28 +58,46 @@ import java.io.File;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_HISTORIZING_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_PUB_SUB_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_PLACEHOLDER_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_NODE_URL_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_PLACEHOLDER_4_NULL_TAG_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_PLACEHOLDER_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_128_RSA_15_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_NONE_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE;
@@ -79,12 +107,16 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USERNAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_DEFAULT_QUALITY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HISTORIZING_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HTTPS_BIND_PORT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_MODEL_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_NODE_URL_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_PLACEHOLDER_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_QUALITY_NAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_DIR_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_POLICY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_TCP_BIND_PORT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_VALUE_NAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_WITH_QUALITY_KEY;
@@ -107,12 +139,18 @@ public class OpcUaSink implements PipeConnector {
       SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP = new 
ConcurrentHashMap<>();
 
   private String serverKey;
-  boolean isClientServerModel;
-  String unQualifiedDatabaseName;
-  String placeHolder;
-  @Nullable String valueName;
-  @Nullable String qualityName;
-  private OpcUaNameSpace nameSpace;
+  private boolean isClientServerModel;
+  private String databaseName;
+  private String placeHolder4NullTag;
+  private @Nullable String valueName;
+  private @Nullable String qualityName;
+  private StatusCode defaultQuality;
+
+  // Inner server
+  private @Nullable OpcUaNameSpace nameSpace;
+
+  // Outer server
+  private @Nullable IoTDBOpcUaClient client;
 
   @Override
   public void validate(final PipeParameterValidator validator) throws 
Exception {
@@ -131,12 +169,89 @@ public class OpcUaSink implements PipeConnector {
             Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY),
             Arrays.asList(CONNECTOR_IOTDB_USERNAME_KEY, 
SINK_IOTDB_USERNAME_KEY),
             false);
+
+    final PipeParameters parameters = validator.getParameters();
+    if (validator
+            .getParameters()
+            .hasAnyAttributes(CONNECTOR_OPC_UA_NODE_URL_KEY, 
SINK_OPC_UA_NODE_URL_KEY)
+        || parameters.getBooleanOrDefault(
+            Arrays.asList(CONNECTOR_OPC_UA_WITH_QUALITY_KEY, 
SINK_OPC_UA_WITH_QUALITY_KEY),
+            CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE)) {
+      validator.validate(
+          CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE::equals,
+          String.format(
+              "When the OPC UA sink points to an outer server or sets 
'with-quality' to true, the %s or %s must be %s.",
+              CONNECTOR_OPC_UA_MODEL_KEY,
+              SINK_OPC_UA_MODEL_KEY,
+              CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE),
+          parameters.getStringOrDefault(
+              Arrays.asList(CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY),
+              CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE));
+    }
   }
 
   @Override
   public void customize(
       final PipeParameters parameters, final PipeConnectorRuntimeConfiguration 
configuration)
       throws Exception {
+    final boolean withQuality =
+        parameters.getBooleanOrDefault(
+            Arrays.asList(CONNECTOR_OPC_UA_WITH_QUALITY_KEY, 
SINK_OPC_UA_WITH_QUALITY_KEY),
+            CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE);
+    valueName =
+        withQuality
+            ? parameters.getStringOrDefault(
+                Arrays.asList(CONNECTOR_OPC_UA_VALUE_NAME_KEY, 
SINK_OPC_UA_VALUE_NAME_KEY),
+                CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE)
+            : null;
+    qualityName =
+        withQuality
+            ? parameters.getStringOrDefault(
+                Arrays.asList(CONNECTOR_OPC_UA_QUALITY_NAME_KEY, 
SINK_OPC_UA_QUALITY_NAME_KEY),
+                CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE)
+            : null;
+    defaultQuality =
+        getQuality(
+            withQuality
+                ? parameters.getStringOrDefault(
+                    Arrays.asList(
+                        CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY, 
SINK_OPC_UA_DEFAULT_QUALITY_KEY),
+                    CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE)
+                : CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE);
+    isClientServerModel =
+        parameters
+            .getStringOrDefault(
+                Arrays.asList(CONNECTOR_OPC_UA_MODEL_KEY, 
SINK_OPC_UA_MODEL_KEY),
+                CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
+            .equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE);
+    placeHolder4NullTag =
+        parameters.getStringOrDefault(
+            Arrays.asList(CONNECTOR_OPC_UA_PLACEHOLDER_KEY, 
SINK_OPC_UA_PLACEHOLDER_KEY),
+            CONNECTOR_OPC_UA_PLACEHOLDER_4_NULL_TAG_DEFAULT_VALUE);
+    final DataRegion region =
+        StorageEngine.getInstance()
+            .getDataRegion(new 
DataRegionId(configuration.getRuntimeEnvironment().getRegionId()));
+    databaseName = Objects.nonNull(region) ? region.getDatabaseName() : 
"root.__temp_db";
+
+    if (withQuality && PathUtils.isTableModelDatabase(databaseName)) {
+      throw new PipeException(
+          "When the OPC UA sink sets 'with-quality' to true, the table model 
data is not supported.");
+    }
+
+    final String nodeUrl =
+        parameters.getStringByKeys(CONNECTOR_OPC_UA_NODE_URL_KEY, 
SINK_OPC_UA_NODE_URL_KEY);
+    if (Objects.isNull(nodeUrl)) {
+      customizeServer(parameters);
+    } else {
+      if (PathUtils.isTableModelDatabase(databaseName)) {
+        throw new PipeException(
+            "When the OPC UA sink points to an outer server, the table model 
data is not supported.");
+      }
+      customizeClient(nodeUrl, parameters);
+    }
+  }
+
+  private void customizeServer(final PipeParameters parameters) {
     final int tcpBindPort =
         parameters.getIntOrDefault(
             Arrays.asList(CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY, 
SINK_OPC_UA_TCP_BIND_PORT_KEY),
@@ -173,40 +288,21 @@ public class OpcUaSink implements PipeConnector {
                 CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY,
                 SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY),
             CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE);
-    placeHolder =
-        parameters.getStringOrDefault(
-            Arrays.asList(CONNECTOR_OPC_UA_PLACEHOLDER_KEY, 
SINK_OPC_UA_PLACEHOLDER_KEY),
-            CONNECTOR_OPC_UA_PLACEHOLDER_DEFAULT_VALUE);
-    final boolean withQuality =
-        parameters.getBooleanOrDefault(
-            Arrays.asList(CONNECTOR_OPC_UA_WITH_QUALITY_KEY, 
SINK_OPC_UA_WITH_QUALITY_KEY),
-            CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE);
-    valueName =
-        withQuality
-            ? parameters.getStringOrDefault(
-                Arrays.asList(CONNECTOR_OPC_UA_VALUE_NAME_KEY, 
SINK_OPC_UA_VALUE_NAME_KEY),
-                CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE)
-            : null;
-    qualityName =
-        withQuality
-            ? parameters.getStringOrDefault(
-                Arrays.asList(CONNECTOR_OPC_UA_QUALITY_NAME_KEY, 
SINK_OPC_UA_QUALITY_NAME_KEY),
-                CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE)
-            : null;
-    isClientServerModel =
-        parameters
-            .getStringOrDefault(
-                Arrays.asList(CONNECTOR_OPC_UA_MODEL_KEY, 
SINK_OPC_UA_MODEL_KEY),
-                CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
-            .equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE);
-
-    final DataRegion region =
-        StorageEngine.getInstance()
-            .getDataRegion(new 
DataRegionId(configuration.getRuntimeEnvironment().getRegionId()));
-    unQualifiedDatabaseName =
-        Objects.nonNull(region)
-            ? PathUtils.unQualifyDatabaseName(region.getDatabaseName())
-            : "__temp_db";
+    final Set<SecurityPolicy> securityPolicies =
+        (parameters.hasAnyAttributes(
+                    CONNECTOR_OPC_UA_SECURITY_POLICY_KEY, 
SINK_OPC_UA_SECURITY_POLICY_KEY)
+                ? Arrays.stream(
+                    parameters
+                        .getStringByKeys(
+                            CONNECTOR_OPC_UA_SECURITY_POLICY_KEY, 
SINK_OPC_UA_SECURITY_POLICY_KEY)
+                        .replace(" ", "")
+                        .split(","))
+                : 
CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES.stream())
+            .map(this::getSecurityPolicy)
+            .collect(Collectors.toSet());
+    if (securityPolicies.isEmpty()) {
+      throw new PipeException("The security policy cannot be empty.");
+    }
 
     synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP) {
       serverKey = httpsBindPort + ":" + tcpBindPort;
@@ -225,7 +321,8 @@ public class OpcUaSink implements PipeConnector {
                                 .setUser(user)
                                 .setPassword(password)
                                 .setSecurityDir(securityDir)
-                                
.setEnableAnonymousAccess(enableAnonymousAccess);
+                                
.setEnableAnonymousAccess(enableAnonymousAccess)
+                                .setSecurityPolicies(securityPolicies);
                         final OpcUaServer newServer = builder.build();
                         nameSpace = new OpcUaNameSpace(newServer, builder);
                         nameSpace.startup();
@@ -234,7 +331,12 @@ public class OpcUaSink implements PipeConnector {
                       } else {
                         oldValue
                             .getRight()
-                            .checkEquals(user, password, securityDir, 
enableAnonymousAccess);
+                            .checkEquals(
+                                user,
+                                password,
+                                securityDir,
+                                enableAnonymousAccess,
+                                securityPolicies);
                         return oldValue;
                       }
                     } catch (final PipeException e) {
@@ -248,6 +350,80 @@ public class OpcUaSink implements PipeConnector {
     }
   }
 
+  /**
+   * Configures this sink as an OPC UA client writing to the external server at {@code nodeUrl}
+   * and starts the client through {@link ClientRunner}.
+   *
+   * <p>The security policy falls back to
+   * {@code CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE} when not configured. A
+   * username/password identity is used when a user name is configured, otherwise the client
+   * connects anonymously. Unless overridden, the security directory is the default directory
+   * extended with a name-based UUID derived from {@code nodeUrl}.
+   *
+   * @param nodeUrl the URL of the external OPC UA server
+   * @param parameters the pipe parameters to read the client settings from
+   * @throws PipeException if the security policy is not recognized or the client cannot be started
+   */
+  private void customizeClient(final String nodeUrl, final PipeParameters parameters) {
+    // getSecurityPolicy() normalizes the case itself, so the raw attribute value is passed on
+    // instead of being upper-cased a second time with the JVM default locale.
+    final SecurityPolicy policy =
+        getSecurityPolicy(
+            parameters.getStringOrDefault(
+                Arrays.asList(
+                    CONNECTOR_OPC_UA_SECURITY_POLICY_KEY, SINK_OPC_UA_SECURITY_POLICY_KEY),
+                CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE));
+
+    final String userName =
+        parameters.getStringByKeys(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY);
+    final String password =
+        parameters.getStringOrDefault(
+            Arrays.asList(CONNECTOR_IOTDB_PASSWORD_KEY, SINK_IOTDB_PASSWORD_KEY),
+            CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
+    final IdentityProvider provider =
+        Objects.nonNull(userName)
+            ? new UsernameProvider(userName, password)
+            : new AnonymousProvider();
+
+    final String securityDir =
+        IoTDBConfig.addDataHomeDir(
+            parameters.getStringOrDefault(
+                Arrays.asList(CONNECTOR_OPC_UA_SECURITY_DIR_KEY, SINK_OPC_UA_SECURITY_DIR_KEY),
+                CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE
+                    + File.separatorChar
+                    + UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET))));
+
+    client =
+        new IoTDBOpcUaClient(
+            nodeUrl,
+            policy,
+            provider,
+            parameters.getBooleanOrDefault(
+                Arrays.asList(CONNECTOR_OPC_UA_HISTORIZING_KEY, SINK_OPC_UA_HISTORIZING_KEY),
+                CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE));
+    // The same password is handed to the runner, which passes it to the key store loader.
+    new ClientRunner(client, securityDir, password).run();
+  }
+
+  /**
+   * Maps a configured security policy name to the corresponding {@link SecurityPolicy}.
+   *
+   * <p>The name is matched case-insensitively. {@code Locale.ROOT} is used for the case
+   * conversion so that the result does not depend on the JVM default locale (for example, the
+   * Turkish locale upper-cases {@code 'i'} to a dotted capital and would reject "Basic256").
+   *
+   * @param securityPolicy the configured policy name
+   * @return the matching security policy
+   * @throws PipeException if the name matches none of the supported policies
+   */
+  private SecurityPolicy getSecurityPolicy(final String securityPolicy) {
+    switch (securityPolicy.toUpperCase(java.util.Locale.ROOT)) {
+      case CONNECTOR_OPC_UA_SECURITY_POLICY_NONE_VALUE:
+        return SecurityPolicy.None;
+      case CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_128_RSA_15_VALUE:
+        return SecurityPolicy.Basic128Rsa15;
+      case CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_VALUE:
+        return SecurityPolicy.Basic256;
+      case CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE:
+        return SecurityPolicy.Basic256Sha256;
+      case CONNECTOR_OPC_UA_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE:
+        return SecurityPolicy.Aes128_Sha256_RsaOaep;
+      case CONNECTOR_OPC_UA_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE:
+        return SecurityPolicy.Aes256_Sha256_RsaPss;
+      default:
+        throw new PipeException(
+            "The security policy can only be 'None', 'Basic128Rsa15', "
+                + "'Basic256', 'Basic256Sha256', 'Aes128_Sha256_RsaOaep' or "
+                + "'Aes256_Sha256_RsaPss'.");
+    }
+  }
+
+  /**
+   * Maps a configured default-quality name to the corresponding {@link StatusCode}.
+   *
+   * <p>The name is matched case-insensitively, using {@code Locale.ROOT} so that the conversion
+   * does not depend on the JVM default locale (a Turkish locale would otherwise turn "uncertain"
+   * into a string that matches no case label).
+   *
+   * @param quality the configured quality name
+   * @return {@code StatusCode.GOOD}, {@code StatusCode.BAD} or {@code StatusCode.UNCERTAIN}
+   * @throws PipeException if the name matches none of the supported qualities
+   */
+  private StatusCode getQuality(final String quality) {
+    switch (quality.toUpperCase(java.util.Locale.ROOT)) {
+      case CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE:
+        return StatusCode.GOOD;
+      case CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE:
+        return StatusCode.BAD;
+      case CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE:
+        return StatusCode.UNCERTAIN;
+      default:
+        throw new PipeException("The default quality can only be 'GOOD', 'BAD' or 'UNCERTAIN'.");
+    }
+  }
+
   @Override
   public void handshake() throws Exception {
     // Server side, do nothing
@@ -268,7 +444,16 @@ public class OpcUaSink implements PipeConnector {
     transferByTablet(
         tabletInsertionEvent,
         LOGGER,
-        (tablet, isTableModel) -> nameSpace.transfer(tablet, isTableModel, 
this));
+        (tablet, isTableModel) -> {
+          if (Objects.nonNull(nameSpace)) {
+            nameSpace.transfer(tablet, isTableModel, this);
+          } else if (Objects.nonNull(client)) {
+            client.transfer(tablet, this);
+          } else {
+            throw new PipeException(
+                "No OPC client or server is specified when transferring 
tablet");
+          }
+        });
   }
 
   public static void transferByTablet(
@@ -336,6 +521,10 @@ public class OpcUaSink implements PipeConnector {
 
   @Override
   public void close() throws Exception {
+    if (Objects.nonNull(client)) {
+      client.disconnect();
+    }
+
     if (serverKey == null) {
       return;
     }
@@ -356,4 +545,32 @@ public class OpcUaSink implements PipeConnector {
       }
     }
   }
+
+  /////////////////////////////// Getter ///////////////////////////////
+
+  /** Returns {@code true} when the configured model is the client-server model. */
+  public boolean isClientServerModel() {
+    return this.isClientServerModel;
+  }
+
+  /**
+   * Returns the database name resolved for this sink's data region, or {@code "root.__temp_db"}
+   * when the region could not be found during customization.
+   */
+  public String getDatabaseName() {
+    return this.databaseName;
+  }
+
+  /** Returns the placeholder string configured through the placeholder attribute or its default. */
+  public String getPlaceHolder4NullTag() {
+    return this.placeHolder4NullTag;
+  }
+
+  /**
+   * Returns the configured name of the value measurement; may be {@code null}.
+   *
+   * <p>NOTE(review): presumably {@code null} when 'with-quality' is disabled - confirm against
+   * the assignment in {@code customize}.
+   */
+  @Nullable
+  public String getValueName() {
+    return this.valueName;
+  }
+
+  /**
+   * Returns the configured name of the quality measurement; may be {@code null}.
+   *
+   * <p>NOTE(review): presumably {@code null} when 'with-quality' is disabled - confirm against
+   * the assignment in {@code customize}.
+   */
+  @Nullable
+  public String getQualityName() {
+    return this.qualityName;
+  }
+
+  /** Returns the quality status code this sink was configured to use by default. */
+  public StatusCode getDefaultQuality() {
+    return this.defaultQuality;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
new file mode 100644
index 00000000000..725ecbbae98
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.client;
+
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import 
org.eclipse.milo.opcua.stack.client.security.DefaultClientCertificateValidator;
+import org.eclipse.milo.opcua.stack.core.security.DefaultTrustListManager;
+import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.Security;
+
+import static 
org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
+
+/**
+ * Builds an Eclipse Milo {@link OpcUaClient} for an {@link IoTDBOpcUaClient} and hands it over so
+ * that the connection can be established.
+ *
+ * <p>The client key store and the {@code pki} trust-list directory both live under the security
+ * directory given to the constructor.
+ */
+public class ClientRunner {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClientRunner.class);
+
+  static {
+    // Required for SecurityPolicy.Aes256_Sha256_RsaPss
+    Security.addProvider(new BouncyCastleProvider());
+  }
+
+  private final IoTDBOpcUaClient configurableUaClient;
+  private final Path securityDir;
+  private final String password;
+
+  /**
+   * @param configurableUaClient the client wrapper providing URL, endpoint filter and identity
+   * @param securityDir directory holding the client key store and the PKI trust list
+   * @param password password passed to the key store loader
+   */
+  public ClientRunner(
+      final IoTDBOpcUaClient configurableUaClient,
+      final String securityDir,
+      final String password) {
+    this.configurableUaClient = configurableUaClient;
+    this.securityDir = Paths.get(securityDir);
+    this.password = password;
+  }
+
+  /**
+   * Creates the Milo client: ensures the security directory exists, loads (or generates) the
+   * client certificate, and selects the first endpoint accepted by the wrapper's endpoint filter.
+   *
+   * @return the configured, not yet connected client
+   * @throws Exception if the directory, key store or client cannot be set up
+   */
+  private OpcUaClient createClient() throws Exception {
+    Files.createDirectories(securityDir);
+    if (!Files.exists(securityDir)) {
+      throw new Exception("unable to create security dir: " + securityDir);
+    }
+
+    final File pkiDir = securityDir.resolve("pki").toFile();
+
+    logger.info("security dir: {}", securityDir.toAbsolutePath());
+    logger.info("security pki dir: {}", pkiDir.getAbsolutePath());
+
+    final IoTDBKeyStoreLoaderClient loader =
+        new IoTDBKeyStoreLoaderClient().load(securityDir, password.toCharArray());
+
+    final DefaultTrustListManager trustListManager = new DefaultTrustListManager(pkiDir);
+
+    final DefaultClientCertificateValidator certificateValidator =
+        new DefaultClientCertificateValidator(trustListManager);
+
+    return OpcUaClient.create(
+        configurableUaClient.getNodeUrl(),
+        endpoints -> endpoints.stream().filter(configurableUaClient.endpointFilter()).findFirst(),
+        configBuilder ->
+            configBuilder
+                .setApplicationName(LocalizedText.english("Apache IoTDB OPC UA client"))
+                .setApplicationUri("urn:apache:iotdb:opc-ua-client")
+                .setKeyPair(loader.getClientKeyPair())
+                .setCertificate(loader.getClientCertificate())
+                .setCertificateChain(loader.getClientCertificateChain())
+                .setCertificateValidator(certificateValidator)
+                .setIdentityProvider(configurableUaClient.getIdentityProvider())
+                .setRequestTimeout(uint(5000))
+                .build());
+  }
+
+  /**
+   * Creates the client and passes it to {@link IoTDBOpcUaClient#run(OpcUaClient)}.
+   *
+   * @throws PipeException with an "Error getting opc client" message if the client cannot be
+   *     created, or an "Error running opc client" message if running it fails
+   */
+  public void run() {
+    // The two phases use separate try blocks so that a failure while running the client is not
+    // caught a second time and re-reported as a failure to get the client.
+    final OpcUaClient client;
+    try {
+      client = createClient();
+    } catch (final Exception e) {
+      throw new PipeException(
+          "Error getting opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage());
+    }
+
+    try {
+      configurableUaClient.run(client);
+    } catch (final Exception e) {
+      if (e instanceof InterruptedException) {
+        // Keep the interrupt visible to the caller; it would otherwise be lost here.
+        Thread.currentThread().interrupt();
+      }
+      throw new PipeException(
+          "Error running opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage());
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBKeyStoreLoaderClient.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBKeyStoreLoaderClient.java
new file mode 100644
index 00000000000..bfaf378822c
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBKeyStoreLoaderClient.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.client;
+
+import org.eclipse.milo.opcua.sdk.server.util.HostnameUtil;
+import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateBuilder;
+import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.Key;
+import java.security.KeyPair;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+/**
+ * Loads the OPC UA client certificate and key pair from a PKCS12 key store named
+ * {@code iotdb-client.pfx}, generating a self-signed certificate and writing the key store first
+ * when the file does not exist.
+ */
+class IoTDBKeyStoreLoaderClient {
+
+  private static final Logger logger = LoggerFactory.getLogger(IoTDBKeyStoreLoaderClient.class);
+
+  // Matches dotted-decimal IPv4 addresses, to tell IP entries apart from DNS names.
+  private static final Pattern IP_ADDR_PATTERN =
+      Pattern.compile("^(([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.){3}([01]?\\d\\d?|2[0-4]\\d|25[0-5])$");
+
+  private static final String CLIENT_ALIAS = "client-ai";
+
+  private X509Certificate[] clientCertificateChain;
+  private X509Certificate clientCertificate;
+  private KeyPair clientKeyPair;
+
+  /**
+   * Loads the key store under {@code baseDir}, creating it with a freshly generated self-signed
+   * certificate when it is missing, and extracts the client certificate, chain and key pair.
+   *
+   * <p>When the key store holds no private key under the client alias, the certificate, chain
+   * and key pair are left unset and a warning is logged.
+   *
+   * @param baseDir directory containing (or receiving) {@code iotdb-client.pfx}
+   * @param password password used for both the key store and the key entry
+   * @return this loader
+   * @throws Exception if the key store cannot be read, written or generated
+   */
+  IoTDBKeyStoreLoaderClient load(final Path baseDir, final char[] password) throws Exception {
+    final KeyStore keyStore = KeyStore.getInstance("PKCS12");
+
+    final Path clientKeyStore = baseDir.resolve("iotdb-client.pfx");
+
+    logger.info("Loading KeyStore at {}.", clientKeyStore);
+
+    if (!Files.exists(clientKeyStore)) {
+      keyStore.load(null, password);
+
+      final KeyPair keyPair = SelfSignedCertificateGenerator.generateRsaKeyPair(2048);
+
+      final SelfSignedCertificateBuilder builder =
+          new SelfSignedCertificateBuilder(keyPair)
+              .setCommonName("Apache IoTDB OPC UA client")
+              .setOrganization("Apache")
+              .setOrganizationalUnit("dev")
+              .setLocalityName("Beijing")
+              .setStateName("China")
+              .setCountryCode("CN")
+              .setApplicationUri("urn:apache:iotdb:opc-ua-client")
+              .addDnsName("localhost")
+              .addIpAddress("127.0.0.1");
+
+      // Get as many hostnames and IP addresses as we can listed in the certificate.
+      for (final String hostname : HostnameUtil.getHostnames("0.0.0.0")) {
+        if (IP_ADDR_PATTERN.matcher(hostname).matches()) {
+          builder.addIpAddress(hostname);
+        } else {
+          builder.addDnsName(hostname);
+        }
+      }
+
+      final X509Certificate certificate = builder.build();
+
+      keyStore.setKeyEntry(
+          CLIENT_ALIAS, keyPair.getPrivate(), password, new X509Certificate[] {certificate});
+      try (OutputStream out = Files.newOutputStream(clientKeyStore)) {
+        keyStore.store(out, password);
+      }
+    } else {
+      try (InputStream in = Files.newInputStream(clientKeyStore)) {
+        keyStore.load(in, password);
+      }
+    }
+
+    final Key clientPrivateKey = keyStore.getKey(CLIENT_ALIAS, password);
+    if (clientPrivateKey instanceof PrivateKey) {
+      clientCertificate = (X509Certificate) keyStore.getCertificate(CLIENT_ALIAS);
+
+      clientCertificateChain =
+          Arrays.stream(keyStore.getCertificateChain(CLIENT_ALIAS))
+              .map(X509Certificate.class::cast)
+              .toArray(X509Certificate[]::new);
+
+      final PublicKey clientPublicKey = clientCertificate.getPublicKey();
+      clientKeyPair = new KeyPair(clientPublicKey, (PrivateKey) clientPrivateKey);
+    } else {
+      // Make the otherwise silent "no usable key" case visible to operators.
+      logger.warn(
+          "No private key found under alias {} in KeyStore {}.", CLIENT_ALIAS, clientKeyStore);
+    }
+
+    return this;
+  }
+
+  /** Returns the client certificate, or {@code null} if none was loaded. */
+  X509Certificate getClientCertificate() {
+    return clientCertificate;
+  }
+
+  /** Returns the client certificate chain, or {@code null} if none was loaded. */
+  public X509Certificate[] getClientCertificateChain() {
+    return clientCertificateChain;
+  }
+
+  /** Returns the client key pair, or {@code null} if none was loaded. */
+  KeyPair getClientKeyPair() {
+    return clientKeyPair;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
new file mode 100644
index 00000000000..bf96d988180
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.client;
+
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
+import org.eclipse.milo.opcua.sdk.core.AccessLevel;
+import org.eclipse.milo.opcua.sdk.core.ValueRanks;
+import org.eclipse.milo.opcua.stack.core.Identifiers;
+import org.eclipse.milo.opcua.stack.core.StatusCodes;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ExpandedNodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
+import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
+import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
+import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesItem;
+import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResponse;
+import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResult;
+import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
+import org.eclipse.milo.opcua.stack.core.types.structured.ObjectAttributes;
+import org.eclipse.milo.opcua.stack.core.types.structured.VariableAttributes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Predicate;
+
+import static 
org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.convertToOpcDataType;
+import static 
org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc;
+
+/**
+ * OPC UA client used by {@link OpcUaSink} to push tablet data to an external OPC UA server.
+ *
+ * <p>Values are written with {@code writeValue}. When the server answers
+ * {@code Bad_NodeIdUnknown}, the folder and variable nodes of the path are created through
+ * {@code addNodes} and the write is retried once.
+ */
+public class IoTDBOpcUaClient {
+  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBOpcUaClient.class);
+
+  // Customized nodes
+  private static final int NAME_SPACE_INDEX = 2;
+
+  // Placeholder only: not meaningful for a server that merely accepts client writes
+  private static final double SAMPLING_INTERVAL_PLACEHOLDER = 500;
+  private final String nodeUrl;
+
+  private final SecurityPolicy securityPolicy;
+  private final IdentityProvider identityProvider;
+  // Set by run(); stays null if the underlying client was never created.
+  private OpcUaClient client;
+  private final boolean historizing;
+
+  /**
+   * @param nodeUrl the URL of the external OPC UA server
+   * @param securityPolicy the policy an endpoint must offer to be selected
+   * @param identityProvider the identity used when connecting
+   * @param historizing the historizing attribute given to variable nodes created by this client
+   */
+  public IoTDBOpcUaClient(
+      final String nodeUrl,
+      final SecurityPolicy securityPolicy,
+      final IdentityProvider identityProvider,
+      final boolean historizing) {
+    this.nodeUrl = nodeUrl;
+    this.securityPolicy = securityPolicy;
+    this.identityProvider = identityProvider;
+    this.historizing = historizing;
+  }
+
+  /**
+   * Adopts the given client and connects it, blocking until the connection attempt completes.
+   *
+   * @param client the client to use for all subsequent operations
+   * @throws Exception if the connection attempt fails
+   */
+  public void run(final OpcUaClient client) throws Exception {
+    // synchronous connect
+    this.client = client;
+    client.connect().get();
+  }
+
+  /**
+   * Transfers a tablet to the external server. Only support tree model & client-server.
+   *
+   * @param tablet the tablet to transfer
+   * @param sink the sink providing the quality/value configuration
+   * @throws Exception if the transfer fails
+   */
+  public void transfer(final Tablet tablet, final OpcUaSink sink) throws Exception {
+    OpcUaNameSpace.transferTabletForClientServerModel(
+        tablet, false, sink, this::transferTabletRowForClientServerModel);
+  }
+
+  /**
+   * Writes one row to the node identified by {@code segments} joined with {@code "/"}.
+   *
+   * <p>Null values are skipped. A measurement named like the sink's quality name sets the quality
+   * of the written value; when a value name is configured, measurements with any other name are
+   * logged and skipped. If several value measurements remain, the last non-null one is written.
+   * Nothing is written when no value is found.
+   *
+   * @param segments the path segments of the target node
+   * @param measurementSchemas the schemas of the row's measurements
+   * @param timestamps either one timestamp per measurement or a single shared timestamp
+   * @param values the row's values, aligned with {@code measurementSchemas}
+   * @param sink the sink providing the quality/value configuration
+   * @throws UnsupportedOperationException if the quality measurement is not of boolean type
+   * @throws PipeException if node creation or the write is rejected by the server
+   * @throws Exception if waiting for a server response fails
+   */
+  private void transferTabletRowForClientServerModel(
+      final String[] segments,
+      final List<IMeasurementSchema> measurementSchemas,
+      final List<Long> timestamps,
+      final List<Object> values,
+      final OpcUaSink sink)
+      throws Exception {
+    StatusCode currentQuality = sink.getDefaultQuality();
+    Object value = null;
+    long timestamp = 0;
+    NodeId nodeId = null;
+    NodeId opcDataType = null;
+
+    for (int i = 0; i < measurementSchemas.size(); ++i) {
+      if (Objects.isNull(values.get(i))) {
+        continue;
+      }
+      final String name = measurementSchemas.get(i).getMeasurementName();
+      final TSDataType type = measurementSchemas.get(i).getType();
+      if (Objects.nonNull(sink.getQualityName()) && sink.getQualityName().equals(name)) {
+        if (!type.equals(TSDataType.BOOLEAN)) {
+          throw new UnsupportedOperationException(
+              "The quality value only supports boolean type, while true == GOOD and false == BAD.");
+        }
+        // Compare by value: a boxed Boolean is not guaranteed to be the Boolean.TRUE instance.
+        currentQuality = Boolean.TRUE.equals(values.get(i)) ? StatusCode.GOOD : StatusCode.BAD;
+        continue;
+      }
+      if (Objects.nonNull(sink.getValueName()) && !sink.getValueName().equals(name)) {
+        PipeLogger.log(
+            LOGGER::warn,
+            "When the 'with-quality' mode is enabled, the measurement must be either \"value-name\" or \"quality-name\"");
+        continue;
+      }
+      nodeId = new NodeId(NAME_SPACE_INDEX, String.join("/", segments));
+
+      final long utcTimestamp = timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
+      value = values.get(i);
+      timestamp = utcTimestamp;
+      opcDataType = convertToOpcDataType(type);
+    }
+    if (Objects.isNull(value)) {
+      return;
+    }
+
+    final Variant variant = new Variant(value);
+    final DataValue dataValue =
+        new DataValue(variant, currentQuality, new DateTime(timestamp), new DateTime());
+    StatusCode writeStatus = client.writeValue(nodeId, dataValue).get();
+
+    if (writeStatus.getValue() == StatusCodes.Bad_NodeIdUnknown) {
+      // The node does not exist yet: create the whole path, then retry the write once.
+      final AddNodesResponse addStatus =
+          client.addNodes(getNodesToAdd(segments, opcDataType, variant)).get();
+      for (final AddNodesResult result : addStatus.getResults()) {
+        // Nodes that already exist (e.g. shared parent folders) are not an error.
+        if (!result.getStatusCode().equals(StatusCode.GOOD)
+            && result.getStatusCode().getValue() != StatusCodes.Bad_NodeIdExists) {
+          throw new PipeException(
+              "Failed to create nodes after transfer data value, creation status: "
+                  + addStatus
+                  + getErrorString(segments, opcDataType, value, writeStatus));
+        }
+      }
+      writeStatus = client.writeValue(nodeId, dataValue).get();
+      if (writeStatus.getValue() != StatusCode.GOOD.getValue()) {
+        throw new PipeException(
+            "Failed to transfer dataValue after successfully created nodes"
+                + getErrorString(segments, opcDataType, value, writeStatus));
+      }
+    } else if (writeStatus.getValue() != StatusCode.GOOD.getValue()) {
+      throw new PipeException(
+          "Failed to transfer dataValue"
+              + getErrorString(segments, opcDataType, value, writeStatus));
+    }
+  }
+
+  /** Formats the shared diagnostic suffix appended to transfer error messages. */
+  private static String getErrorString(
+      final String[] segments,
+      final NodeId dataType,
+      final Object value,
+      final StatusCode writeStatus) {
+    return ", segments: "
+        + Arrays.toString(segments)
+        + ", dataType: "
+        + dataType
+        + ", value: "
+        + value
+        + ", error: "
+        + writeStatus;
+  }
+
+  /**
+   * Builds the add-node requests for a measurement path: a folder under the objects folder for
+   * the first segment, a nested folder for every intermediate segment, and a variable node for
+   * the last segment. Node ids are the segments joined with {@code "/"} up to that level.
+   *
+   * @param segments the path segments; the last one is the measurement name
+   * @param opcDataType the OPC UA data type of the variable node
+   * @param initialValue the initial value of the variable node
+   * @return the add-node items ordered from the root folder down to the variable
+   */
+  public List<AddNodesItem> getNodesToAdd(
+      final String[] segments, final NodeId opcDataType, final Variant initialValue) {
+    final List<AddNodesItem> addNodesItems = new ArrayList<>();
+    final StringBuilder sb = new StringBuilder(segments[0]);
+    ExpandedNodeId curNodeId = new NodeId(NAME_SPACE_INDEX, segments[0]).expanded();
+    addNodesItems.add(
+        new AddNodesItem(
+            Identifiers.ObjectsFolder.expanded(),
+            Identifiers.Organizes,
+            curNodeId,
+            new QualifiedName(NAME_SPACE_INDEX, segments[0]),
+            NodeClass.Object,
+            ExtensionObject.encode(
+                client.getStaticSerializationContext(), createFolderAttributes(segments[0])),
+            Identifiers.FolderType.expanded()));
+
+    // segments.length >= 3
+    for (int i = 1; i < segments.length - 1; ++i) {
+      sb.append("/").append(segments[i]);
+      final ExpandedNodeId nextId = new NodeId(NAME_SPACE_INDEX, sb.toString()).expanded();
+      addNodesItems.add(
+          new AddNodesItem(
+              curNodeId,
+              Identifiers.Organizes,
+              nextId,
+              new QualifiedName(NAME_SPACE_INDEX, segments[i]),
+              NodeClass.Object,
+              ExtensionObject.encode(
+                  client.getStaticSerializationContext(), createFolderAttributes(segments[i])),
+              Identifiers.FolderType.expanded()));
+      curNodeId = nextId;
+    }
+
+    final String measurementName = segments[segments.length - 1];
+    sb.append("/").append(measurementName);
+    addNodesItems.add(
+        new AddNodesItem(
+            curNodeId,
+            Identifiers.Organizes,
+            new NodeId(NAME_SPACE_INDEX, sb.toString()).expanded(),
+            new QualifiedName(NAME_SPACE_INDEX, measurementName),
+            NodeClass.Variable,
+            ExtensionObject.encode(
+                client.getStaticSerializationContext(),
+                createMeasurementAttributes(measurementName, opcDataType, initialValue)),
+            Identifiers.BaseDataVariableType.expanded()));
+
+    return addNodesItems;
+  }
+
+  /**
+   * Disconnects the underlying client, blocking until done. Does nothing when no client was ever
+   * handed to {@link #run(OpcUaClient)}, so closing after a failed start-up does not fail again.
+   *
+   * @throws Exception if disconnecting fails
+   */
+  public void disconnect() throws Exception {
+    if (Objects.isNull(client)) {
+      return;
+    }
+    client.disconnect().get();
+  }
+
+  /////////////////////////////// Getter ///////////////////////////////
+
+  String getNodeUrl() {
+    return nodeUrl;
+  }
+
+  /** Returns a filter accepting endpoints whose security policy URI matches the configured one. */
+  Predicate<EndpointDescription> endpointFilter() {
+    return e -> getSecurityPolicy().getUri().equals(e.getSecurityPolicyUri());
+  }
+
+  SecurityPolicy getSecurityPolicy() {
+    return securityPolicy;
+  }
+
+  IdentityProvider getIdentityProvider() {
+    return identityProvider;
+  }
+
+  @TestOnly
+  public OpcUaClient getClient() {
+    return client;
+  }
+
+  /////////////////////////////// Attribute creator ///////////////////////////////
+
+  /** Creates the attributes of a scalar, read-write variable node holding a measurement. */
+  private VariableAttributes createMeasurementAttributes(
+      final String name, final NodeId objectType, final Variant initialValue) {
+    return new VariableAttributes(
+        Unsigned.uint(0xFFFF), // specifiedAttributes
+        LocalizedText.english(name),
+        LocalizedText.english(name),
+        Unsigned.uint(0), // writeMask
+        Unsigned.uint(0), // userWriteMask
+        initialValue,
+        objectType,
+        ValueRanks.Scalar,
+        null, // arrayDimensions
+        AccessLevel.toValue(AccessLevel.READ_WRITE),
+        AccessLevel.toValue(AccessLevel.READ_WRITE),
+        SAMPLING_INTERVAL_PLACEHOLDER,
+        historizing);
+  }
+
+  /** Creates the attributes of a folder node named {@code name}. */
+  private static ObjectAttributes createFolderAttributes(final String name) {
+    return new ObjectAttributes(
+        Unsigned.uint(0xFFFF), // specifiedAttributes
+        LocalizedText.english(name),
+        LocalizedText.english(name),
+        Unsigned.uint(0), // writeMask
+        Unsigned.uint(0), // userWriteMask
+        null // notifier
+        );
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaKeyStoreLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java
similarity index 98%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaKeyStoreLoader.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java
index b17f27532d7..56b231fb460 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaKeyStoreLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.sink.protocol.opcua;
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.server;
 
 import org.apache.iotdb.commons.utils.FileUtils;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
similarity index 87%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index 6850fba8f20..56a90680185 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -17,10 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.sink.protocol.opcua;
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.server;
 
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
 import 
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter;
 import 
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
 import org.apache.iotdb.db.utils.DateTimeUtils;
@@ -48,12 +50,15 @@ import 
org.eclipse.milo.opcua.sdk.server.nodes.UaVariableNode;
 import org.eclipse.milo.opcua.sdk.server.util.SubscriptionModel;
 import org.eclipse.milo.opcua.stack.core.Identifiers;
 import org.eclipse.milo.opcua.stack.core.UaException;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
 import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
 import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
 import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
 import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.file.Paths;
 import java.sql.Date;
@@ -64,15 +69,17 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
 public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(OpcUaNameSpace.class);
   public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server";
   private final SubscriptionModel subscriptionModel;
   private final OpcUaServerBuilder builder;
 
-  OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder builder) {
+  public OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder 
builder) {
     super(server, NAMESPACE_URI);
     this.builder = builder;
 
@@ -94,17 +101,22 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
             });
   }
 
-  void transfer(final Tablet tablet, final boolean isTableModel, final 
OpcUaSink sink)
-      throws UaException {
-    if (sink.isClientServerModel) {
-      transferTabletForClientServerModel(tablet, isTableModel, sink);
+  public void transfer(final Tablet tablet, final boolean isTableModel, final 
OpcUaSink sink)
+      throws Exception {
+    if (sink.isClientServerModel()) {
+      transferTabletForClientServerModel(
+          tablet, isTableModel, sink, 
this::transferTabletRowForClientServerModel);
     } else {
       transferTabletForPubSubModel(tablet, isTableModel, sink);
     }
   }
 
-  private void transferTabletForClientServerModel(
-      final Tablet tablet, final boolean isTableModel, final OpcUaSink sink) {
+  public static void transferTabletForClientServerModel(
+      final Tablet tablet,
+      final boolean isTableModel,
+      final OpcUaSink sink,
+      final TabletRowConsumer consumer)
+      throws Exception {
     final List<IMeasurementSchema> schemas = tablet.getSchemas();
     final List<IMeasurementSchema> newSchemas = new ArrayList<>();
     if (!isTableModel) {
@@ -125,8 +137,7 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
         }
       }
 
-      transferTabletRowForClientServerModel(
-          tablet.getDeviceId().split("\\."), newSchemas, timestamps, values, 
sink);
+      consumer.accept(tablet.getDeviceId().split("\\."), newSchemas, 
timestamps, values, sink);
     } else {
       new PipeTableModelTabletEventSorter(tablet).sortByTimestampIfNecessary();
 
@@ -141,15 +152,15 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
       for (int i = 0; i < tablet.getRowSize(); ++i) {
         final Object[] segments = tablet.getDeviceID(i).getSegments();
         final String[] folderSegments = new String[segments.length + 1];
-        folderSegments[0] = sink.unQualifiedDatabaseName;
+        folderSegments[0] = sink.getDatabaseName();
 
         for (int j = 0; j < segments.length; ++j) {
           folderSegments[j + 1] =
-              Objects.isNull(segments[j]) ? sink.placeHolder : (String) 
segments[j];
+              Objects.isNull(segments[j]) ? sink.getPlaceHolder4NullTag() : 
(String) segments[j];
         }
 
         final int finalI = i;
-        transferTabletRowForClientServerModel(
+        consumer.accept(
             folderSegments,
             newSchemas,
             Collections.singletonList(tablet.getTimestamp(i)),
@@ -166,6 +177,17 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
     }
   }
 
+  @FunctionalInterface
+  public interface TabletRowConsumer { // callback invoked by transferTabletForClientServerModel
+    void accept(
+        final String[] segments, // tree model: device id split on "."; table model: database name + device-ID segments
+        final List<IMeasurementSchema> measurementSchemas,
+        final List<Long> timestamps, // a singleton list when called for one table-model row
+        final List<Object> values,
+        final OpcUaSink sink)
+        throws Exception; // may throw; transferTabletForClientServerModel declares throws Exception as well
+  }
+
   private void transferTabletRowForClientServerModel(
       final String[] segments,
       final List<IMeasurementSchema> measurementSchemas,
@@ -179,7 +201,7 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
     UaNode folderNode = null;
     NodeId folderNodeId;
     for (int i = 0;
-        i < (Objects.isNull(sink.valueName) ? segments.length : 
segments.length - 1);
+        i < (Objects.isNull(sink.getValueName()) ? segments.length : 
segments.length - 1);
         ++i) {
       final String segment = segments[i];
       final UaNode nextFolderNode;
@@ -227,8 +249,7 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
 
     final String currentFolder = currentStr.toString();
 
-    StatusCode currentQuality =
-        Objects.isNull(sink.valueName) ? StatusCode.GOOD : 
StatusCode.UNCERTAIN;
+    StatusCode currentQuality = sink.getDefaultQuality();
     UaVariableNode valueNode = null;
     Object value = null;
     long timestamp = 0;
@@ -239,7 +260,7 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
       }
       final String name = measurementSchemas.get(i).getMeasurementName();
       final TSDataType type = measurementSchemas.get(i).getType();
-      if (Objects.nonNull(sink.qualityName) && sink.qualityName.equals(name)) {
+      if (Objects.nonNull(sink.getQualityName()) && 
sink.getQualityName().equals(name)) {
         if (!type.equals(TSDataType.BOOLEAN)) {
           throw new UnsupportedOperationException(
               "The quality value only supports boolean type, while true == 
GOOD and false == BAD.");
@@ -247,11 +268,14 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
         currentQuality = values.get(i) == Boolean.TRUE ? StatusCode.GOOD : 
StatusCode.BAD;
         continue;
       }
-      if (Objects.nonNull(sink.valueName) && !sink.valueName.equals(name)) {
-        throw new UnsupportedOperationException(
+      if (Objects.nonNull(sink.getValueName()) && 
!sink.getValueName().equals(name)) {
+        PipeLogger.log(
+            LOGGER::warn,
             "When the 'with-quality' mode is enabled, the measurement must be 
either \"value-name\" or \"quality-name\"");
+        continue;
       }
-      final String nodeName = Objects.isNull(sink.valueName) ? name : 
segments[segments.length - 1];
+      final String nodeName =
+          Objects.isNull(sink.getValueName()) ? name : 
segments[segments.length - 1];
       final NodeId nodeId = newNodeId(currentFolder + nodeName);
       final UaVariableNode measurementNode;
       if (!getNodeManager().containsNode(nodeId)) {
@@ -288,7 +312,7 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
       }
 
       final long utcTimestamp = 
timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
-      if (Objects.isNull(sink.valueName)) {
+      if (Objects.isNull(sink.getValueName())) {
         if (Objects.isNull(measurementNode.getValue())
             || 
Objects.requireNonNull(measurementNode.getValue().getSourceTime()).getUtcTime()
                 < utcTimestamp) {
@@ -342,7 +366,7 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
     }
   }
 
-  private static long timestampToUtc(final long timeStamp) {
+  public static long timestampToUtc(final long timeStamp) {
     return TimestampPrecisionUtils.currPrecision.toNanos(timeStamp) / 100L + 
116444736000000000L;
   }
 
@@ -365,11 +389,11 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
     if (isTableModel) {
       sourceNameList = new ArrayList<>(tablet.getRowSize());
       for (int i = 0; i < tablet.getRowSize(); ++i) {
-        final StringBuilder idBuilder = new 
StringBuilder(sink.unQualifiedDatabaseName);
+        final StringBuilder idBuilder = new 
StringBuilder(sink.getDatabaseName());
         for (final Object segment : tablet.getDeviceID(i).getSegments()) {
           idBuilder
               .append(TsFileConstant.PATH_SEPARATOR)
-              .append(Objects.isNull(segment) ? sink.placeHolder : segment);
+              .append(Objects.isNull(segment) ? sink.getPlaceHolder4NullTag() 
: segment);
         }
         sourceNameList.add(idBuilder.toString());
       }
@@ -473,7 +497,7 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
     eventNode.delete();
   }
 
-  private NodeId convertToOpcDataType(final TSDataType type) {
+  public static NodeId convertToOpcDataType(final TSDataType type) {
     switch (type) {
       case BOOLEAN:
         return Identifiers.Boolean;
@@ -493,6 +517,7 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
       case STRING:
         return Identifiers.String;
       case VECTOR:
+      case OBJECT:
       case UNKNOWN:
       default:
         throw new PipeRuntimeNonCriticalException("Unsupported data type: " + 
type);
@@ -521,11 +546,13 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
 
   /////////////////////////////// Conflict detection ///////////////////////////////
 
-  void checkEquals(
+  public void checkEquals(
       final String user,
       final String password,
       final String securityDir,
-      final boolean enableAnonymousAccess) {
-    builder.checkEquals(user, password, Paths.get(securityDir), 
enableAnonymousAccess);
+      final boolean enableAnonymousAccess,
+      final Set<SecurityPolicy> securityPolicies) {
+    builder.checkEquals(
+        user, password, Paths.get(securityDir), enableAnonymousAccess, 
securityPolicies);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaServerBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
similarity index 82%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaServerBuilder.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
index bc2df4839e2..61818ecf852 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaServerBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
@@ -17,9 +17,8 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.sink.protocol.opcua;
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.server;
 
-import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
@@ -58,6 +57,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.security.KeyPair;
 import java.security.cert.X509Certificate;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Objects;
@@ -84,48 +84,45 @@ public class OpcUaServerBuilder implements Closeable {
   private String password;
   private Path securityDir;
   private boolean enableAnonymousAccess;
+  private Set<SecurityPolicy> securityPolicies;
   private DefaultTrustListManager trustListManager;
 
-  OpcUaServerBuilder() {
-    tcpBindPort = 
PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
-    httpsBindPort = 
PipeSinkConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE;
-    user = PipeSinkConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
-    password = PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
-    securityDir = 
Paths.get(PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE);
-    enableAnonymousAccess = 
PipeSinkConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE;
-  }
-
-  OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) {
+  public OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) {
     this.tcpBindPort = tcpBindPort;
     return this;
   }
 
-  OpcUaServerBuilder setHttpsBindPort(final int httpsBindPort) {
+  public OpcUaServerBuilder setHttpsBindPort(final int httpsBindPort) {
     this.httpsBindPort = httpsBindPort;
     return this;
   }
 
-  OpcUaServerBuilder setUser(final String user) {
+  public OpcUaServerBuilder setUser(final String user) {
     this.user = user;
     return this;
   }
 
-  OpcUaServerBuilder setPassword(final String password) {
+  public OpcUaServerBuilder setPassword(final String password) {
     this.password = password;
     return this;
   }
 
-  OpcUaServerBuilder setSecurityDir(final String securityDir) {
+  public OpcUaServerBuilder setSecurityDir(final String securityDir) {
     this.securityDir = Paths.get(securityDir);
     return this;
   }
 
-  OpcUaServerBuilder setEnableAnonymousAccess(final boolean 
enableAnonymousAccess) {
+  public OpcUaServerBuilder setEnableAnonymousAccess(final boolean 
enableAnonymousAccess) {
     this.enableAnonymousAccess = enableAnonymousAccess;
     return this;
   }
 
-  OpcUaServer build() throws Exception {
+  public OpcUaServerBuilder setSecurityPolicies(final Set<SecurityPolicy> 
securityPolicies) {
+    this.securityPolicies = securityPolicies; // kept by reference; no default is visible here and build() passes it to new HashSet<>(...), so set it before build()
+    return this; // fluent, like the other setters
+  }
+
+  public OpcUaServer build() throws Exception {
     Files.createDirectories(securityDir);
     if (!Files.exists(securityDir)) {
       throw new PipeException("Unable to create security dir: " + securityDir);
@@ -247,30 +244,36 @@ public class OpcUaServerBuilder implements Closeable {
                     USER_TOKEN_POLICY_USERNAME,
                     USER_TOKEN_POLICY_X509);
 
-        final EndpointConfiguration.Builder noSecurityBuilder =
-            builder
-                .copy()
-                .setSecurityPolicy(SecurityPolicy.None)
-                .setSecurityMode(MessageSecurityMode.None);
-
-        endpointConfigurations.add(buildTcpEndpoint(noSecurityBuilder, 
tcpBindPort));
-        endpointConfigurations.add(buildHttpsEndpoint(noSecurityBuilder, 
httpsBindPort));
-
-        endpointConfigurations.add(
-            buildTcpEndpoint(
-                builder
-                    .copy()
-                    .setSecurityPolicy(SecurityPolicy.Basic256Sha256)
-                    .setSecurityMode(MessageSecurityMode.SignAndEncrypt),
-                tcpBindPort));
-
-        endpointConfigurations.add(
-            buildHttpsEndpoint(
-                builder
-                    .copy()
-                    .setSecurityPolicy(SecurityPolicy.Basic256Sha256)
-                    .setSecurityMode(MessageSecurityMode.Sign),
-                httpsBindPort));
+        final Set<SecurityPolicy> securityPolicySet = new 
HashSet<>(securityPolicies);
+        if (securityPolicySet.contains(SecurityPolicy.None)) {
+          final EndpointConfiguration.Builder noSecurityBuilder =
+              builder
+                  .copy()
+                  .setSecurityPolicy(SecurityPolicy.None)
+                  .setSecurityMode(MessageSecurityMode.None);
+
+          endpointConfigurations.add(buildTcpEndpoint(noSecurityBuilder, 
tcpBindPort));
+          endpointConfigurations.add(buildHttpsEndpoint(noSecurityBuilder, 
httpsBindPort));
+          securityPolicySet.remove(SecurityPolicy.None);
+        }
+
+        for (final SecurityPolicy securityPolicy : securityPolicySet) {
+          endpointConfigurations.add(
+              buildTcpEndpoint(
+                  builder
+                      .copy()
+                      .setSecurityPolicy(securityPolicy)
+                      .setSecurityMode(MessageSecurityMode.SignAndEncrypt),
+                  tcpBindPort));
+
+          endpointConfigurations.add(
+              buildHttpsEndpoint(
+                  builder
+                      .copy()
+                      .setSecurityPolicy(securityPolicy)
+                      .setSecurityMode(MessageSecurityMode.Sign),
+                  httpsBindPort));
+        }
 
         final EndpointConfiguration.Builder discoveryBuilder =
             builder
@@ -309,7 +312,8 @@ public class OpcUaServerBuilder implements Closeable {
       final String user,
       final String password,
       final Path securityDir,
-      final boolean enableAnonymousAccess) {
+      final boolean enableAnonymousAccess,
+      final Set<SecurityPolicy> securityPolicies) {
     checkEquals("user", this.user, user);
     checkEquals("password", this.password, password);
     checkEquals(
@@ -317,10 +321,15 @@ public class OpcUaServerBuilder implements Closeable {
         
FileSystems.getDefault().getPath(this.securityDir.toAbsolutePath().toString()),
         
FileSystems.getDefault().getPath(securityDir.toAbsolutePath().toString()));
     checkEquals("enableAnonymousAccess option", this.enableAnonymousAccess, 
enableAnonymousAccess);
+    checkEquals("securityPolicies", this.securityPolicies, securityPolicies);
   }
 
-  private void checkEquals(final String attrName, final Object thisAttr, final 
Object thatAttr) {
+  private void checkEquals(final String attrName, Object thisAttr, Object 
thatAttr) {
     if (!Objects.equals(thisAttr, thatAttr)) {
+      if (attrName.equals("password")) {
+        thisAttr = "****";
+        thatAttr = "****";
+      }
       throw new PipeException(
           String.format(
               "The existing server with tcp port %s and https port %s's %s %s 
conflicts to the new %s %s, reject reusing.",
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
index 26ff72cde24..ec2122b9175 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
@@ -172,7 +172,8 @@ public class PipeSinkTest {
               false, "root.db", "db", "root.db", tablet, false, "pipe", 0L, 
null, null, false);
       event.increaseReferenceCount("");
       normalOPC.transfer(event);
-      Assert.assertThrows(UnsupportedOperationException.class, () -> 
qualityOPC.transfer(event));
+      // Shall not throw
+      qualityOPC.transfer(event);
       event.decreaseReferenceCount("", false);
 
       qualityOPC.transfer(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
index ecdc01237e9..b27bb59224d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
@@ -28,6 +28,7 @@ import java.io.File;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
@@ -180,7 +181,7 @@ public class PipeSinkConstant {
 
   public static final String CONNECTOR_OPC_UA_PLACEHOLDER_KEY = 
"connector.opcua.placeholder";
   public static final String SINK_OPC_UA_PLACEHOLDER_KEY = 
"sink.opcua.placeholder";
-  public static final String CONNECTOR_OPC_UA_PLACEHOLDER_DEFAULT_VALUE = 
"null";
+  public static final String 
CONNECTOR_OPC_UA_PLACEHOLDER_4_NULL_TAG_DEFAULT_VALUE = "null";
 
   public static final String CONNECTOR_OPC_UA_WITH_QUALITY_KEY = 
"connector.opcua.with-quality";
   public static final String SINK_OPC_UA_WITH_QUALITY_KEY = 
"sink.opcua.with-quality";
@@ -194,6 +195,40 @@ public class PipeSinkConstant {
   public static final String SINK_OPC_UA_QUALITY_NAME_KEY = 
"sink.opcua.quality-name";
   public static final String CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE = 
"quality";
 
+  public static final String CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY =
+      "connector.opcua.default-quality";
+  public static final String SINK_OPC_UA_DEFAULT_QUALITY_KEY = 
"sink.opcua.default-quality";
+  public static final String CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE = 
"GOOD";
+  public static final String CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE = 
"BAD";
+  public static final String CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE 
= "UNCERTAIN";
+
+  public static final String CONNECTOR_OPC_UA_NODE_URL_KEY = 
"connector.opcua.node-url";
+  public static final String SINK_OPC_UA_NODE_URL_KEY = "sink.opcua.node-url";
+
+  public static final String CONNECTOR_OPC_UA_SECURITY_POLICY_KEY =
+      "connector.opcua.security-policy";
+  public static final String SINK_OPC_UA_SECURITY_POLICY_KEY = 
"sink.opcua.security-policy";
+  public static final String CONNECTOR_OPC_UA_SECURITY_POLICY_NONE_VALUE = 
"NONE";
+  public static final String 
CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_128_RSA_15_VALUE =
+      "BASIC128RSA15";
+  public static final String CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_VALUE 
= "BASIC256";
+  public static final String 
CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE =
+      "BASIC256SHA256";
+  public static final String 
CONNECTOR_OPC_UA_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE =
+      "AES128_SHA256_RSAOAEP";
+  public static final String 
CONNECTOR_OPC_UA_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE =
+      "AES256_SHA256_RSAPSS";
+
+  public static final List<String> 
CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES =
+      Arrays.asList( // NONE, BASIC128RSA15 and BASIC256 are declared above but are not part of this default list
+          CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE,
+          CONNECTOR_OPC_UA_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE,
+          CONNECTOR_OPC_UA_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE);
+
+  public static final String CONNECTOR_OPC_UA_HISTORIZING_KEY = 
"connector.opcua.historizing";
+  public static final String SINK_OPC_UA_HISTORIZING_KEY = 
"sink.opcua.historizing";
+  public static final boolean CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE = 
false;
+
   public static final String CONNECTOR_LEADER_CACHE_ENABLE_KEY = 
"connector.leader-cache.enable";
   public static final String SINK_LEADER_CACHE_ENABLE_KEY = 
"sink.leader-cache.enable";
   public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE = 
true;
diff --git a/pom.xml b/pom.xml
index b39fbb78af8..07651de14c1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -409,6 +409,16 @@
                 <artifactId>stack-server</artifactId>
                 <version>${milo.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.eclipse.milo</groupId>
+                <artifactId>stack-client</artifactId>
+                <version>${milo.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.eclipse.milo</groupId>
+                <artifactId>sdk-client</artifactId>
+                <version>${milo.version}</version>
+            </dependency>
             <!-- TODO: Deprecated: Use Airline 2 or Picocli instead -->
             <dependency>
                 <groupId>io.airlift</groupId>


Reply via email to