This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 58f94e2fbb8 Pipe: Implemented OPC Sink for outer server & Set 
configuration and changed the default value of the server security policies & 
Made the default quality configurable and does not throw when non-value/quality 
measurement is encountered (#16944) (#17367)
58f94e2fbb8 is described below

commit 58f94e2fbb8609b11e8f2133c5d2e3aab6ea0693
Author: Caideyipi <[email protected]>
AuthorDate: Fri Mar 27 10:43:04 2026 +0800

    Pipe: Implemented OPC Sink for outer server & Set configuration and changed 
the default value of the server security policies & Made the default quality 
configurable and does not throw when non-value/quality measurement is 
encountered (#16944) (#17367)
    
    fix
    
    Pipe: Fixed the OPC UA client connection problem (#17083)
    
    * fix
    
    * IT
    
    (cherry picked from commit 82f7ca6dfc54435c8ead7327669c888b1edbeeba)
    
    spt
    
    Optimized the logger when table does not exist in DN heartbeat && Pipe: 
Fixed the OPC UA Sink key getter logic and potential NPE when closing client && 
Load: Fixed the missing schema writing for "root" table (#17063)
    
    * root-fix
    
    * f
    
    * fix
    
    * rest
    
    * spls
    
    * gsa
    
    * fix
    
    (cherry picked from commit 5101489d4126b40120d72d49ee5a58edd23aec22)
    
    fix
    
    Pipe: Implemented OPC Sink for outer server & Set configuration and changed 
the default value of the server security policies & Made the default quality 
configurable and does not throw when non-value/quality measurement is 
encountered (#16944)
    
    * pj
    
    * cj
    
    * bone
    
    * fix
    
    * fix
    
    * framework
    
    * fix
    
    * trilog
    
    * framework
    
    * fix
    
    * fix
    
    * yl
    
    * stack-client
    
    * fix
    
    * might
    
    * sleep-removal
    
    * cleaning
    
    * fix
    
    * sec-dir
    
    * cleaning
    
    * remove-poison
    
    * f
    
    * fix
    
    * clean-sit
    
    * sit-comp
    
    * object
    
    * many-clean
    
    * sit-sit
    
    * fix
    
    * fix
    
    * fix
    
    * ref
    
    * sit
    
    * partial
    
    * security-policies
    
    * check-equals
    
    * check-err
    
    * fix
    
    * compile-fix
    
    * adjust
    
    * ut
    
    * refactor
    
    * fix_and_IT
    
    * fix
    
    * placeholder
    
    * rollback
    
    * eliminate-fault
    
    * pw
    
    * fix
    
    * f
    
    * fix
    
    (cherry picked from commit cb18a95fc014c116d3e0041a753d5493432189bb)
---
 integration-test/pom.xml                           |   1 -
 .../iotdb/pipe/it/single/AbstractPipeSingleIT.java |   2 +-
 .../iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java     | 261 ++++++++++++++--
 .../api/customizer/parameter/PipeParameters.java   |  51 +++-
 iotdb-core/datanode/pom.xml                        |  12 +
 .../db/pipe/sink/protocol/opcua/OpcUaSink.java     | 308 ++++++++++++++++++-
 .../sink/protocol/opcua/client/ClientRunner.java   | 159 ++++++++++
 .../opcua/client/IoTDBKeyStoreLoaderClient.java    | 127 ++++++++
 .../protocol/opcua/client/IoTDBOpcUaClient.java    | 335 +++++++++++++++++++++
 .../opcua/{ => server}/OpcUaKeyStoreLoader.java    |   2 +-
 .../opcua/{ => server}/OpcUaNameSpace.java         | 173 +++++++----
 .../opcua/{ => server}/OpcUaServerBuilder.java     |  97 +++---
 .../apache/iotdb/db/pipe/sink/PipeSinkTest.java    |  89 ++++++
 .../pipe/config/constant/PipeSinkConstant.java     |  52 ++++
 pom.xml                                            |  10 +
 15 files changed, 1530 insertions(+), 149 deletions(-)

diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index 8c2f7416929..43b60fc371b 100644
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -185,7 +185,6 @@
         <dependency>
             <groupId>org.bouncycastle</groupId>
             <artifactId>bcprov-jdk18on</artifactId>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>junit</groupId>
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
index ea41dc9b579..dded7cbce30 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
@@ -31,7 +31,7 @@ abstract class AbstractPipeSingleIT {
 
   @Before
   public void setUp() {
-    MultiEnvFactory.createEnv(2);
+    MultiEnvFactory.createEnv(1);
     env = MultiEnvFactory.getEnv(0);
     env.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
     // 10 min, assert that the operations will not time out
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index 645fbdbaa68..599a43476c1 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -20,67 +20,280 @@
 package org.apache.iotdb.pipe.it.single;
 
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
+import org.apache.iotdb.it.env.cluster.EnvUtils;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT1;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
+import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
+import java.io.File;
+import java.net.ConnectException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE;
+import static 
org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT1.class})
 public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
   @Test
-  public void testOPCUASink() throws Exception {
+  public void testOPCUAServerSink() throws Exception {
+    int tcpPort = -1;
+    int httpsPort = -1;
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) {
 
-      TestUtils.executeNonQuery(env, "insert into root.db.d1(time,s1) values 
(1,1)", null);
+      TestUtils.executeNonQuery(env, "insert into root.db.d1(time, s1) values 
(1, 1)", null);
 
-      final Map<String, String> connectorAttributes = new HashMap<>();
-      connectorAttributes.put("sink", "opc-ua-sink");
-      connectorAttributes.put("opcua.model", "client-server");
+      final Map<String, String> sinkAttributes = new HashMap<>();
 
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
-          client
-              .createPipe(
-                  new TCreatePipeReq("testPipe", connectorAttributes)
-                      .setExtractorAttributes(Collections.emptyMap())
-                      .setProcessorAttributes(Collections.emptyMap()))
-              .getCode());
+      sinkAttributes.put("sink", "opc-ua-sink");
+      sinkAttributes.put("model", "client-server");
+      sinkAttributes.put("opcua.security-policy", "None");
+
+      OpcUaClient opcUaClient;
+      DataValue value;
+      while (true) {
+        final int[] ports = EnvUtils.searchAvailablePorts();
+        tcpPort = ports[0];
+        httpsPort = ports[1];
+        sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort));
+        sinkAttributes.put("https.port", Integer.toString(httpsPort));
+
+        Assert.assertEquals(
+            TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+            client
+                .createPipe(
+                    new TCreatePipeReq("testPipe", sinkAttributes)
+                        
.setExtractorAttributes(Collections.singletonMap("user", "root"))
+                        .setProcessorAttributes(Collections.emptyMap()))
+                .getCode());
+
+        try {
+          opcUaClient =
+              getOpcUaClient(
+                  "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", 
SecurityPolicy.None, "root", "root");
+        } catch (final PipeException e) {
+          if (e.getCause() instanceof ConnectException) {
+            continue;
+          } else {
+            throw e;
+          }
+        }
+        value =
+            opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/d1/s1")).get();
+        Assert.assertEquals(new Variant(1.0), value.getValue());
+        Assert.assertEquals(new DateTime(timestampToUtc(1)), 
value.getSourceTime());
+        opcUaClient.disconnect().get();
+        break;
+      }
+
+      // Create the region first to avoid tsFile parsing
+      TestUtils.executeNonQueries(
+          env,
+          Arrays.asList(
+              "create aligned timeSeries root.db.opc(value double, quality 
boolean, other int32)",
+              "create aligned timeSeries root.db.opc1(value double, quality 
boolean, other int32)",
+              "create aligned timeSeries root.db.opc2(value double, quality 
boolean, other int32)",
+              "insert into root.db.opc(time, value, quality, other) values (0, 
0, true, 1)",
+              "insert into root.db.opc1(time, value, quality, other) values 
(0, 0, true, 1)",
+              "insert into root.db.opc2(time, value, quality, other) values 
(0, 0, true, 1)"),
+          null);
+
+      while (true) {
+        final int[] ports = EnvUtils.searchAvailablePorts();
+        tcpPort = ports[0];
+        httpsPort = ports[1];
+        sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort));
+        sinkAttributes.put("https.port", Integer.toString(httpsPort));
+        sinkAttributes.put("with-quality", "true");
+
+        Assert.assertEquals(
+            TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+            client
+                .alterPipe(
+                    new TAlterPipeReq()
+                        .setPipeName("testPipe")
+                        .setIsReplaceAllConnectorAttributes(true)
+                        .setConnectorAttributes(sinkAttributes)
+                        .setProcessorAttributes(Collections.emptyMap())
+                        .setExtractorAttributes(Collections.emptyMap()))
+                .getCode());
+        try {
+          opcUaClient =
+              getOpcUaClient(
+                  "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", 
SecurityPolicy.None, "root", "root");
+        } catch (final PipeException e) {
+          if (e.getCause() instanceof ConnectException) {
+            continue;
+          } else {
+            throw e;
+          }
+        }
+        break;
+      }
+
+      // Test multiple regions
+      TestUtils.executeNonQueries(
+          env,
+          Arrays.asList(
+              "insert into root.db.opc(time, value, quality, other) values (1, 
1, false, 1)",
+              "insert into root.db.opc1(time, value, quality, other) values 
(1, 1, false, 1)",
+              "insert into root.db.opc2(time, value, quality, other) values 
(1, 1, false, 1)"),
+          null);
+
+      long startTime = System.currentTimeMillis();
+      while (true) {
+        try {
+          value =
+              opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/opc")).get();
+          Assert.assertEquals(new Variant(1.0), value.getValue());
+          Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
+          Assert.assertEquals(new DateTime(timestampToUtc(1)), 
value.getSourceTime());
+
+          value =
+              opcUaClient
+                  .readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/opc1"))
+                  .get();
+          Assert.assertEquals(new Variant(1.0), value.getValue());
+          Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
+          Assert.assertEquals(new DateTime(timestampToUtc(1)), 
value.getSourceTime());
+
+          value =
+              opcUaClient
+                  .readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/opc2"))
+                  .get();
+          Assert.assertEquals(new Variant(1.0), value.getValue());
+          Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
+          Assert.assertEquals(new DateTime(timestampToUtc(1)), 
value.getSourceTime());
+          break;
+        } catch (final Throwable t) {
+          if (System.currentTimeMillis() - startTime > 10_000L) {
+            throw t;
+          }
+        }
+      }
+
+      TestUtils.executeNonQuery(
+          env, "insert into root.db.opc(time, quality) values (2, true)", 
null);
+      TestUtils.executeNonQuery(env, "insert into root.db.opc(time, value) 
values (2, 2)", null);
+
+      startTime = System.currentTimeMillis();
+      while (true) {
+        try {
+          value =
+              opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, 
"root/db/opc")).get();
+          Assert.assertEquals(new DateTime(timestampToUtc(2)), 
value.getSourceTime());
+          Assert.assertEquals(new Variant(2.0), value.getValue());
+          Assert.assertEquals(StatusCode.UNCERTAIN, value.getStatusCode());
+          break;
+        } catch (final Throwable t) {
+          if (System.currentTimeMillis() - startTime > 10_000L) {
+            throw t;
+          }
+        }
+      }
+
+      opcUaClient.disconnect().get();
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.dropPipe("testPipe").getCode());
 
       // Test reconstruction
