This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch client-opc
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/client-opc by this push:
     new 341cb2dfc10 pj
341cb2dfc10 is described below

commit 341cb2dfc10cc242336b058ea384a86efab4be76
Author: Caideyipi <[email protected]>
AuthorDate: Mon Dec 22 14:53:26 2025 +0800

    pj
---
 .../db/pipe/sink/protocol/opcua/OpcUaSink.java     | 26 +++++++++++++++++++
 .../opcua/{ => server}/OpcUaKeyStoreLoader.java    |  2 +-
 .../opcua/{ => server}/OpcUaNameSpace.java         | 30 ++++++++++++----------
 .../opcua/{ => server}/OpcUaServerBuilder.java     | 18 ++++++-------
 4 files changed, 52 insertions(+), 24 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index 7404a8b03bc..9a505ce90f0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaServerBuilder;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.pipe.api.PipeConnector;
@@ -356,4 +358,28 @@ public class OpcUaSink implements PipeConnector {
       }
     }
   }
+
+  // Getter
+
+  public boolean isClientServerModel() {
+    return isClientServerModel;
+  }
+
+  public String getUnQualifiedDatabaseName() {
+    return unQualifiedDatabaseName;
+  }
+
+  public String getPlaceHolder() {
+    return placeHolder;
+  }
+
+  @Nullable
+  public String getValueName() {
+    return valueName;
+  }
+
+  @Nullable
+  public String getQualityName() {
+    return qualityName;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaKeyStoreLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java
similarity index 98%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaKeyStoreLoader.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java
index b17f27532d7..56b231fb460 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaKeyStoreLoader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.sink.protocol.opcua;
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.server;
 
 import org.apache.iotdb.commons.utils.FileUtils;
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
similarity index 94%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index 6850fba8f20..0a13fc34865 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -17,10 +17,11 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.sink.protocol.opcua;
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.server;
 
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
 import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter;
 import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
 import org.apache.iotdb.db.utils.DateTimeUtils;
@@ -72,7 +73,7 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
   private final SubscriptionModel subscriptionModel;
   private final OpcUaServerBuilder builder;
 
-  OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder builder) {
+  public OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder builder) {
     super(server, NAMESPACE_URI);
     this.builder = builder;
 
@@ -94,9 +95,9 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
             });
   }
 
