This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch opc-fix-fix-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/opc-fix-fix-fix by this push:
new 0bff6e6be94 add
0bff6e6be94 is described below
commit 0bff6e6be944f23ea0a2670b04381f11be90551b
Author: Caideyipi <[email protected]>
AuthorDate: Tue Mar 24 12:33:23 2026 +0800
add
---
.../sink/protocol/opcua/server/OpcUaNameSpace.java | 101 +++++++++++++--------
1 file changed, 62 insertions(+), 39 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index 2c2e860af7b..1985a1e5f6b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -250,8 +250,8 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
final String currentFolder = currentStr.toString();
StatusCode currentQuality = sink.getDefaultQuality();
- UaVariableNode valueNode = null;
Object value = null;
+ TSDataType dataType = null;
long timestamp = 0;
for (int i = 0; i < measurementSchemas.size(); ++i) {
@@ -274,9 +274,6 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
"When the 'with-quality' mode is enabled, the measurement must be
either \"value-name\" or \"quality-name\"");
continue;
}
- final String nodeName =
- Objects.isNull(sink.getValueName()) ? name :
segments[segments.length - 1];
- final NodeId nodeId = newNodeId(currentFolder + nodeName);
final UaVariableNode measurementNode;
final long utcTimestamp =
timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
final DataValue dataValue =
@@ -286,52 +283,29 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
new DateTime(utcTimestamp),
new DateTime());
- if (!getNodeManager().containsNode(nodeId)) {
- measurementNode =
- new UaVariableNode.UaVariableNodeBuilder(getNodeContext())
- .setNodeId(nodeId)
- .setAccessLevel(AccessLevel.READ_WRITE)
- .setUserAccessLevel(AccessLevel.READ_ONLY)
- .setBrowseName(newQualifiedName(nodeName))
- .setDisplayName(LocalizedText.english(nodeName))
- .setDataType(convertToOpcDataType(type))
- .setTypeDefinition(Identifiers.BaseDataVariableType)
- .setValue(dataValue)
- .build();
- getNodeManager().addNode(measurementNode);
- if (Objects.nonNull(folderNode)) {
- folderNode.addReference(
- new Reference(
- folderNode.getNodeId(), Identifiers.Organizes,
nodeId.expanded(), true));
- } else {
- measurementNode.addReference(
- new Reference(
- nodeId, Identifiers.Organizes,
Identifiers.ObjectsFolder.expanded(), false));
- }
- } else {
- // This must exist
- measurementNode =
- (UaVariableNode)
- getNodeManager()
- .getNode(nodeId)
- .orElseThrow(
- () ->
- new PipeRuntimeCriticalException(
- String.format("The Node %s does not exist.",
nodeId)));
- }
-
if (Objects.isNull(sink.getValueName())) {
+ measurementNode =
+ addNode(name, currentFolder, folderNode, utcTimestamp, value,
currentQuality, type);
if (Objects.isNull(measurementNode.getValue())
|| Objects.isNull(measurementNode.getValue().getSourceTime())
|| measurementNode.getValue().getSourceTime().getUtcTime() <
utcTimestamp) {
measurementNode.setValue(dataValue);
}
} else {
- valueNode = measurementNode;
value = values.get(i);
timestamp = utcTimestamp;
+ dataType = type;
}
}
+ final UaVariableNode valueNode =
+ addNode(
+ segments[segments.length - 1],
+ currentFolder,
+ folderNode,
+ timestamp,
+ value,
+ currentQuality,
+ dataType);
if (Objects.nonNull(valueNode)) {
if (Objects.isNull(valueNode.getValue())
|| Objects.isNull(valueNode.getValue().getSourceTime())
@@ -343,6 +317,55 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
}
}
+ private UaVariableNode addNode(
+ final String nodeName,
+ final String currentFolder,
+ final UaNode folderNode,
+ final long utcTimestamp,
+ final Object value,
+ final StatusCode currentQuality,
+ final TSDataType type) {
+ final NodeId nodeId = newNodeId(currentFolder + nodeName);
+ final UaVariableNode measurementNode;
+ final DataValue dataValue =
+ new DataValue(
+ new Variant(value), currentQuality, new DateTime(utcTimestamp),
new DateTime());
+
+ if (!getNodeManager().containsNode(nodeId)) {
+ measurementNode =
+ new UaVariableNode.UaVariableNodeBuilder(getNodeContext())
+ .setNodeId(nodeId)
+ .setAccessLevel(AccessLevel.READ_WRITE)
+ .setUserAccessLevel(AccessLevel.READ_ONLY)
+ .setBrowseName(newQualifiedName(nodeName))
+ .setDisplayName(LocalizedText.english(nodeName))
+ .setDataType(convertToOpcDataType(type))
+ .setTypeDefinition(Identifiers.BaseDataVariableType)
+ .setValue(dataValue)
+ .build();
+ getNodeManager().addNode(measurementNode);
+ if (Objects.nonNull(folderNode)) {
+ folderNode.addReference(
+ new Reference(folderNode.getNodeId(), Identifiers.Organizes,
nodeId.expanded(), true));
+ } else {
+ measurementNode.addReference(
+ new Reference(
+ nodeId, Identifiers.Organizes,
Identifiers.ObjectsFolder.expanded(), false));
+ }
+ } else {
+ // This must exist
+ measurementNode =
+ (UaVariableNode)
+ getNodeManager()
+ .getNode(nodeId)
+ .orElseThrow(
+ () ->
+ new PipeRuntimeCriticalException(
+ String.format("The Node %s does not exist.",
nodeId)));
+ }
+ return measurementNode;
+ }
+
private static Object getTabletObjectValue4Opc(
final Object column, final int rowIndex, final TSDataType type) {
switch (type) {