-      connectorAttributes.put("password", "test");
+      sinkAttributes.put("password", "test");
+      sinkAttributes.put("security-policy", "basic256sha256");
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(),
           client
               .createPipe(
-                  new TCreatePipeReq("testPipe", connectorAttributes)
+                  new TCreatePipeReq("testPipe", sinkAttributes)
                       .setExtractorAttributes(Collections.emptyMap())
                       .setProcessorAttributes(Collections.emptyMap()))
               .getCode());
 
+      // Banned none, only allows basic256sha256
+      final int finalTcpPort = tcpPort;
+      Assert.assertThrows(
+          PipeException.class,
+          () ->
+              getOpcUaClient(
+                  "opc.tcp://127.0.0.1:" + finalTcpPort + "/iotdb",
+                  SecurityPolicy.None,
+                  "root",
+                  "root"));
+
       // Test conflict
-      connectorAttributes.put("password", "conflict");
-      Assert.assertEquals(
-          TSStatusCode.PIPE_ERROR.getStatusCode(),
-          client
-              .createPipe(
-                  new TCreatePipeReq("testPipe", connectorAttributes)
-                      .setExtractorAttributes(Collections.emptyMap())
-                      .setProcessorAttributes(Collections.emptyMap()))
-              .getCode());
+      sinkAttributes.put("password", "conflict");
+      try {
+        TestUtils.executeNonQuery(
+            env,
+            String.format(
+                "create pipe test1 ('sink'='opc-ua-sink', 
'password'='conflict@pswd', 'tcp.port'='%s', 'https.port'='%s')",
+                tcpPort, httpsPort),
+            null);
+        Assert.fail();
+      } catch (final Exception e) {
+        Assert.assertEquals(
+            String.format(
+                "org.apache.iotdb.jdbc.IoTDBSQLException: 1107: The existing 
server with tcp port %s and https port %s's password **** conflicts to the new 
password ****, reject reusing.",
+                tcpPort, httpsPort),
+            e.getMessage());
+      }
+    } finally {
+      if (tcpPort >= 0) {
+        final String lockPath = EnvUtils.getLockFilePath(tcpPort);
+        if (!new File(lockPath).delete()) {
+          System.out.printf("Delete lock file %s failed%n", lockPath);
+        }
+      }
     }
   }
