This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch client-opc
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/client-opc by this push:
new 341cb2dfc10 pj
341cb2dfc10 is described below
commit 341cb2dfc10cc242336b058ea384a86efab4be76
Author: Caideyipi <[email protected]>
AuthorDate: Mon Dec 22 14:53:26 2025 +0800
pj
---
.../db/pipe/sink/protocol/opcua/OpcUaSink.java | 26 +++++++++++++++++++
.../opcua/{ => server}/OpcUaKeyStoreLoader.java | 2 +-
.../opcua/{ => server}/OpcUaNameSpace.java | 30 ++++++++++++----------
.../opcua/{ => server}/OpcUaServerBuilder.java | 18 ++++++-------
4 files changed, 52 insertions(+), 24 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index 7404a8b03bc..9a505ce90f0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaServerBuilder;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -356,4 +358,28 @@ public class OpcUaSink implements PipeConnector {
}
}
}
+
+ // Getter
+
+ public boolean isClientServerModel() {
+ return isClientServerModel;
+ }
+
+ public String getUnQualifiedDatabaseName() {
+ return unQualifiedDatabaseName;
+ }
+
+ public String getPlaceHolder() {
+ return placeHolder;
+ }
+
+ @Nullable
+ public String getValueName() {
+ return valueName;
+ }
+
+ @Nullable
+ public String getQualityName() {
+ return qualityName;
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaKeyStoreLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java
similarity index 98%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaKeyStoreLoader.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java
index b17f27532d7..56b231fb460 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaKeyStoreLoader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.sink.protocol.opcua;
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.server;
import org.apache.iotdb.commons.utils.FileUtils;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
similarity index 94%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index 6850fba8f20..0a13fc34865 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -17,10 +17,11 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.sink.protocol.opcua;
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.server;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
import
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter;
import
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
import org.apache.iotdb.db.utils.DateTimeUtils;
@@ -72,7 +73,7 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
private final SubscriptionModel subscriptionModel;
private final OpcUaServerBuilder builder;
- OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder builder) {
+ public OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder
builder) {
super(server, NAMESPACE_URI);
this.builder = builder;
@@ -94,9 +95,9 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
});
}
- void transfer(final Tablet tablet, final boolean isTableModel, final
OpcUaSink sink)
+ public void transfer(final Tablet tablet, final boolean isTableModel, final
OpcUaSink sink)
throws UaException {
- if (sink.isClientServerModel) {
+ if (sink.isClientServerModel()) {
transferTabletForClientServerModel(tablet, isTableModel, sink);
} else {
transferTabletForPubSubModel(tablet, isTableModel, sink);
@@ -141,11 +142,11 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
for (int i = 0; i < tablet.getRowSize(); ++i) {
final Object[] segments = tablet.getDeviceID(i).getSegments();
final String[] folderSegments = new String[segments.length + 1];
- folderSegments[0] = sink.unQualifiedDatabaseName;
+ folderSegments[0] = sink.getUnQualifiedDatabaseName();
for (int j = 0; j < segments.length; ++j) {
folderSegments[j + 1] =
- Objects.isNull(segments[j]) ? sink.placeHolder : (String)
segments[j];
+ Objects.isNull(segments[j]) ? sink.getPlaceHolder() : (String)
segments[j];
}
final int finalI = i;
@@ -228,7 +229,7 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
final String currentFolder = currentStr.toString();
StatusCode currentQuality =
- Objects.isNull(sink.valueName) ? StatusCode.GOOD :
StatusCode.UNCERTAIN;
+ Objects.isNull(sink.getValueName()) ? StatusCode.GOOD :
StatusCode.UNCERTAIN;
UaVariableNode valueNode = null;
Object value = null;
long timestamp = 0;
@@ -239,7 +240,7 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
}
final String name = measurementSchemas.get(i).getMeasurementName();
final TSDataType type = measurementSchemas.get(i).getType();
- if (Objects.nonNull(sink.qualityName) && sink.qualityName.equals(name)) {
+ if (Objects.nonNull(sink.getQualityName()) &&
sink.getQualityName().equals(name)) {
if (!type.equals(TSDataType.BOOLEAN)) {
throw new UnsupportedOperationException(
"The quality value only supports boolean type, while true == GOOD and false == BAD.");
@@ -247,11 +248,12 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
currentQuality = values.get(i) == Boolean.TRUE ? StatusCode.GOOD :
StatusCode.BAD;
continue;
}
- if (Objects.nonNull(sink.valueName) && !sink.valueName.equals(name)) {
+ if (Objects.nonNull(sink.getValueName()) &&
!sink.getValueName().equals(name)) {
throw new UnsupportedOperationException(
"When the 'with-quality' mode is enabled, the measurement must be either \"value-name\" or \"quality-name\"");
}
- final String nodeName = Objects.isNull(sink.valueName) ? name :
segments[segments.length - 1];
+ final String nodeName =
+ Objects.isNull(sink.getValueName()) ? name :
segments[segments.length - 1];
final NodeId nodeId = newNodeId(currentFolder + nodeName);
final UaVariableNode measurementNode;
if (!getNodeManager().containsNode(nodeId)) {
@@ -288,7 +290,7 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
}
final long utcTimestamp =
timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
- if (Objects.isNull(sink.valueName)) {
+ if (Objects.isNull(sink.getValueName())) {
if (Objects.isNull(measurementNode.getValue())
||
Objects.requireNonNull(measurementNode.getValue().getSourceTime()).getUtcTime()
< utcTimestamp) {
@@ -365,11 +367,11 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
if (isTableModel) {
sourceNameList = new ArrayList<>(tablet.getRowSize());
for (int i = 0; i < tablet.getRowSize(); ++i) {
- final StringBuilder idBuilder = new
StringBuilder(sink.unQualifiedDatabaseName);
+ final StringBuilder idBuilder = new
StringBuilder(sink.getUnQualifiedDatabaseName());
for (final Object segment : tablet.getDeviceID(i).getSegments()) {
idBuilder
.append(TsFileConstant.PATH_SEPARATOR)
- .append(Objects.isNull(segment) ? sink.placeHolder : segment);
+ .append(Objects.isNull(segment) ? sink.getPlaceHolder() :
segment);
}
sourceNameList.add(idBuilder.toString());
}
@@ -521,7 +523,7 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
/////////////////////////////// Conflict detection
///////////////////////////////
- void checkEquals(
+ public void checkEquals(
final String user,
final String password,
final String securityDir,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaServerBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
similarity index 95%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaServerBuilder.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
index bc2df4839e2..1d433482853 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaServerBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.sink.protocol.opcua;
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.server;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -86,7 +86,7 @@ public class OpcUaServerBuilder implements Closeable {
private boolean enableAnonymousAccess;
private DefaultTrustListManager trustListManager;
- OpcUaServerBuilder() {
+ public OpcUaServerBuilder() {
tcpBindPort =
PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
httpsBindPort =
PipeSinkConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE;
user = PipeSinkConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
@@ -95,37 +95,37 @@ public class OpcUaServerBuilder implements Closeable {
enableAnonymousAccess =
PipeSinkConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE;
}
- OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) {
+ public OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) {
this.tcpBindPort = tcpBindPort;
return this;
}
- OpcUaServerBuilder setHttpsBindPort(final int httpsBindPort) {
+ public OpcUaServerBuilder setHttpsBindPort(final int httpsBindPort) {
this.httpsBindPort = httpsBindPort;
return this;
}
- OpcUaServerBuilder setUser(final String user) {
+ public OpcUaServerBuilder setUser(final String user) {
this.user = user;
return this;
}
- OpcUaServerBuilder setPassword(final String password) {
+ public OpcUaServerBuilder setPassword(final String password) {
this.password = password;
return this;
}
- OpcUaServerBuilder setSecurityDir(final String securityDir) {
+ public OpcUaServerBuilder setSecurityDir(final String securityDir) {
this.securityDir = Paths.get(securityDir);
return this;
}
- OpcUaServerBuilder setEnableAnonymousAccess(final boolean
enableAnonymousAccess) {
+ public OpcUaServerBuilder setEnableAnonymousAccess(final boolean
enableAnonymousAccess) {
this.enableAnonymousAccess = enableAnonymousAccess;
return this;
}
- OpcUaServer build() throws Exception {
+ public OpcUaServer build() throws Exception {
Files.createDirectories(securityDir);
if (!Files.exists(securityDir)) {
throw new PipeException("Unable to create security dir: " + securityDir);