-  void transfer(final Tablet tablet, final boolean isTableModel, final OpcUaSink sink)
+  public void transfer(final Tablet tablet, final boolean isTableModel, final OpcUaSink sink)
       throws UaException {
-    if (sink.isClientServerModel) {
+    if (sink.isClientServerModel()) {
       transferTabletForClientServerModel(tablet, isTableModel, sink);
     } else {
       transferTabletForPubSubModel(tablet, isTableModel, sink);
@@ -141,11 +142,11 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
       for (int i = 0; i < tablet.getRowSize(); ++i) {
         final Object[] segments = tablet.getDeviceID(i).getSegments();
         final String[] folderSegments = new String[segments.length + 1];
-        folderSegments[0] = sink.unQualifiedDatabaseName;
+        folderSegments[0] = sink.getUnQualifiedDatabaseName();
 
         for (int j = 0; j < segments.length; ++j) {
           folderSegments[j + 1] =
-              Objects.isNull(segments[j]) ? sink.placeHolder : (String) segments[j];
+              Objects.isNull(segments[j]) ? sink.getPlaceHolder() : (String) segments[j];
         }
 
         final int finalI = i;
@@ -228,7 +229,7 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
     final String currentFolder = currentStr.toString();
 
     StatusCode currentQuality =
-        Objects.isNull(sink.valueName) ? StatusCode.GOOD : StatusCode.UNCERTAIN;
+        Objects.isNull(sink.getValueName()) ? StatusCode.GOOD : StatusCode.UNCERTAIN;
     UaVariableNode valueNode = null;
     Object value = null;
     long timestamp = 0;
@@ -239,7 +240,7 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
       }
       final String name = measurementSchemas.get(i).getMeasurementName();
       final TSDataType type = measurementSchemas.get(i).getType();
-      if (Objects.nonNull(sink.qualityName) && sink.qualityName.equals(name)) {
+      if (Objects.nonNull(sink.getQualityName()) && sink.getQualityName().equals(name)) {
         if (!type.equals(TSDataType.BOOLEAN)) {
           throw new UnsupportedOperationException(
               "The quality value only supports boolean type, while true == GOOD and false == BAD.");
@@ -247,11 +248,12 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
         currentQuality = values.get(i) == Boolean.TRUE ? StatusCode.GOOD : StatusCode.BAD;
         continue;
       }
-      if (Objects.nonNull(sink.valueName) && !sink.valueName.equals(name)) {
+      if (Objects.nonNull(sink.getValueName()) && !sink.getValueName().equals(name)) {
         throw new UnsupportedOperationException(
             "When the 'with-quality' mode is enabled, the measurement must be either \"value-name\" or \"quality-name\"");
       }
-      final String nodeName = Objects.isNull(sink.valueName) ? name : segments[segments.length - 1];
+      final String nodeName =
+          Objects.isNull(sink.getValueName()) ? name : segments[segments.length - 1];
       final NodeId nodeId = newNodeId(currentFolder + nodeName);
       final UaVariableNode measurementNode;
       if (!getNodeManager().containsNode(nodeId)) {
@@ -288,7 +290,7 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
       }
 
       final long utcTimestamp = timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
-      if (Objects.isNull(sink.valueName)) {
+      if (Objects.isNull(sink.getValueName())) {
         if (Objects.isNull(measurementNode.getValue())
             || Objects.requireNonNull(measurementNode.getValue().getSourceTime()).getUtcTime()
                 < utcTimestamp) {
@@ -365,11 +367,11 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
     if (isTableModel) {
       sourceNameList = new ArrayList<>(tablet.getRowSize());
       for (int i = 0; i < tablet.getRowSize(); ++i) {
-        final StringBuilder idBuilder = new StringBuilder(sink.unQualifiedDatabaseName);
+        final StringBuilder idBuilder = new StringBuilder(sink.getUnQualifiedDatabaseName());
         for (final Object segment : tablet.getDeviceID(i).getSegments()) {
           idBuilder
               .append(TsFileConstant.PATH_SEPARATOR)
-              .append(Objects.isNull(segment) ? sink.placeHolder : segment);
+              .append(Objects.isNull(segment) ? sink.getPlaceHolder() : segment);
         }
         sourceNameList.add(idBuilder.toString());
       }
@@ -521,7 +523,7 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
 
   /////////////////////////////// Conflict detection ///////////////////////////////
 
-  void checkEquals(
+  public void checkEquals(
       final String user,
       final String password,
       final String securityDir,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaServerBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
similarity index 95%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaServerBuilder.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
index bc2df4839e2..1d433482853 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaServerBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.sink.protocol.opcua;
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.server;
 
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -86,7 +86,7 @@ public class OpcUaServerBuilder implements Closeable {
   private boolean enableAnonymousAccess;
   private DefaultTrustListManager trustListManager;
 
-  OpcUaServerBuilder() {
+  public OpcUaServerBuilder() {
     tcpBindPort = PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
     httpsBindPort = PipeSinkConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE;
     user = PipeSinkConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
@@ -95,37 +95,37 @@ public class OpcUaServerBuilder implements Closeable {
     enableAnonymousAccess = PipeSinkConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE;
   }
 
-  OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) {
+  public OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) {
     this.tcpBindPort = tcpBindPort;
     return this;
   }
 
-  OpcUaServerBuilder setHttpsBindPort(final int httpsBindPort) {
+  public OpcUaServerBuilder setHttpsBindPort(final int httpsBindPort) {
     this.httpsBindPort = httpsBindPort;
     return this;
   }
 
-  OpcUaServerBuilder setUser(final String user) {
+  public OpcUaServerBuilder setUser(final String user) {
     this.user = user;
     return this;
   }
 
-  OpcUaServerBuilder setPassword(final String password) {
+  public OpcUaServerBuilder setPassword(final String password) {
     this.password = password;
     return this;
   }
 
-  OpcUaServerBuilder setSecurityDir(final String securityDir) {
+  public OpcUaServerBuilder setSecurityDir(final String securityDir) {
     this.securityDir = Paths.get(securityDir);
     return this;
   }
 
-  OpcUaServerBuilder setEnableAnonymousAccess(final boolean enableAnonymousAccess) {
+  public OpcUaServerBuilder setEnableAnonymousAccess(final boolean enableAnonymousAccess) {
     this.enableAnonymousAccess = enableAnonymousAccess;
     return this;
   }
 
-  OpcUaServer build() throws Exception {
+  public OpcUaServer build() throws Exception {
     Files.createDirectories(securityDir);
     if (!Files.exists(securityDir)) {
       throw new PipeException("Unable to create security dir: " + securityDir);

Reply via email to