+
+  private static OpcUaClient getOpcUaClient(
+      final String nodeUrl,
+      final SecurityPolicy policy,
+      final String userName,
+      final String password) {
+    final IoTDBOpcUaClient client;
+
+    final IdentityProvider provider =
+        Objects.nonNull(userName)
+            ? new UsernameProvider(userName, password)
+            : new AnonymousProvider();
+
+    final String securityDir =
+        CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE
+            + File.separatorChar
+            + 
UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET));
+
+    client = new IoTDBOpcUaClient(nodeUrl, policy, provider, false);
+    new ClientRunner(client, securityDir, password, userName, 10).run();
+    return client.getClient();
+  }
 }
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index 3dcb2d19b0a..b6d59c827f0 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -59,7 +59,9 @@ public class PipeParameters {
   }
 
   public boolean hasAttribute(final String key) {
-    return attributes.containsKey(key) || 
attributes.containsKey(KeyReducer.reduce(key));
+    return attributes.containsKey(key)
+        || attributes.containsKey(KeyReducer.shallowReduce(key))
+        || attributes.containsKey(KeyReducer.reduce(key));
   }
 
   public boolean hasAnyAttributes(final String... keys) {
@@ -76,7 +78,11 @@ public class PipeParameters {
   }
 
   public String getString(final String key) {
-    final String value = attributes.get(key);
+    String value = attributes.get(key);
+    if (Objects.nonNull(value)) {
+      return value;
+    }
+    value = attributes.get(KeyReducer.shallowReduce(key));
     return value != null ? value : attributes.get(KeyReducer.reduce(key));
   }
 
@@ -350,29 +356,52 @@ public class PipeParameters {
   }
 
   private static class KeyReducer {
-
-    private static final Set<String> PREFIXES = new HashSet<>();
+    private static final Set<String> FIRST_PREFIXES = new HashSet<>();
+    private static final Set<String> SECOND_PREFIXES = new HashSet<>();
 
     static {
-      PREFIXES.add("extractor.");
-      PREFIXES.add("source.");
-      PREFIXES.add("processor.");
-      PREFIXES.add("connector.");
-      PREFIXES.add("sink.");
+      FIRST_PREFIXES.add("extractor.");
+      FIRST_PREFIXES.add("source.");
+      FIRST_PREFIXES.add("processor.");
+      FIRST_PREFIXES.add("connector.");
+      FIRST_PREFIXES.add("sink.");
+
+      SECOND_PREFIXES.add("opcua.");
     }
 
-    static String reduce(final String key) {
+    static String shallowReduce(String key) {
       if (key == null) {
         return null;
       }
       final String lowerCaseKey = key.toLowerCase();
-      for (final String prefix : PREFIXES) {
+      for (final String prefix : FIRST_PREFIXES) {
         if (lowerCaseKey.startsWith(prefix)) {
           return key.substring(prefix.length());
         }
       }
       return key;
     }
+
+    public static String reduce(String key) {
+      if (key == null) {
+        return null;
+      }
+      String lowerCaseKey = key.toLowerCase();
+      for (final String prefix : FIRST_PREFIXES) {
+        if (lowerCaseKey.startsWith(prefix)) {
+          key = key.substring(prefix.length());
+          lowerCaseKey = lowerCaseKey.substring(prefix.length());
+          break;
+        }
+      }
+      for (final String prefix : SECOND_PREFIXES) {
+        if (lowerCaseKey.startsWith(prefix)) {
+          key = key.substring(prefix.length());
+          break;
+        }
+      }
+      return key;
+    }
   }
 
   public static class ValueHider {
diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index 2a019e1fc43..175a77e746e 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -206,6 +206,18 @@
             <groupId>org.checkerframework</groupId>
             <artifactId>checker-qual</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.eclipse.milo</groupId>
+            <artifactId>stack-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.milo</groupId>
+            <artifactId>sdk-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcprov-jdk18on</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.eclipse.jetty</groupId>
             <artifactId>jetty-http</artifactId>
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index 5ce35e75053..e1985061224 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -22,6 +22,10 @@ package org.apache.iotdb.db.pipe.sink.protocol.opcua;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaServerBuilder;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
@@ -30,44 +34,86 @@ import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.record.Tablet;
+import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
 import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_HISTORIZING_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_PUB_SUB_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_NODE_URL_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_128_RSA_15_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_NONE_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TIMEOUT_SECONDS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_WITH_QUALITY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USERNAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_DEFAULT_QUALITY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HISTORIZING_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HTTPS_BIND_PORT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_MODEL_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_NODE_URL_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_QUALITY_NAME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_DIR_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_POLICY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_TCP_BIND_PORT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_TIMEOUT_SECONDS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_VALUE_NAME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_WITH_QUALITY_KEY;
 
 /**
  * Send data in IoTDB based on Opc Ua protocol, using Eclipse Milo. All data 
are converted into
@@ -83,9 +129,21 @@ public class OpcUaSink implements PipeConnector {
 
+  // Embedded OPC UA servers shared across sinks, keyed by "<httpsBindPort>:<tcpBindPort>". Each
+  // entry pairs a reference count with the name space of that server.
   private static final Map<String, Pair<AtomicInteger, OpcUaNameSpace>>
       SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP = new 
ConcurrentHashMap<>();
+  // Clients connected to outer OPC UA servers, keyed by node URL. Each entry pairs a reference
+  // count with the client; close() removes the entry once the count is no longer positive.
+  private static final Map<String, Pair<AtomicInteger, IoTDBOpcUaClient>>
+      CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP = new ConcurrentHashMap<>();
 
   private String serverKey;
-  private OpcUaNameSpace nameSpace;
+  // URL of the outer server; null when no node URL is configured (embedded server mode)
+  private String nodeUrl;
+  private boolean isClientServerModel;
+  // valueName and qualityName are only set when 'with-quality' is enabled, otherwise null
+  private @Nullable String valueName;
+  private @Nullable String qualityName;
+  private StatusCode defaultQuality;
+
+  // Inner server
+  private @Nullable OpcUaNameSpace nameSpace;
+
+  // Outer server
+  private @Nullable IoTDBOpcUaClient client;
 
   @Override
   public void validate(final PipeParameterValidator validator) throws 
Exception {
@@ -104,13 +162,71 @@ public class OpcUaSink implements PipeConnector {
             Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY),
             Arrays.asList(CONNECTOR_IOTDB_USERNAME_KEY, 
SINK_IOTDB_USERNAME_KEY),
             false);
-    ;
+
+    final PipeParameters parameters = validator.getParameters();
+    if (validator
+            .getParameters()
+            .hasAnyAttributes(CONNECTOR_OPC_UA_NODE_URL_KEY, 
SINK_OPC_UA_NODE_URL_KEY)
+        || parameters.getBooleanOrDefault(
+            Arrays.asList(CONNECTOR_OPC_UA_WITH_QUALITY_KEY, 
SINK_OPC_UA_WITH_QUALITY_KEY),
+            CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE)) {
+      validator.validate(
+          CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE::equals,
+          String.format(
+              "When the OPC UA sink points to an outer server or sets 
'with-quality' to true, the %s or %s must be %s.",
+              CONNECTOR_OPC_UA_MODEL_KEY,
+              SINK_OPC_UA_MODEL_KEY,
+              CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE),
+          parameters.getStringOrDefault(
+              Arrays.asList(CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY),
+              CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE));
+    }
   }
 
   @Override
   public void customize(
       final PipeParameters parameters, final PipeConnectorRuntimeConfiguration 
configuration)
       throws Exception {
+    final boolean withQuality =
+        parameters.getBooleanOrDefault(
+            Arrays.asList(CONNECTOR_OPC_UA_WITH_QUALITY_KEY, 
SINK_OPC_UA_WITH_QUALITY_KEY),
+            CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE);
+    valueName =
+        withQuality
+            ? parameters.getStringOrDefault(
+                Arrays.asList(CONNECTOR_OPC_UA_VALUE_NAME_KEY, 
SINK_OPC_UA_VALUE_NAME_KEY),
+                CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE)
+            : null;
+    qualityName =
+        withQuality
+            ? parameters.getStringOrDefault(
+                Arrays.asList(CONNECTOR_OPC_UA_QUALITY_NAME_KEY, 
SINK_OPC_UA_QUALITY_NAME_KEY),
+                CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE)
+            : null;
+    defaultQuality =
+        getQuality(
+            withQuality
+                ? parameters.getStringOrDefault(
+                    Arrays.asList(
+                        CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY, 
SINK_OPC_UA_DEFAULT_QUALITY_KEY),
+                    CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE)
+                : CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE);
+    isClientServerModel =
+        parameters
+            .getStringOrDefault(
+                Arrays.asList(CONNECTOR_OPC_UA_MODEL_KEY, 
SINK_OPC_UA_MODEL_KEY),
+                CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
+            .equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE);
+
+    nodeUrl = parameters.getStringByKeys(CONNECTOR_OPC_UA_NODE_URL_KEY, 
SINK_OPC_UA_NODE_URL_KEY);
+    if (Objects.isNull(nodeUrl)) {
+      customizeServer(parameters);
+    } else {
+      customizeClient(parameters);
+    }
+  }
+
+  private void customizeServer(final PipeParameters parameters) {
     final int tcpBindPort =
         parameters.getIntOrDefault(
             Arrays.asList(CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY, 
SINK_OPC_UA_TCP_BIND_PORT_KEY),
@@ -147,6 +263,21 @@ public class OpcUaSink implements PipeConnector {
                 CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY,
                 SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY),
             CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE);
+    final Set<SecurityPolicy> securityPolicies =
+        (parameters.hasAnyAttributes(
+                    CONNECTOR_OPC_UA_SECURITY_POLICY_KEY, 
SINK_OPC_UA_SECURITY_POLICY_KEY)
+                ? Arrays.stream(
+                    parameters
+                        .getStringByKeys(
+                            CONNECTOR_OPC_UA_SECURITY_POLICY_KEY, 
SINK_OPC_UA_SECURITY_POLICY_KEY)
+                        .replace(" ", "")
+                        .split(","))
+                : 
CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES.stream())
+            .map(this::getSecurityPolicy)
+            .collect(Collectors.toSet());
+    if (securityPolicies.isEmpty()) {
+      throw new PipeException("The security policy cannot be empty.");
+    }
 
     synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP) {
       serverKey = httpsBindPort + ":" + tcpBindPort;
@@ -165,25 +296,22 @@ public class OpcUaSink implements PipeConnector {
                                 .setUser(user)
                                 .setPassword(password)
                                 .setSecurityDir(securityDir)
-                                
.setEnableAnonymousAccess(enableAnonymousAccess);
+                                
.setEnableAnonymousAccess(enableAnonymousAccess)
+                                .setSecurityPolicies(securityPolicies);
                         final OpcUaServer newServer = builder.build();
-                        nameSpace =
-                            new OpcUaNameSpace(
-                                newServer,
-                                parameters
-                                    .getStringOrDefault(
-                                        Arrays.asList(
-                                            CONNECTOR_OPC_UA_MODEL_KEY, 
SINK_OPC_UA_MODEL_KEY),
-                                        CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
-                                    
.equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE),
-                                builder);
+                        nameSpace = new OpcUaNameSpace(newServer, builder);
                         nameSpace.startup();
                         newServer.startup().get();
                         return new Pair<>(new AtomicInteger(0), nameSpace);
                       } else {
                         oldValue
                             .getRight()
-                            .checkEquals(user, password, securityDir, 
enableAnonymousAccess);
+                            .checkEquals(
+                                user,
+                                password,
+                                securityDir,
+                                enableAnonymousAccess,
+                                securityPolicies);
                         return oldValue;
                       }
                     } catch (final PipeException e) {
@@ -197,6 +325,104 @@ public class OpcUaSink implements PipeConnector {
     }
   }
 
+  /**
+   * Sets this sink up as a client of the outer OPC UA server at {@code nodeUrl}. A client already
+   * registered for the same URL is reused after its settings have been checked against the ones
+   * requested here; otherwise a new client is created and started. In both cases the reference
+   * count of the entry is incremented.
+   *
+   * @param parameters the pipe parameters to read the client settings from
+   * @throws PipeException if the security policy is unknown, the client cannot be created or
+   *     started, or an existing client for the same URL was configured differently
+   */
+  private void customizeClient(final PipeParameters parameters) {
+    // getSecurityPolicy() normalizes the case itself, so the raw parameter value is passed on.
+    final SecurityPolicy policy =
+        getSecurityPolicy(
+            parameters.getStringOrDefault(
+                Arrays.asList(
+                    CONNECTOR_OPC_UA_SECURITY_POLICY_KEY, SINK_OPC_UA_SECURITY_POLICY_KEY),
+                CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE));
+
+    // Accept the 'username' spelling as well as 'user': validate() handles both key pairs, and
+    // ignoring one of them would silently fall back to an anonymous connection.
+    final String userName =
+        parameters.getStringByKeys(
+            CONNECTOR_IOTDB_USER_KEY,
+            SINK_IOTDB_USER_KEY,
+            CONNECTOR_IOTDB_USERNAME_KEY,
+            SINK_IOTDB_USERNAME_KEY);
+    final String password =
+        parameters.getStringOrDefault(
+            Arrays.asList(CONNECTOR_IOTDB_PASSWORD_KEY, SINK_IOTDB_PASSWORD_KEY),
+            CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
+    // No user configured means anonymous access; the password is then only used for the keystore.
+    final IdentityProvider provider =
+        Objects.nonNull(userName)
+            ? new UsernameProvider(userName, password)
+            : new AnonymousProvider();
+
+    // The default security dir gets a sub-directory derived from the node URL.
+    final String securityDir =
+        IoTDBConfig.addDataHomeDir(
+            parameters.getStringOrDefault(
+                Arrays.asList(CONNECTOR_OPC_UA_SECURITY_DIR_KEY, SINK_OPC_UA_SECURITY_DIR_KEY),
+                CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE
+                    + File.separatorChar
+                    + UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET))));
+
+    final long timeoutSeconds =
+        parameters.getLongOrDefault(
+            Arrays.asList(CONNECTOR_OPC_UA_TIMEOUT_SECONDS_KEY, SINK_OPC_UA_TIMEOUT_SECONDS_KEY),
+            CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE);
+
+    synchronized (CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP) {
+      // NOTE(review): if compute() throws (start failure or a conflict reported by checkEquals),
+      // this sink holds no reference, yet close() decides by nodeUrl alone - confirm that close()
+      // cannot release a client that other sinks still use in that case.
+      final Pair<AtomicInteger, IoTDBOpcUaClient> countAndClient =
+          CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP.compute(
+              nodeUrl,
+              (key, oldValue) -> {
+                if (Objects.isNull(oldValue)) {
+                  final IoTDBOpcUaClient result =
+                      new IoTDBOpcUaClient(
+                          nodeUrl,
+                          policy,
+                          provider,
+                          parameters.getBooleanOrDefault(
+                              Arrays.asList(
+                                  CONNECTOR_OPC_UA_HISTORIZING_KEY, SINK_OPC_UA_HISTORIZING_KEY),
+                              CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE));
+                  final ClientRunner runner =
+                      new ClientRunner(result, securityDir, password, userName, timeoutSeconds);
+                  runner.run();
+                  return new Pair<>(new AtomicInteger(0), result);
+                }
+                oldValue.getRight().checkEquals(userName, password, securityDir, policy);
+                return oldValue;
+              });
+      client = countAndClient.getRight();
+      countAndClient.getLeft().incrementAndGet();
+    }
+  }
+
+  /**
+   * Maps a configured security policy name to the corresponding {@link SecurityPolicy}, ignoring
+   * case.
+   *
+   * @param securityPolicy the policy name as configured
+   * @return the matching security policy
+   * @throws PipeException if the name is not one of the supported policies
+   */
+  private SecurityPolicy getSecurityPolicy(final String securityPolicy) {
+    // Locale.ROOT keeps the match independent of the JVM default locale: under a Turkish locale
+    // a plain toUpperCase() maps 'i' to a dotted capital, so "basic256" would match no constant.
+    switch (securityPolicy.toUpperCase(java.util.Locale.ROOT)) {
+      case CONNECTOR_OPC_UA_SECURITY_POLICY_NONE_VALUE:
+        return SecurityPolicy.None;
+      case CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_128_RSA_15_VALUE:
+        return SecurityPolicy.Basic128Rsa15;
+      case CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_VALUE:
+        return SecurityPolicy.Basic256;
+      case CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE:
+        return SecurityPolicy.Basic256Sha256;
+      case CONNECTOR_OPC_UA_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE:
+        return SecurityPolicy.Aes128_Sha256_RsaOaep;
+      case CONNECTOR_OPC_UA_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE:
+        return SecurityPolicy.Aes256_Sha256_RsaPss;
+      default:
+        throw new PipeException(
+            "The security policy can only be 'None', 'Basic128Rsa15', 'Basic256', 'Basic256Sha256', 'Aes128_Sha256_RsaOaep' or 'Aes256_Sha256_RsaPss'.");
+    }
+  }
+
+  /**
+   * Maps a configured quality name to the corresponding {@link StatusCode}, ignoring case.
+   *
+   * @param quality the quality name as configured
+   * @return the matching status code
+   * @throws PipeException if the name is not one of the supported qualities
+   */
+  private StatusCode getQuality(final String quality) {
+    // Locale.ROOT keeps the match independent of the JVM default locale: under a Turkish locale
+    // a plain toUpperCase() maps 'i' to a dotted capital, so "uncertain" would match no constant.
+    switch (quality.toUpperCase(java.util.Locale.ROOT)) {
+      case CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE:
+        return StatusCode.GOOD;
+      case CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE:
+        return StatusCode.BAD;
+      case CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE:
+        return StatusCode.UNCERTAIN;
+      default:
+        throw new PipeException("The default quality can only be 'GOOD', 'BAD' or 'UNCERTAIN'.");
+    }
+  }
+
   @Override
   public void handshake() throws Exception {
     // Server side, do nothing
@@ -214,7 +440,19 @@ public class OpcUaSink implements PipeConnector {
 
   @Override
   public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
-    transferByTablet(tabletInsertionEvent, LOGGER, tablet -> 
nameSpace.transfer(tablet));
+    transferByTablet(
+        tabletInsertionEvent,
+        LOGGER,
+        tablet -> {
+          if (Objects.nonNull(nameSpace)) {
+            nameSpace.transfer(tablet, this);
+          } else if (Objects.nonNull(client)) {
+            client.transfer(tablet, this);
+          } else {
+            throw new PipeException(
+                "No OPC client or server is specified when transferring 
tablet");
+          }
+        });
   }
 
   public static void transferByTablet(
@@ -299,5 +537,45 @@ public class OpcUaSink implements PipeConnector {
         }
       }
     }
+
+    if (nodeUrl == null) {
+      return;
+    }
+
+    synchronized (CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP) {
+      final Pair<AtomicInteger, IoTDBOpcUaClient> pair =
+          CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP.get(nodeUrl);
+      if (pair == null) {
+        return;
+      }
+
+      if (pair.getLeft().decrementAndGet() <= 0) {
+        try {
+          pair.getRight().disconnect();
+        } finally {
+          CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP.remove(nodeUrl);
+        }
+      }
+    }
+  }
+
+  /////////////////////////////// Getter ///////////////////////////////
+
+  /**
+   * Returns whether the model configured for this sink is the client-server model, as resolved in
+   * {@code customize}.
+   */
+  public boolean isClientServerModel() {
+    return isClientServerModel;
+  }
+
+  /**
+   * Returns the configured value measurement name, or {@code null} when 'with-quality' is not
+   * enabled.
+   */
+  @Nullable
+  public String getValueName() {
+    return valueName;
+  }
+
+  /**
+   * Returns the configured quality measurement name, or {@code null} when 'with-quality' is not
+   * enabled.
+   */
+  @Nullable
+  public String getQualityName() {
+    return qualityName;
+  }
+
+  /**
+   * Returns the default quality resolved in {@code customize}: the configured default quality when
+   * 'with-quality' is enabled, GOOD otherwise.
+   */
+  public StatusCode getDefaultQuality() {
+    return defaultQuality;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
new file mode 100644
index 00000000000..9e3b46d463a
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.client;
+
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import 
org.eclipse.milo.opcua.stack.client.security.DefaultClientCertificateValidator;
+import org.eclipse.milo.opcua.stack.core.security.DefaultTrustListManager;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.Objects;
+
+import static 
org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
+
+/**
+ * Builds the Milo {@link OpcUaClient} for an {@link IoTDBOpcUaClient} and passes it to that client
+ * to run. The connection settings are kept so that a later request to reuse the same client can be
+ * checked for conflicting configuration.
+ */
+public class ClientRunner {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClientRunner.class);
+
+  static {
+    // Required for SecurityPolicy.Aes256_Sha256_RsaPss
+    Security.addProvider(new BouncyCastleProvider());
+  }
+
+  private final IoTDBOpcUaClient configurableUaClient;
+  private final Path securityDir;
+  private final String password;
+  private final long timeoutSeconds;
+
+  // For conflict checking
+  private final String user;
+
+  /**
+   * Creates the runner and registers it on the given client via {@code setRunner}.
+   *
+   * @param configurableUaClient the client this runner creates the connection for
+   * @param securityDir directory holding the client keystore and the {@code pki} trust list
+   * @param password password of the client keystore, also compared in conflict checks
+   * @param user user name, only kept for conflict checks
+   * @param timeoutSeconds request and connect timeout in seconds
+   */
+  public ClientRunner(
+      final IoTDBOpcUaClient configurableUaClient,
+      final String securityDir,
+      final String password,
+      final String user,
+      final long timeoutSeconds) {
+    this.configurableUaClient = configurableUaClient;
+    this.securityDir = Paths.get(securityDir);
+    this.password = password;
+    this.user = user;
+    this.timeoutSeconds = timeoutSeconds;
+    configurableUaClient.setRunner(this);
+  }
+
+  /**
+   * Prepares the security directory, loads (or generates) the client key material and creates the
+   * Milo client for the node URL of the wrapped client.
+   */
+  private OpcUaClient createClient() throws Exception {
+    Files.createDirectories(securityDir);
+    if (!Files.exists(securityDir)) {
+      throw new Exception("unable to create security dir: " + securityDir);
+    }
+
+    final File pkiDir = securityDir.resolve("pki").toFile();
+
+    logger.info("security dir: {}", securityDir.toAbsolutePath());
+    logger.info("security pki dir: {}", pkiDir.getAbsolutePath());
+
+    final IoTDBKeyStoreLoaderClient loader =
+        new IoTDBKeyStoreLoaderClient().load(securityDir, password.toCharArray());
+
+    final DefaultTrustListManager trustListManager = new DefaultTrustListManager(pkiDir);
+
+    final DefaultClientCertificateValidator certificateValidator =
+        new DefaultClientCertificateValidator(trustListManager);
+
+    return OpcUaClient.create(
+        configurableUaClient.getNodeUrl(),
+        endpoints -> endpoints.stream().filter(configurableUaClient.endpointFilter()).findFirst(),
+        configBuilder ->
+            configBuilder
+                .setApplicationName(LocalizedText.english("Apache IoTDB OPC UA client"))
+                .setApplicationUri("urn:apache:iotdb:opc-ua-client")
+                .setKeyPair(loader.getClientKeyPair())
+                .setCertificate(loader.getClientCertificate())
+                .setCertificateChain(loader.getClientCertificateChain())
+                .setCertificateValidator(certificateValidator)
+                .setIdentityProvider(configurableUaClient.getIdentityProvider())
+                // Milo expects milliseconds here
+                .setRequestTimeout(uint(timeoutSeconds * 1000L))
+                .setConnectTimeout(uint(timeoutSeconds * 1000L))
+                // NOTE(review): 0 presumably means "no limit" - confirm against the Milo docs
+                .setMaxResponseMessageSize(uint(0))
+                .build());
+  }
+
+  /**
+   * Creates the Milo client and lets the wrapped {@link IoTDBOpcUaClient} run on it.
+   *
+   * @throws PipeException if the client cannot be created ("Error getting opc client") or the
+   *     wrapped client fails to run on it ("Error running opc client"); the original exception is
+   *     attached as the cause
+   */
+  public void run() {
+    // The two phases are kept in separate try blocks so that a failure while running is not
+    // caught a second time and mislabelled as a failure to create the client.
+    final OpcUaClient client;
+    try {
+      client = createClient();
+    } catch (final Exception e) {
+      throw new PipeException(
+          "Error getting opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage(),
+          e);
+    }
+
+    try {
+      configurableUaClient.run(client);
+    } catch (final Exception e) {
+      throw new PipeException(
+          "Error running opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage(),
+          e);
+    }
+  }
+
+  long getTimeoutSeconds() {
+    return timeoutSeconds;
+  }
+
+  /////////////////////////////// Conflict detection ///////////////////////////////
+
+  /**
+   * Verifies that the given settings equal the ones this runner was created with.
+   *
+   * @throws PipeException on the first setting that differs
+   */
+  void checkEquals(
+      final String user,
+      final String password,
+      final Path securityDir,
+      final SecurityPolicy securityPolicy) {
+    checkEquals("user", this.user, user);
+    checkEquals("password", this.password, password);
+    // Directories are compared by absolute path so relative and absolute spellings match.
+    checkEquals(
+        "security dir",
+        FileSystems.getDefault().getPath(this.securityDir.toAbsolutePath().toString()),
+        FileSystems.getDefault().getPath(securityDir.toAbsolutePath().toString()));
+    checkEquals("securityPolicy", configurableUaClient.getSecurityPolicy(), securityPolicy);
+  }
+
+  /** Throws a {@link PipeException} naming the attribute if the two values differ. */
+  private void checkEquals(final String attrName, Object thisAttr, Object thatAttr) {
+    if (!Objects.equals(thisAttr, thatAttr)) {
+      // Never leak passwords into the error message.
+      if (attrName.equals("password")) {
+        thisAttr = "****";
+        thatAttr = "****";
+      }
+      throw new PipeException(
+          String.format(
+              "The existing server with nodeUrl %s's %s %s conflicts to the new %s %s, reject reusing.",
+              configurableUaClient.getNodeUrl(), attrName, thisAttr, attrName, thatAttr));
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBKeyStoreLoaderClient.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBKeyStoreLoaderClient.java
new file mode 100644
index 00000000000..bfaf378822c
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBKeyStoreLoaderClient.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.client;
+
+import org.eclipse.milo.opcua.sdk.server.util.HostnameUtil;
+import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateBuilder;
+import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.Key;
+import java.security.KeyPair;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+/**
+ * Loads the OPC UA client key pair and certificate from a PKCS12 keystore in the security
+ * directory, generating a self-signed certificate and storing it there if the keystore file does
+ * not exist yet.
+ */
+class IoTDBKeyStoreLoaderClient {
+
+  private static final Logger logger = LoggerFactory.getLogger(IoTDBKeyStoreLoaderClient.class);
+
+  private static final Pattern IP_ADDR_PATTERN =
+      Pattern.compile("^(([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.){3}([01]?\\d\\d?|2[0-4]\\d|25[0-5])$");
+
+  private static final String CLIENT_ALIAS = "client-ai";
+
+  private X509Certificate[] clientCertificateChain;
+  private X509Certificate clientCertificate;
+  private KeyPair clientKeyPair;
+
+  /**
+   * Loads the keystore {@code iotdb-client.pfx} from {@code baseDir}, creating it first if it is
+   * missing, and extracts the client certificate, certificate chain and key pair from it.
+   *
+   * @param baseDir directory containing (or receiving) the keystore file
+   * @param password password protecting both the keystore and the key entry
+   * @return this loader; the getters return {@code null} if the keystore holds no private key
+   *     under the client alias
+   * @throws Exception if the keystore cannot be read, written or generated
+   */
+  IoTDBKeyStoreLoaderClient load(final Path baseDir, final char[] password) throws Exception {
+    final KeyStore keyStore = KeyStore.getInstance("PKCS12");
+
+    final Path clientKeyStore = baseDir.resolve("iotdb-client.pfx");
+
+    logger.info("Loading KeyStore at {}.", clientKeyStore);
+
+    if (!Files.exists(clientKeyStore)) {
+      // First use: start from an empty keystore and populate it with a self-signed certificate.
+      keyStore.load(null, password);
+
+      final KeyPair keyPair = SelfSignedCertificateGenerator.generateRsaKeyPair(2048);
+
+      final SelfSignedCertificateBuilder builder =
+          new SelfSignedCertificateBuilder(keyPair)
+              .setCommonName("Apache IoTDB OPC UA client")
+              .setOrganization("Apache")
+              .setOrganizationalUnit("dev")
+              .setLocalityName("Beijing")
+              .setStateName("China")
+              .setCountryCode("CN")
+              .setApplicationUri("urn:apache:iotdb:opc-ua-client")
+              .addDnsName("localhost")
+              .addIpAddress("127.0.0.1");
+
+      // Get as many hostnames and IP addresses as we can listed in the certificate.
+      for (String hostname : HostnameUtil.getHostnames("0.0.0.0")) {
+        if (IP_ADDR_PATTERN.matcher(hostname).matches()) {
+          builder.addIpAddress(hostname);
+        } else {
+          builder.addDnsName(hostname);
+        }
+      }
+
+      final X509Certificate certificate = builder.build();
+
+      keyStore.setKeyEntry(
+          CLIENT_ALIAS, keyPair.getPrivate(), password, new X509Certificate[] {certificate});
+      try (OutputStream out = Files.newOutputStream(clientKeyStore)) {
+        keyStore.store(out, password);
+      }
+    } else {
+      try (InputStream in = Files.newInputStream(clientKeyStore)) {
+        keyStore.load(in, password);
+      }
+    }
+
+    final Key clientPrivateKey = keyStore.getKey(CLIENT_ALIAS, password);
+    if (clientPrivateKey instanceof PrivateKey) {
+      clientCertificate = (X509Certificate) keyStore.getCertificate(CLIENT_ALIAS);
+
+      clientCertificateChain =
+          Arrays.stream(keyStore.getCertificateChain(CLIENT_ALIAS))
+              .map(X509Certificate.class::cast)
+              .toArray(X509Certificate[]::new);
+
+      // The public half of the key pair is taken from the certificate stored with the key.
+      final PublicKey clientPublicKey = clientCertificate.getPublicKey();
+      clientKeyPair = new KeyPair(clientPublicKey, (PrivateKey) clientPrivateKey);
+    }
+
+    return this;
+  }
+
+  /** Returns the client certificate, or {@code null} if none has been loaded. */
+  X509Certificate getClientCertificate() {
+    return clientCertificate;
+  }
+
+  /** Returns the client certificate chain, or {@code null} if none has been loaded. */
+  public X509Certificate[] getClientCertificateChain() {
+    return clientCertificateChain;
+  }
+
+  /** Returns the client key pair, or {@code null} if none has been loaded. */
+  KeyPair getClientKeyPair() {
+    return clientKeyPair;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
new file mode 100644
index 00000000000..2019c0fe833
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.client;
+
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
+import org.eclipse.milo.opcua.sdk.core.AccessLevel;
+import org.eclipse.milo.opcua.sdk.core.ValueRanks;
+import org.eclipse.milo.opcua.stack.core.Identifiers;
+import org.eclipse.milo.opcua.stack.core.StatusCodes;
+import org.eclipse.milo.opcua.stack.core.UaException;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ExpandedNodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
+import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
+import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
+import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesItem;
+import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResponse;
+import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResult;
+import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
+import org.eclipse.milo.opcua.stack.core.types.structured.ObjectAttributes;
+import org.eclipse.milo.opcua.stack.core.types.structured.VariableAttributes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Predicate;
+
+import static 
org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.convertToOpcDataType;
+import static 
org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc;
+import static org.eclipse.milo.opcua.stack.core.StatusCodes.Bad_Timeout;
+
+public class IoTDBOpcUaClient {
+  // Log under this class's own name so that client-side messages are not attributed to the
+  // server-side OpcUaNameSpace class.
+  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBOpcUaClient.class);
+
+  // Namespace index used for every node id and qualified name built by this client
+  // (customized nodes)
+  private static final int NAME_SPACE_INDEX = 2;
+
+  // Passed as the sampling-interval attribute of created variable nodes. It carries no meaning
+  // for a server that only accepts client writes.
+  private static final double SAMPLING_INTERVAL_PLACEHOLDER = 500;
+  private final String nodeUrl;
+
+  private final SecurityPolicy securityPolicy;
+  private final IdentityProvider identityProvider;
+  // Assigned in run(OpcUaClient); null until then
+  private OpcUaClient client;
+  private final boolean historizing;
+  // Assigned through setRunner(ClientRunner)
+  private ClientRunner runner;
+
+  /**
+   * Creates a client description. No connection is opened here; the actual {@link OpcUaClient} is
+   * supplied later through {@code run(OpcUaClient)}.
+   *
+   * @param nodeUrl the value later returned by {@code getNodeUrl()}
+   * @param securityPolicy the policy whose URI is matched by {@code endpointFilter()}
+   * @param identityProvider the value later returned by {@code getIdentityProvider()}
+   * @param historizing the historizing flag set on variable nodes created by this client
+   */
+  public IoTDBOpcUaClient(
+      final String nodeUrl,
+      final SecurityPolicy securityPolicy,
+      final IdentityProvider identityProvider,
+      final boolean historizing) {
+    this.historizing = historizing;
+    this.identityProvider = identityProvider;
+    this.securityPolicy = securityPolicy;
+    this.nodeUrl = nodeUrl;
+  }
+
+  public void run(final OpcUaClient client) throws Exception {
+    // synchronous connect
+    this.client = client;
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() - startTime < runner.getTimeoutSeconds() 
* 1000L) {
+      try {
+        client.connect().get();
+      } catch (final ExecutionException e) {
+        if (e.getCause() instanceof UaException
+            && ((UaException) e.getCause()).getStatusCode().getValue() == 
Bad_Timeout) {
+          Thread.sleep(1000L);
+          continue;
+        }
+        throw e;
+      }
+      break;
+    }
+  }
+
+  /**
+   * Transfers a tablet to the remote server. Only the tree model in client-server mode is
+   * supported. The row extraction is delegated to {@link OpcUaNameSpace}, which calls back into
+   * this client to perform the write.
+   *
+   * @param tablet the tablet to transfer
+   * @param sink the sink handed through to the row callback
+   * @throws Exception if the delegate or the row callback fails
+   */
+  public void transfer(final Tablet tablet, final OpcUaSink sink) throws Exception {
+    final OpcUaNameSpace.TabletRowConsumer rowWriter = this::transferTabletRowForClientServerModel;
+    OpcUaNameSpace.transferTabletForClientServerModel(tablet, sink, rowWriter);
+  }
+
+  /**
+   * Writes one value to the remote server for the node identified by the joined {@code segments}.
+   *
+   * <p>The measurements are scanned in order. A measurement whose name equals the sink's quality
+   * name sets the quality of the write; any other accepted measurement supplies the value, the
+   * timestamp and the data type. If several measurements are accepted, the last one wins. If the
+   * server answers {@code Bad_NodeIdUnknown}, the node path is created and the write is retried
+   * once.
+   *
+   * @param segments the path segments; joined with "/" they form the node identifier
+   * @param measurementSchemas the schemas, parallel to {@code values}
+   * @param timestamps the timestamps; index {@code i} is used when more than one entry is
+   *     supplied, otherwise the first entry is used for every measurement
+   * @param values the values, parallel to {@code measurementSchemas}; null entries are skipped
+   * @param sink provides the default quality and the optional quality / value names
+   * @throws UnsupportedOperationException if the quality measurement is not of boolean type
+   * @throws PipeException if node creation or the write is rejected by the server
+   * @throws Exception if waiting for a server response fails
+   */
+  private void transferTabletRowForClientServerModel(
+      final String[] segments,
+      final List<MeasurementSchema> measurementSchemas,
+      final List<Long> timestamps,
+      final List<Object> values,
+      final OpcUaSink sink)
+      throws Exception {
+    StatusCode currentQuality = sink.getDefaultQuality();
+    Object value = null;
+    long timestamp = 0;
+    NodeId opcDataType = null;
+
+    for (int i = 0; i < measurementSchemas.size(); ++i) {
+      if (Objects.isNull(values.get(i))) {
+        continue;
+      }
+      final String name = measurementSchemas.get(i).getMeasurementId();
+      final TSDataType type = measurementSchemas.get(i).getType();
+      if (Objects.nonNull(sink.getQualityName()) && sink.getQualityName().equals(name)) {
+        if (!type.equals(TSDataType.BOOLEAN)) {
+          throw new UnsupportedOperationException(
+              "The quality value only supports boolean type, while true == GOOD and false == BAD.");
+        }
+        // Compare by value: an identity check against Boolean.TRUE only holds for the cached
+        // instance and would silently report BAD for any other Boolean object.
+        currentQuality = Boolean.TRUE.equals(values.get(i)) ? StatusCode.GOOD : StatusCode.BAD;
+        continue;
+      }
+      if (Objects.nonNull(sink.getValueName()) && !sink.getValueName().equals(name)) {
+        PipeLogger.log(
+            LOGGER::warn,
+            "When the 'with-quality' mode is enabled, the measurement must be either \"value-name\" or \"quality-name\"");
+        continue;
+      }
+
+      value = values.get(i);
+      timestamp = timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
+      opcDataType = convertToOpcDataType(type);
+    }
+    if (Objects.isNull(value)) {
+      // Nothing to write: no accepted measurement carried a value
+      return;
+    }
+
+    // The node id depends only on the segments, so it is built once instead of per measurement
+    final NodeId nodeId = new NodeId(NAME_SPACE_INDEX, String.join("/", segments));
+    final Variant variant = new Variant(value);
+    final DataValue dataValue =
+        new DataValue(variant, currentQuality, new DateTime(timestamp), new DateTime());
+    StatusCode writeStatus = client.writeValue(nodeId, dataValue).get();
+
+    if (writeStatus.getValue() == StatusCodes.Bad_NodeIdUnknown) {
+      // The node does not exist yet: create the whole path, then write again
+      final AddNodesResponse addStatus =
+          client.addNodes(getNodesToAdd(segments, opcDataType, variant)).get();
+      for (final AddNodesResult result : addStatus.getResults()) {
+        // Bad_NodeIdExists is tolerated, so already-present path nodes are not an error
+        if (!result.getStatusCode().equals(StatusCode.GOOD)
+            && !(result.getStatusCode().getValue() == StatusCodes.Bad_NodeIdExists)) {
+          throw new PipeException(
+              "Failed to create nodes after transfer data value, creation status: "
+                  + addStatus
+                  + getErrorString(segments, opcDataType, value, writeStatus));
+        }
+      }
+      writeStatus = client.writeValue(nodeId, dataValue).get();
+      if (writeStatus.getValue() != StatusCode.GOOD.getValue()) {
+        throw new PipeException(
+            "Failed to transfer dataValue after successfully created nodes"
+                + getErrorString(segments, opcDataType, value, writeStatus));
+      }
+    } else if (writeStatus.getValue() != StatusCode.GOOD.getValue()) {
+      throw new PipeException(
+          "Failed to transfer dataValue"
+              + getErrorString(segments, opcDataType, value, writeStatus));
+    }
+  }
+
+  private static String getErrorString(
+      final String[] segments,
+      final NodeId dataType,
+      final Object value,
+      final StatusCode writeStatus) {
+    return ", segments: "
+        + Arrays.toString(segments)
+        + ", dataType: "
+        + dataType
+        + ", value: "
+        + value
+        + ", error: "
+        + writeStatus;
+  }
+
+  /**
+   * Builds the add-nodes requests for a full node path: a folder object for the first segment
+   * under the standard Objects folder, a nested folder object for every following segment except
+   * the last, and a variable node for the last segment. Node identifiers are the segments joined
+   * with "/" up to and including the node's own segment.
+   *
+   * @param segments the path segments; the last one is the measurement name
+   * @param opcDataType the data type of the variable node
+   * @param initialValue the initial value of the variable node
+   * @return the requests, ordered from the root folder down to the variable
+   */
+  public List<AddNodesItem> getNodesToAdd(
+      final String[] segments, final NodeId opcDataType, final Variant initialValue) {
+    final List<AddNodesItem> items = new ArrayList<>();
+    final String rootName = segments[0];
+    final StringBuilder path = new StringBuilder(rootName);
+    ExpandedNodeId parentId = new NodeId(NAME_SPACE_INDEX, rootName).expanded();
+    items.add(newFolderItem(Identifiers.ObjectsFolder.expanded(), parentId, rootName));
+
+    // Original author's note: segments.length >= 3
+    final int measurementIndex = segments.length - 1;
+    for (int i = 1; i < measurementIndex; ++i) {
+      final String folderName = segments[i];
+      path.append("/").append(folderName);
+      final ExpandedNodeId folderId = new NodeId(NAME_SPACE_INDEX, path.toString()).expanded();
+      items.add(newFolderItem(parentId, folderId, folderName));
+      parentId = folderId;
+    }
+
+    final String measurementName = segments[measurementIndex];
+    path.append("/").append(measurementName);
+    final ExpandedNodeId measurementId = new NodeId(NAME_SPACE_INDEX, path.toString()).expanded();
+    items.add(
+        new AddNodesItem(
+            parentId,
+            Identifiers.Organizes,
+            measurementId,
+            new QualifiedName(NAME_SPACE_INDEX, measurementName),
+            NodeClass.Variable,
+            ExtensionObject.encode(
+                client.getStaticSerializationContext(),
+                createMeasurementAttributes(measurementName, opcDataType, initialValue)),
+            Identifiers.BaseDataVariableType.expanded()));
+
+    return items;
+  }
+
+  /** Builds the request for one folder object organized under {@code parentId}. */
+  private AddNodesItem newFolderItem(
+      final ExpandedNodeId parentId, final ExpandedNodeId folderId, final String folderName) {
+    return new AddNodesItem(
+        parentId,
+        Identifiers.Organizes,
+        folderId,
+        new QualifiedName(NAME_SPACE_INDEX, folderName),
+        NodeClass.Object,
+        ExtensionObject.encode(
+            client.getStaticSerializationContext(), createFolderAttributes(folderName)),
+        Identifiers.FolderType.expanded());
+  }
+
+  /**
+   * Disconnects the underlying client and waits for completion. Does nothing when no client has
+   * been assigned yet.
+   *
+   * @throws Exception if waiting for the disconnect fails
+   */
+  public void disconnect() throws Exception {
+    if (Objects.isNull(client)) {
+      return;
+    }
+    client.disconnect().get();
+  }
+
+  /////////////////////////////// Getter ///////////////////////////////
+
+  /** Returns the node URL passed to the constructor. */
+  String getNodeUrl() {
+    return this.nodeUrl;
+  }
+
+  Predicate<EndpointDescription> endpointFilter() {
+    return e -> getSecurityPolicy().getUri().equals(e.getSecurityPolicyUri());
+  }
+
+  /** Returns the security policy passed to the constructor. */
+  SecurityPolicy getSecurityPolicy() {
+    return this.securityPolicy;
+  }
+
+  /** Returns the identity provider passed to the constructor. */
+  IdentityProvider getIdentityProvider() {
+    return this.identityProvider;
+  }
+
+  /** Returns the client assigned in {@code run(OpcUaClient)}, or null before that call. */
+  @TestOnly
+  public OpcUaClient getClient() {
+    return this.client;
+  }
+
+  /////////////////////////////// Attribute creator 
///////////////////////////////
+
+  private VariableAttributes createMeasurementAttributes(
+      final String name, final NodeId objectType, final Variant initialValue) {
+    return new VariableAttributes(
+        Unsigned.uint(0xFFFF), // specifiedAttributes
+        LocalizedText.english(name),
+        LocalizedText.english(name),
+        Unsigned.uint(0), // writeMask
+        Unsigned.uint(0), // userWriteMask
+        initialValue,
+        objectType,
+        ValueRanks.Scalar,
+        null, // arrayDimensions
+        AccessLevel.toValue(AccessLevel.READ_WRITE),
+        AccessLevel.toValue(AccessLevel.READ_WRITE),
+        SAMPLING_INTERVAL_PLACEHOLDER,
+        historizing);
+  }
+
+  private static ObjectAttributes createFolderAttributes(final String name) {
+    return new ObjectAttributes(
+        Unsigned.uint(0xFFFF), // specifiedAttributes
+        LocalizedText.english(name),
+        LocalizedText.english(name),
+        Unsigned.uint(0), // writeMask
+        Unsigned.uint(0), // userWriteMask
+        null // notifier
+        );
+  }
+
+  /////////////////////////////// Conflict detection 
///////////////////////////////
+
+  /**
+   * Assigns the runner whose timeout is read by {@code run(OpcUaClient)} and to which {@code
+   * checkEquals} delegates.
+   */
+  public void setRunner(final ClientRunner runner) {
+    this.runner = runner;
+  }
+
+  /**
+   * Delegates the conflict check to the runner, converting the security directory to a path
+   * first.
+   *
+   * @param user the user name to compare
+   * @param password the password to compare
+   * @param securityDir the security directory to compare, as a path string
+   * @param securityPolicy the security policy to compare
+   */
+  public void checkEquals(
+      final String user,
+      final String password,
+      final String securityDir,
+      final SecurityPolicy securityPolicy) {
+    this.runner.checkEquals(user, password, Paths.get(securityDir), securityPolicy);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaKeyStoreLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java
similarity index 98%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaKeyStoreLoader.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java
index b17f27532d7..56b231fb460 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaKeyStoreLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.sink.protocol.opcua;
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.server;
 
 import org.apache.iotdb.commons.utils.FileUtils;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
similarity index 70%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index 465a2f67455..04cc251d655 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -17,10 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.sink.protocol.opcua;
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.server;
 
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
 import org.apache.iotdb.db.pipe.sink.util.PipeTabletEventSorter;
 import org.apache.iotdb.db.utils.DateTimeUtils;
 import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
@@ -46,33 +48,35 @@ import 
org.eclipse.milo.opcua.sdk.server.nodes.UaVariableNode;
 import org.eclipse.milo.opcua.sdk.server.util.SubscriptionModel;
 import org.eclipse.milo.opcua.stack.core.Identifiers;
 import org.eclipse.milo.opcua.stack.core.UaException;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
 import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
 import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
 import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
 import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.file.Paths;
 import java.sql.Date;
 import java.time.LocalDate;
 import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
 
 public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(OpcUaNameSpace.class);
   public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server";
-  private final boolean isClientServerModel;
   private final SubscriptionModel subscriptionModel;
   private final OpcUaServerBuilder builder;
 
-  OpcUaNameSpace(
-      final OpcUaServer server,
-      final boolean isClientServerModel,
-      final OpcUaServerBuilder builder) {
+  public OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder 
builder) {
     super(server, NAMESPACE_URI);
-    this.isClientServerModel = isClientServerModel;
     this.builder = builder;
 
     subscriptionModel = new SubscriptionModel(server, this);
@@ -93,25 +97,65 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
             });
   }
 
-  void transfer(final Tablet tablet) throws UaException {
-    if (isClientServerModel) {
-      transferTabletForClientServerModel(tablet);
+  public void transfer(final Tablet tablet, final OpcUaSink sink) throws 
Exception {
+    if (sink.isClientServerModel()) {
+      transferTabletForClientServerModel(tablet, sink, 
this::transferTabletRowForClientServerModel);
     } else {
       transferTabletForPubSubModel(tablet);
     }
   }
 
-  private void transferTabletForClientServerModel(final Tablet tablet) {
+  public static void transferTabletForClientServerModel(
+      final Tablet tablet, final OpcUaSink sink, final TabletRowConsumer 
consumer)
+      throws Exception {
+    final List<MeasurementSchema> schemas = tablet.getSchemas();
+    final List<MeasurementSchema> newSchemas = new ArrayList<>();
     new 
PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
 
-    final String[] segments = tablet.deviceId.split("\\.");
+    final List<Long> timestamps = new ArrayList<>();
+    final List<Object> values = new ArrayList<>();
+
+    for (int i = 0; i < schemas.size(); ++i) {
+      for (int j = tablet.rowSize - 1; j >= 0; --j) {
+        if (tablet.bitMaps == null || tablet.bitMaps[i] == null || 
!tablet.bitMaps[i].isMarked(j)) {
+          newSchemas.add(schemas.get(i));
+          timestamps.add(tablet.timestamps[j]);
+          values.add(getTabletObjectValue4Opc(tablet.values[i], j, 
schemas.get(i).getType()));
+          break;
+        }
+      }
+    }
+
+    consumer.accept(tablet.deviceId.split("\\."), newSchemas, timestamps, 
values, sink);
+  }
+
+  @FunctionalInterface
+  public interface TabletRowConsumer {
+    void accept(
+        final String[] segments,
+        final List<MeasurementSchema> measurementSchemas,
+        final List<Long> timestamps,
+        final List<Object> values,
+        final OpcUaSink sink)
+        throws Exception;
+  }
+
+  private void transferTabletRowForClientServerModel(
+      final String[] segments,
+      final List<MeasurementSchema> measurementSchemas,
+      final List<Long> timestamps,
+      final List<Object> values,
+      final OpcUaSink sink) {
     if (segments.length == 0) {
       throw new PipeRuntimeCriticalException("The segments of tablets must 
exist");
     }
     final StringBuilder currentStr = new StringBuilder();
     UaNode folderNode = null;
     NodeId folderNodeId;
-    for (final String segment : segments) {
+    for (int i = 0;
+        i < (Objects.isNull(sink.getValueName()) ? segments.length : 
segments.length - 1);
+        ++i) {
+      final String segment = segments[i];
       final UaNode nextFolderNode;
 
       currentStr.append(segment);
@@ -150,48 +194,61 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
                     () ->
                         new PipeRuntimeCriticalException(
                             String.format(
-                                "The folder node for %s does not exist.", 
tablet.deviceId)));
+                                "The folder node for %s does not exist.",
+                                Arrays.toString(segments))));
       }
     }
 
     final String currentFolder = currentStr.toString();
-    for (int i = 0; i < tablet.getSchemas().size(); ++i) {
-      final MeasurementSchema measurementSchema = tablet.getSchemas().get(i);
-      final String name = measurementSchema.getMeasurementId();
-      final TSDataType type = measurementSchema.getType();
-      final NodeId nodeId = newNodeId(currentFolder + name);
-      final UaVariableNode measurementNode;
 
-      int lastNonnullIndex = -1;
-      for (int j = tablet.rowSize - 1; j >= 0; --j) {
-        if (!tablet.bitMaps[i].isMarked(j)) {
-          lastNonnullIndex = j;
-          break;
+    StatusCode currentQuality = sink.getDefaultQuality();
+    UaVariableNode valueNode = null;
+    Object value = null;
+    long timestamp = 0;
+
+    for (int i = 0; i < measurementSchemas.size(); ++i) {
+      if (Objects.isNull(values.get(i))) {
+        continue;
+      }
+      final String name = measurementSchemas.get(i).getMeasurementId();
+      final TSDataType type = measurementSchemas.get(i).getType();
+      if (Objects.nonNull(sink.getQualityName()) && 
sink.getQualityName().equals(name)) {
+        if (!type.equals(TSDataType.BOOLEAN)) {
+          throw new UnsupportedOperationException(
+              "The quality value only supports boolean type, while true == 
GOOD and false == BAD.");
         }
+        currentQuality = values.get(i) == Boolean.TRUE ? StatusCode.GOOD : 
StatusCode.BAD;
+        continue;
       }
-
-      if (lastNonnullIndex == -1) {
+      if (Objects.nonNull(sink.getValueName()) && 
!sink.getValueName().equals(name)) {
+        PipeLogger.log(
+            LOGGER::warn,
+            "When the 'with-quality' mode is enabled, the measurement must be 
either \"value-name\" or \"quality-name\"");
         continue;
       }
-
-      final long utcTimestamp = 
timestampToUtc(tablet.timestamps[lastNonnullIndex]);
-      final DataValue value =
+      final String nodeName =
+          Objects.isNull(sink.getValueName()) ? name : 
segments[segments.length - 1];
+      final NodeId nodeId = newNodeId(currentFolder + nodeName);
+      final UaVariableNode measurementNode;
+      final long utcTimestamp = 
timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
+      final DataValue dataValue =
           new DataValue(
-              new Variant(getTabletObjectValue4Opc(tablet.values[i], 
lastNonnullIndex, type)),
-              StatusCode.GOOD,
+              new Variant(values.get(i)),
+              currentQuality,
               new DateTime(utcTimestamp),
               new DateTime());
+
       if (!getNodeManager().containsNode(nodeId)) {
         measurementNode =
             new UaVariableNode.UaVariableNodeBuilder(getNodeContext())
-                .setNodeId(newNodeId(currentFolder + name))
+                .setNodeId(nodeId)
                 .setAccessLevel(AccessLevel.READ_WRITE)
                 .setUserAccessLevel(AccessLevel.READ_ONLY)
-                .setBrowseName(newQualifiedName(name))
-                .setDisplayName(LocalizedText.english(name))
+                .setBrowseName(newQualifiedName(nodeName))
+                .setDisplayName(LocalizedText.english(nodeName))
                 .setDataType(convertToOpcDataType(type))
                 .setTypeDefinition(Identifiers.BaseDataVariableType)
-                .setValue(value)
+                .setValue(dataValue)
                 .build();
         getNodeManager().addNode(measurementNode);
         if (Objects.nonNull(folderNode)) {
@@ -215,15 +272,25 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
                                 String.format("The Node %s does not exist.", 
nodeId)));
       }
 
-      if (Objects.isNull(measurementNode.getValue())
-          || Objects.isNull(measurementNode.getValue().getSourceTime())
-          || measurementNode.getValue().getSourceTime().getUtcTime() < 
utcTimestamp) {
-        measurementNode.setValue(
+      if (Objects.isNull(sink.getValueName())) {
+        if (Objects.isNull(measurementNode.getValue())
+            || Objects.isNull(measurementNode.getValue().getSourceTime())
+            || measurementNode.getValue().getSourceTime().getUtcTime() < 
utcTimestamp) {
+          measurementNode.setValue(dataValue);
+        }
+      } else {
+        valueNode = measurementNode;
+        value = values.get(i);
+        timestamp = utcTimestamp;
+      }
+    }
+    if (Objects.nonNull(valueNode)) {
+      if (Objects.isNull(valueNode.getValue())
+          || Objects.isNull(valueNode.getValue().getSourceTime())
+          || valueNode.getValue().getSourceTime().getUtcTime() < timestamp) {
+        valueNode.setValue(
             new DataValue(
-                new Variant(getTabletObjectValue4Opc(tablet.values[i], 
lastNonnullIndex, type)),
-                StatusCode.GOOD,
-                new DateTime(utcTimestamp),
-                new DateTime()));
+                new Variant(value), currentQuality, new DateTime(timestamp), 
new DateTime()));
       }
     }
   }
@@ -254,7 +321,7 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
     }
   }
 
-  private static long timestampToUtc(final long timeStamp) {
+  public static long timestampToUtc(final long timeStamp) {
     return TimestampPrecisionUtils.currPrecision.toNanos(timeStamp) / 100L + 
116444736000000000L;
   }
 
@@ -271,9 +338,9 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
             .getEventFactory()
             .createEvent(
                 new NodeId(getNamespaceIndex(), UUID.randomUUID()), 
Identifiers.BaseEventType);
+
     // Use eventNode here because other nodes doesn't support values and times 
simultaneously
     for (int columnIndex = 0; columnIndex < tablet.getSchemas().size(); 
++columnIndex) {
-
       final TSDataType dataType = 
tablet.getSchemas().get(columnIndex).getType();
 
       // Source name --> Sensor path, like root.test.d_0.s_0
@@ -309,8 +376,8 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
           case DATE:
             eventNode.setMessage(
                 LocalizedText.english(
-                    (((LocalDate[]) tablet.values[columnIndex])[rowIndex])
-                        .atStartOfDay(ZoneId.systemDefault())
+                    ((LocalDate[]) tablet.values[columnIndex])
+                        [rowIndex].atStartOfDay(ZoneId.systemDefault())
                         .toString()));
             break;
           case INT64:
@@ -355,7 +422,7 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
     eventNode.delete();
   }
 
-  private NodeId convertToOpcDataType(final TSDataType type) {
+  public static NodeId convertToOpcDataType(final TSDataType type) {
     switch (type) {
       case BOOLEAN:
         return Identifiers.Boolean;
@@ -403,11 +470,13 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
 
   /////////////////////////////// Conflict detection 
///////////////////////////////
 
-  void checkEquals(
+  public void checkEquals(
       final String user,
       final String password,
       final String securityDir,
-      final boolean enableAnonymousAccess) {
-    builder.checkEquals(user, password, Paths.get(securityDir), 
enableAnonymousAccess);
+      final boolean enableAnonymousAccess,
+      final Set<SecurityPolicy> securityPolicies) {
+    builder.checkEquals(
+        user, password, Paths.get(securityDir), enableAnonymousAccess, 
securityPolicies);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaServerBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
similarity index 82%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaServerBuilder.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
index bc2df4839e2..61818ecf852 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaServerBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
@@ -17,9 +17,8 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.sink.protocol.opcua;
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.server;
 
-import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
@@ -58,6 +57,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.security.KeyPair;
 import java.security.cert.X509Certificate;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Objects;
@@ -84,48 +84,45 @@ public class OpcUaServerBuilder implements Closeable {
   private String password;
   private Path securityDir;
   private boolean enableAnonymousAccess;
+  private Set<SecurityPolicy> securityPolicies;
   private DefaultTrustListManager trustListManager;
 
-  OpcUaServerBuilder() {
-    tcpBindPort = 
PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
-    httpsBindPort = 
PipeSinkConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE;
-    user = PipeSinkConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
-    password = PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
-    securityDir = 
Paths.get(PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE);
-    enableAnonymousAccess = 
PipeSinkConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE;
-  }
-
-  OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) {
+  public OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) {
     this.tcpBindPort = tcpBindPort;
     return this;
   }
 
-  OpcUaServerBuilder setHttpsBindPort(final int httpsBindPort) {
+  public OpcUaServerBuilder setHttpsBindPort(final int httpsBindPort) {
     this.httpsBindPort = httpsBindPort;
     return this;
   }
 
-  OpcUaServerBuilder setUser(final String user) {
+  public OpcUaServerBuilder setUser(final String user) {
     this.user = user;
     return this;
   }
 
-  OpcUaServerBuilder setPassword(final String password) {
+  public OpcUaServerBuilder setPassword(final String password) {
     this.password = password;
     return this;
   }
 
-  OpcUaServerBuilder setSecurityDir(final String securityDir) {
+  public OpcUaServerBuilder setSecurityDir(final String securityDir) {
     this.securityDir = Paths.get(securityDir);
     return this;
   }
 
-  OpcUaServerBuilder setEnableAnonymousAccess(final boolean 
enableAnonymousAccess) {
+  public OpcUaServerBuilder setEnableAnonymousAccess(final boolean 
enableAnonymousAccess) {
     this.enableAnonymousAccess = enableAnonymousAccess;
     return this;
   }
 
-  OpcUaServer build() throws Exception {
+  public OpcUaServerBuilder setSecurityPolicies(final Set<SecurityPolicy> 
securityPolicies) {
+    this.securityPolicies = securityPolicies;
+    return this;
+  }
+
+  public OpcUaServer build() throws Exception {
     Files.createDirectories(securityDir);
     if (!Files.exists(securityDir)) {
       throw new PipeException("Unable to create security dir: " + securityDir);
@@ -247,30 +244,36 @@ public class OpcUaServerBuilder implements Closeable {
                     USER_TOKEN_POLICY_USERNAME,
                     USER_TOKEN_POLICY_X509);
 
-        final EndpointConfiguration.Builder noSecurityBuilder =
-            builder
-                .copy()
-                .setSecurityPolicy(SecurityPolicy.None)
-                .setSecurityMode(MessageSecurityMode.None);
-
-        endpointConfigurations.add(buildTcpEndpoint(noSecurityBuilder, 
tcpBindPort));
-        endpointConfigurations.add(buildHttpsEndpoint(noSecurityBuilder, 
httpsBindPort));
-
-        endpointConfigurations.add(
-            buildTcpEndpoint(
-                builder
-                    .copy()
-                    .setSecurityPolicy(SecurityPolicy.Basic256Sha256)
-                    .setSecurityMode(MessageSecurityMode.SignAndEncrypt),
-                tcpBindPort));
-
-        endpointConfigurations.add(
-            buildHttpsEndpoint(
-                builder
-                    .copy()
-                    .setSecurityPolicy(SecurityPolicy.Basic256Sha256)
-                    .setSecurityMode(MessageSecurityMode.Sign),
-                httpsBindPort));
+        final Set<SecurityPolicy> securityPolicySet = new 
HashSet<>(securityPolicies);
+        if (securityPolicySet.contains(SecurityPolicy.None)) {
+          final EndpointConfiguration.Builder noSecurityBuilder =
+              builder
+                  .copy()
+                  .setSecurityPolicy(SecurityPolicy.None)
+                  .setSecurityMode(MessageSecurityMode.None);
+
+          endpointConfigurations.add(buildTcpEndpoint(noSecurityBuilder, 
tcpBindPort));
+          endpointConfigurations.add(buildHttpsEndpoint(noSecurityBuilder, 
httpsBindPort));
+          securityPolicySet.remove(SecurityPolicy.None);
+        }
+
+        for (final SecurityPolicy securityPolicy : securityPolicySet) {
+          endpointConfigurations.add(
+              buildTcpEndpoint(
+                  builder
+                      .copy()
+                      .setSecurityPolicy(securityPolicy)
+                      .setSecurityMode(MessageSecurityMode.SignAndEncrypt),
+                  tcpBindPort));
+
+          endpointConfigurations.add(
+              buildHttpsEndpoint(
+                  builder
+                      .copy()
+                      .setSecurityPolicy(securityPolicy)
+                      .setSecurityMode(MessageSecurityMode.Sign),
+                  httpsBindPort));
+        }
 
         final EndpointConfiguration.Builder discoveryBuilder =
             builder
@@ -309,7 +312,8 @@ public class OpcUaServerBuilder implements Closeable {
       final String user,
       final String password,
       final Path securityDir,
-      final boolean enableAnonymousAccess) {
+      final boolean enableAnonymousAccess,
+      final Set<SecurityPolicy> securityPolicies) {
     checkEquals("user", this.user, user);
     checkEquals("password", this.password, password);
     checkEquals(
@@ -317,10 +321,15 @@ public class OpcUaServerBuilder implements Closeable {
         
FileSystems.getDefault().getPath(this.securityDir.toAbsolutePath().toString()),
         
FileSystems.getDefault().getPath(securityDir.toAbsolutePath().toString()));
     checkEquals("enableAnonymousAccess option", this.enableAnonymousAccess, 
enableAnonymousAccess);
+    checkEquals("securityPolicies", this.securityPolicies, securityPolicies);
   }
 
-  private void checkEquals(final String attrName, final Object thisAttr, final 
Object thatAttr) {
+  private void checkEquals(final String attrName, Object thisAttr, Object 
thatAttr) {
     if (!Objects.equals(thisAttr, thatAttr)) {
+      if (attrName.equals("password")) {
+        thisAttr = "****";
+        thatAttr = "****";
+      }
       throw new PipeException(
           String.format(
               "The existing server with tcp port %s and https port %s's %s %s 
conflicts to the new %s %s, reject reusing.",
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
index 2cf154053af..fae64308762 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java
@@ -21,16 +21,26 @@ package org.apache.iotdb.db.pipe.sink;
 
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
+import 
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
+import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnvironment;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.sink.protocol.legacy.IoTDBLegacyPipeSink;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.security.SecureRandom;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 
 public class PipeSinkTest {
 
@@ -92,4 +102,83 @@ public class PipeSinkTest {
       Assert.fail();
     }
   }
+
+  @Test
+  public void testOpcUaSink() {
+    final List<MeasurementSchema> schemaList =
+        Arrays.asList(
+            new MeasurementSchema("s1", TSDataType.INT64),
+            new MeasurementSchema("s2", TSDataType.INT64));
+
+    final Tablet tablet = new Tablet("root.db.d1.vector6", schemaList, 100);
+
+    long timestamp = System.currentTimeMillis();
+    for (long row = 0; row < 100; row++) {
+      final int rowSize = tablet.rowSize;
+      tablet.addTimestamp(rowSize, timestamp);
+      for (int i = 0; i < 2; i++) {
+        tablet.addValue(
+            schemaList.get(i).getMeasurementId(), rowSize, new 
SecureRandom().nextLong());
+      }
+      timestamp++;
+    }
+
+    final List<MeasurementSchema> opcSchemaList =
+        Arrays.asList(
+            new MeasurementSchema("value1", TSDataType.INT64),
+            new MeasurementSchema("quality1", TSDataType.BOOLEAN));
+    final Tablet qualityTablet = new Tablet("root.db.d1.vector6.s3", 
opcSchemaList, 100);
+
+    timestamp = System.currentTimeMillis();
+    for (long row = 0; row < 100; row++) {
+      final int rowSize = qualityTablet.rowSize;
+      qualityTablet.addTimestamp(rowSize, timestamp);
+      qualityTablet.addValue(
+          opcSchemaList.get(0).getMeasurementId(), rowSize, new 
SecureRandom().nextLong());
+      qualityTablet.addValue(opcSchemaList.get(1).getMeasurementId(), rowSize, 
true);
+      timestamp++;
+    }
+
+    try (final OpcUaSink qualityOPC = new OpcUaSink();
+        final OpcUaSink normalOPC = new OpcUaSink()) {
+      final PipeTaskRuntimeConfiguration configuration =
+          new PipeTaskRuntimeConfiguration(new 
PipeTaskSinkRuntimeEnvironment("temp", 0, 1));
+      qualityOPC.customize(
+          new PipeParameters(
+              new HashMap<String, String>() {
+                {
+                  put(
+                      PipeSinkConstant.CONNECTOR_KEY,
+                      BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName());
+                  put(PipeSinkConstant.CONNECTOR_OPC_UA_WITH_QUALITY_KEY, 
"true");
+                  put(PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_KEY, 
"value1");
+                  put(PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_KEY, 
"quality1");
+                }
+              }),
+          configuration);
+      normalOPC.customize(
+          new PipeParameters(
+              new HashMap<String, String>() {
+                {
+                  put(
+                      PipeSinkConstant.CONNECTOR_KEY,
+                      BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName());
+                }
+              }),
+          configuration);
+      final PipeRawTabletInsertionEvent event =
+          new PipeRawTabletInsertionEvent(tablet, false, "pipe", 0L, null, 
null, false);
+      event.increaseReferenceCount("");
+      normalOPC.transfer(event);
+      // Shall not throw
+      qualityOPC.transfer(event);
+      event.decreaseReferenceCount("", false);
+
+      qualityOPC.transfer(
+          new PipeRawTabletInsertionEvent(qualityTablet, false, "pipe", 0L, 
null, null, false));
+
+    } catch (Exception e) {
+      Assert.fail();
+    }
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
index 1cd9cfb33c7..2eaf6f903de 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
@@ -29,6 +29,7 @@ import java.io.File;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
@@ -179,6 +180,57 @@ public class PipeSinkConstant {
       "sink.opcua.enable-anonymous-access";
   public static final boolean 
CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE = true;
 
+  public static final String CONNECTOR_OPC_UA_WITH_QUALITY_KEY = 
"connector.opcua.with-quality";
+  public static final String SINK_OPC_UA_WITH_QUALITY_KEY = 
"sink.opcua.with-quality";
+  public static final boolean CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE = 
false;
+
+  public static final String CONNECTOR_OPC_UA_VALUE_NAME_KEY = 
"connector.opcua.value-name";
+  public static final String SINK_OPC_UA_VALUE_NAME_KEY = 
"sink.opcua.value-name";
+  public static final String CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE = 
"value";
+
+  public static final String CONNECTOR_OPC_UA_QUALITY_NAME_KEY = 
"connector.opcua.quality-name";
+  public static final String SINK_OPC_UA_QUALITY_NAME_KEY = 
"sink.opcua.quality-name";
+  public static final String CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE = 
"quality";
+
+  public static final String CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY =
+      "connector.opcua.default-quality";
+  public static final String SINK_OPC_UA_DEFAULT_QUALITY_KEY = 
"sink.opcua.default-quality";
+  public static final String CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE = 
"GOOD";
+  public static final String CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE = 
"BAD";
+  public static final String CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE 
= "UNCERTAIN";
+
+  public static final String CONNECTOR_OPC_UA_NODE_URL_KEY = 
"connector.opcua.node-url";
+  public static final String SINK_OPC_UA_NODE_URL_KEY = "sink.opcua.node-url";
+
+  public static final String CONNECTOR_OPC_UA_SECURITY_POLICY_KEY =
+      "connector.opcua.security-policy";
+  public static final String SINK_OPC_UA_SECURITY_POLICY_KEY = 
"sink.opcua.security-policy";
+  public static final String CONNECTOR_OPC_UA_SECURITY_POLICY_NONE_VALUE = 
"NONE";
+  public static final String 
CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_128_RSA_15_VALUE =
+      "BASIC128RSA15";
+  public static final String CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_VALUE 
= "BASIC256";
+  public static final String 
CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE =
+      "BASIC256SHA256";
+  public static final String 
CONNECTOR_OPC_UA_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE =
+      "AES128_SHA256_RSAOAEP";
+  public static final String 
CONNECTOR_OPC_UA_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE =
+      "AES256_SHA256_RSAPSS";
+
+  public static final List<String> 
CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES =
+      Arrays.asList(
+          CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE,
+          CONNECTOR_OPC_UA_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE,
+          CONNECTOR_OPC_UA_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE);
+
+  public static final String CONNECTOR_OPC_UA_HISTORIZING_KEY = 
"connector.opcua.historizing";
+  public static final String SINK_OPC_UA_HISTORIZING_KEY = 
"sink.opcua.historizing";
+  public static final boolean CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE = 
false;
+
+  public static final String SINK_OPC_UA_TIMEOUT_SECONDS_KEY = 
"sink.opcua.timeout-seconds";
+  public static final String CONNECTOR_OPC_UA_TIMEOUT_SECONDS_KEY =
+      "connector.opcua.timeout-seconds";
+  public static final long CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE = 
10L;
+
   public static final String CONNECTOR_LEADER_CACHE_ENABLE_KEY = 
"connector.leader-cache.enable";
   public static final String SINK_LEADER_CACHE_ENABLE_KEY = 
"sink.leader-cache.enable";
   public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE = 
true;
diff --git a/pom.xml b/pom.xml
index eb27c1694b4..e92cace16cf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -410,6 +410,16 @@
                 <artifactId>checker-qual</artifactId>
                 <version>${checker-qual.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.eclipse.milo</groupId>
+                <artifactId>stack-client</artifactId>
+                <version>${milo.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.eclipse.milo</groupId>
+                <artifactId>sdk-client</artifactId>
+                <version>${milo.version}</version>
+            </dependency>
             <!-- TODO: Deprecated: Use Airline 2 or Picocli instead -->
             <dependency>
                 <groupId>io.airlift</groupId>

Reply via email to