This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch opc-ua
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/opc-ua by this push:
new 9a0b216625e qualification
9a0b216625e is described below
commit 9a0b216625eeedaa7eac1805dc6e8f20c48d20c2
Author: Caideyipi <[email protected]>
AuthorDate: Wed Dec 17 15:27:12 2025 +0800
qualification
---
.../api/customizer/parameter/PipeParameters.java | 26 ++++---
.../pipe/sink/protocol/opcua/OpcUaNameSpace.java | 84 +++++++++++++---------
.../db/pipe/sink/protocol/opcua/OpcUaSink.java | 69 ++++++++++++------
.../pipe/config/constant/PipeSinkConstant.java | 12 ++++
4 files changed, 127 insertions(+), 64 deletions(-)
diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index afee5f5abba..22e97bbe806 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -367,24 +367,32 @@ public class PipeParameters {
private static class KeyReducer {
- private static final Set<String> PREFIXES = new HashSet<>();
+ private static final Set<String> FIRST_PREFIXES = new HashSet<>();
+ private static final Set<String> SECOND_PREFIXES = new HashSet<>();
static {
- PREFIXES.add("extractor.");
- PREFIXES.add("source.");
- PREFIXES.add("processor.");
- PREFIXES.add("connector.");
- PREFIXES.add("sink.");
+ FIRST_PREFIXES.add("extractor.");
+ FIRST_PREFIXES.add("source.");
+ FIRST_PREFIXES.add("processor.");
+ FIRST_PREFIXES.add("connector.");
+ FIRST_PREFIXES.add("sink.");
+
+ SECOND_PREFIXES.add("opcua.");
}
- static String reduce(final String key) {
+ static String reduce(String key) {
if (key == null) {
return null;
}
final String lowerCaseKey = key.toLowerCase();
- for (final String prefix : PREFIXES) {
+ for (final String prefix : FIRST_PREFIXES) {
+ if (lowerCaseKey.startsWith(prefix)) {
+ key = key.substring(prefix.length());
+ }
+ }
+ for (final String prefix : SECOND_PREFIXES) {
if (lowerCaseKey.startsWith(prefix)) {
- return key.substring(prefix.length());
+ key = key.substring(prefix.length());
}
}
return key;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java
index e9e80f3c3c7..160861c1c5e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.sink.protocol.opcua;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
-import org.apache.iotdb.commons.utils.PathUtils;
import
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter;
import
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
import org.apache.iotdb.db.utils.DateTimeUtils;
@@ -70,22 +69,14 @@ import java.util.stream.Collectors;
public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server";
- private final boolean isClientServerModel;
private final SubscriptionModel subscriptionModel;
private final OpcUaServerBuilder builder;
- private final String databaseName;
private final String placeHolder;
OpcUaNameSpace(
- final OpcUaServer server,
- final boolean isClientServerModel,
- final OpcUaServerBuilder builder,
- final String qualifiedDatabaseName,
- final String placeHolder) {
+ final OpcUaServer server, final OpcUaServerBuilder builder, final String
placeHolder) {
super(server, NAMESPACE_URI);
- this.isClientServerModel = isClientServerModel;
this.builder = builder;
- this.databaseName = PathUtils.unQualifyDatabaseName(qualifiedDatabaseName);
this.placeHolder = placeHolder;
subscriptionModel = new SubscriptionModel(server, this);
@@ -106,15 +97,17 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
});
}
- void transfer(final Tablet tablet, final boolean isTableModel) throws
UaException {
- if (isClientServerModel) {
- transferTabletForClientServerModel(tablet, isTableModel);
+ void transfer(final Tablet tablet, final boolean isTableModel, final
OpcUaSink sink)
+ throws UaException {
+ if (sink.isClientServerModel) {
+ transferTabletForClientServerModel(tablet, isTableModel, sink);
} else {
- transferTabletForPubSubModel(tablet, isTableModel);
+ transferTabletForPubSubModel(tablet, isTableModel, sink);
}
}
- private void transferTabletForClientServerModel(final Tablet tablet, final
boolean isTableModel) {
+ private void transferTabletForClientServerModel(
+ final Tablet tablet, final boolean isTableModel, final OpcUaSink sink) {
final List<IMeasurementSchema> schemas = tablet.getSchemas();
final List<IMeasurementSchema> newSchemas = new ArrayList<>();
if (!isTableModel) {
@@ -136,7 +129,7 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
}
transferTabletRowForClientServerModel(
- tablet.getDeviceId().split("\\."), newSchemas, timestamps, values);
+ tablet.getDeviceId().split("\\."), newSchemas, timestamps, values,
sink);
} else {
new PipeTableModelTabletEventSorter(tablet).sortByTimestampIfNecessary();
@@ -151,7 +144,7 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
for (int i = 0; i < tablet.getRowSize(); ++i) {
final Object[] segments = tablet.getDeviceID(i).getSegments();
final String[] folderSegments = new String[segments.length + 1];
- folderSegments[0] = databaseName;
+ folderSegments[0] = sink.unQualifiedDatabaseName;
for (int j = 0; j < segments.length; ++j) {
folderSegments[j + 1] = Objects.isNull(segments[j]) ? placeHolder :
(String) segments[j];
@@ -169,7 +162,8 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
? null
: getTabletObjectValue4Opc(
tablet.getValues()[index], finalI,
schemas.get(index).getType()))
- .collect(Collectors.toList()));
+ .collect(Collectors.toList()),
+ sink);
}
}
}
@@ -178,14 +172,18 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
final String[] segments,
final List<IMeasurementSchema> measurementSchemas,
final List<Long> timestamps,
- final List<Object> values) {
+ final List<Object> values,
+ final OpcUaSink sink) {
if (segments.length == 0) {
throw new PipeRuntimeCriticalException("The segments of tablets must
exist");
}
final StringBuilder currentStr = new StringBuilder();
UaNode folderNode = null;
NodeId folderNodeId;
- for (final String segment : segments) {
+ for (int i = 0;
+ i < (Objects.isNull(sink.valueName) ? segments.length :
segments.length - 1);
+ ++i) {
+ final String segment = segments[i];
final UaNode nextFolderNode;
currentStr.append(segment);
@@ -230,32 +228,50 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
}
final String currentFolder = currentStr.toString();
+ StatusCode currentQuality =
+ Objects.isNull(sink.valueName) ? StatusCode.GOOD :
StatusCode.UNCERTAIN;
for (int i = 0; i < measurementSchemas.size(); ++i) {
if (Objects.isNull(values.get(i))) {
continue;
}
final String name = measurementSchemas.get(i).getMeasurementName();
final TSDataType type = measurementSchemas.get(i).getType();
- final NodeId nodeId = newNodeId(currentFolder + name);
+ if (Objects.nonNull(sink.qualityName) && sink.qualityName.equals(name)) {
+ if (!type.equals(TSDataType.BOOLEAN)) {
+ throw new UnsupportedOperationException(
+ "The quality value only supports boolean type, while true ==
GOOD and false == BAD.");
+ }
+ currentQuality = values.get(i) == Boolean.TRUE ? StatusCode.GOOD :
StatusCode.BAD;
+ continue;
+ }
+ if (Objects.nonNull(sink.valueName) && !sink.valueName.equals(name)) {
+ throw new UnsupportedOperationException(
+ "When the 'with-quality' mode is enabled, the measurement must be
either \"value-name\" or \"quality-name\"");
+ }
+ final String nodeName = Objects.isNull(sink.valueName) ? name :
segments[segments.length - 1];
+ final NodeId nodeId = newNodeId(currentFolder + nodeName);
final UaVariableNode measurementNode;
if (!getNodeManager().containsNode(nodeId)) {
measurementNode =
new UaVariableNode.UaVariableNodeBuilder(getNodeContext())
- .setNodeId(newNodeId(currentFolder + name))
+ .setNodeId(nodeId)
.setAccessLevel(AccessLevel.READ_WRITE)
.setUserAccessLevel(AccessLevel.READ_ONLY)
- .setBrowseName(newQualifiedName(name))
- .setDisplayName(LocalizedText.english(name))
+ .setBrowseName(newQualifiedName(nodeName))
+ .setDisplayName(LocalizedText.english(nodeName))
.setDataType(convertToOpcDataType(type))
.setTypeDefinition(Identifiers.BaseDataVariableType)
.build();
getNodeManager().addNode(measurementNode);
- folderNode.addReference(
- new Reference(
- folderNode.getNodeId(),
- Identifiers.Organizes,
- measurementNode.getNodeId().expanded(),
- true));
+ if (Objects.nonNull(folderNode)) {
+ folderNode.addReference(
+ new Reference(
+ folderNode.getNodeId(), Identifiers.Organizes,
nodeId.expanded(), true));
+ } else {
+ measurementNode.addReference(
+ new Reference(
+ nodeId, Identifiers.Organizes,
Identifiers.ObjectsFolder.expanded(), false));
+ }
} else {
// This must exist
measurementNode =
@@ -275,7 +291,7 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
measurementNode.setValue(
new DataValue(
new Variant(values.get(i)),
- StatusCode.GOOD,
+ currentQuality,
new DateTime(utcTimestamp),
new DateTime()));
}
@@ -319,8 +335,8 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
* @param tablet the tablet to send
* @throws UaException if failed to create {@link Event}
*/
- private void transferTabletForPubSubModel(final Tablet tablet, final boolean
isTableModel)
- throws UaException {
+ private void transferTabletForPubSubModel(
+ final Tablet tablet, final boolean isTableModel, final OpcUaSink sink)
throws UaException {
final BaseEventTypeNode eventNode =
getServer()
.getEventFactory()
@@ -331,7 +347,7 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
if (isTableModel) {
sourceNameList = new ArrayList<>(tablet.getRowSize());
for (int i = 0; i < tablet.getRowSize(); ++i) {
- final StringBuilder idBuilder = new StringBuilder(databaseName);
+ final StringBuilder idBuilder = new
StringBuilder(sink.unQualifiedDatabaseName);
for (final Object segment : tablet.getDeviceID(i).getSegments()) {
idBuilder
.append(TsFileConstant.PATH_SEPARATOR)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index 2c6f40d769b..035122c6d38 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.sink.protocol.opcua;
import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -41,6 +42,8 @@ import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.File;
import java.util.Arrays;
import java.util.Map;
@@ -63,10 +66,14 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_PUB_SUB_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_PLACEHOLDER_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_PLACEHOLDER_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_WITH_QUALITY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USERNAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_KEY;
@@ -74,8 +81,11 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SIN
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HTTPS_BIND_PORT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_MODEL_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_PLACEHOLDER_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_QUALITY_NAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_DIR_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_TCP_BIND_PORT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_VALUE_NAME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_WITH_QUALITY_KEY;
/**
* Send data in IoTDB based on Opc Ua protocol, using Eclipse Milo. All data
are converted into
@@ -95,6 +105,11 @@ public class OpcUaSink implements PipeConnector {
SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP = new
ConcurrentHashMap<>();
private String serverKey;
+ boolean isClientServerModel;
+ String unQualifiedDatabaseName;
+ String placeHolder;
+ @Nullable String valueName;
+ @Nullable String qualityName;
private OpcUaNameSpace nameSpace;
@Override
@@ -156,10 +171,40 @@ public class OpcUaSink implements PipeConnector {
CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY,
SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY),
CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE);
- final String placeHolder =
+ placeHolder =
parameters.getStringOrDefault(
Arrays.asList(CONNECTOR_OPC_UA_PLACEHOLDER_KEY,
SINK_OPC_UA_PLACEHOLDER_KEY),
CONNECTOR_OPC_UA_PLACEHOLDER_DEFAULT_VALUE);
+ final boolean withQuality =
+ parameters.getBooleanOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_WITH_QUALITY_KEY,
SINK_OPC_UA_WITH_QUALITY_KEY),
+ CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE);
+ valueName =
+ withQuality
+ ? parameters.getStringOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_VALUE_NAME_KEY,
SINK_OPC_UA_VALUE_NAME_KEY),
+ CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
+ : null;
+ qualityName =
+ withQuality
+ ? parameters.getStringOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_QUALITY_NAME_KEY,
SINK_OPC_UA_QUALITY_NAME_KEY),
+ CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
+ : null;
+ isClientServerModel =
+ parameters
+ .getStringOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_MODEL_KEY,
SINK_OPC_UA_MODEL_KEY),
+ CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
+ .equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE);
+
+ final DataRegion region =
+ StorageEngine.getInstance()
+ .getDataRegion(new
DataRegionId(configuration.getRuntimeEnvironment().getRegionId()));
+ unQualifiedDatabaseName =
+ Objects.nonNull(region)
+ ? PathUtils.unQualifyDatabaseName(region.getDatabaseName())
+ : "__temp_db";
synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP) {
serverKey = httpsBindPort + ":" + tcpBindPort;
@@ -180,25 +225,7 @@ public class OpcUaSink implements PipeConnector {
.setSecurityDir(securityDir)
.setEnableAnonymousAccess(enableAnonymousAccess);
final OpcUaServer newServer = builder.build();
- final DataRegion region =
- StorageEngine.getInstance()
- .getDataRegion(
- new DataRegionId(
-
configuration.getRuntimeEnvironment().getRegionId()));
- nameSpace =
- new OpcUaNameSpace(
- newServer,
- parameters
- .getStringOrDefault(
- Arrays.asList(
- CONNECTOR_OPC_UA_MODEL_KEY,
SINK_OPC_UA_MODEL_KEY),
- CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
-
.equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE),
- builder,
- Objects.nonNull(region)
- ? region.getDatabaseName()
- : "root.__temp_db",
- placeHolder);
+ nameSpace = new OpcUaNameSpace(newServer, builder,
placeHolder);
nameSpace.startup();
newServer.startup().get();
return new Pair<>(new AtomicInteger(0), nameSpace);
@@ -239,7 +266,7 @@ public class OpcUaSink implements PipeConnector {
transferByTablet(
tabletInsertionEvent,
LOGGER,
- (tablet, isTableModel) -> nameSpace.transfer(tablet, isTableModel));
+ (tablet, isTableModel) -> nameSpace.transfer(tablet, isTableModel,
this));
}
public static void transferByTablet(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
index e28989e1f8c..ecdc01237e9 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
@@ -182,6 +182,18 @@ public class PipeSinkConstant {
public static final String SINK_OPC_UA_PLACEHOLDER_KEY =
"sink.opcua.placeholder";
public static final String CONNECTOR_OPC_UA_PLACEHOLDER_DEFAULT_VALUE =
"null";
+ public static final String CONNECTOR_OPC_UA_WITH_QUALITY_KEY =
"connector.opcua.with-quality";
+ public static final String SINK_OPC_UA_WITH_QUALITY_KEY =
"sink.opcua.with-quality";
+ public static final boolean CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE =
false;
+
+ public static final String CONNECTOR_OPC_UA_VALUE_NAME_KEY =
"connector.opcua.value-name";
+ public static final String SINK_OPC_UA_VALUE_NAME_KEY =
"sink.opcua.value-name";
+ public static final String CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE =
"value";
+
+ public static final String CONNECTOR_OPC_UA_QUALITY_NAME_KEY =
"connector.opcua.quality-name";
+ public static final String SINK_OPC_UA_QUALITY_NAME_KEY =
"sink.opcua.quality-name";
+ public static final String CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE =
"quality";
+
public static final String CONNECTOR_LEADER_CACHE_ENABLE_KEY =
"connector.leader-cache.enable";
public static final String SINK_LEADER_CACHE_ENABLE_KEY =
"sink.leader-cache.enable";
public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE =
true;