This is an automated email from the ASF dual-hosted git repository.
Caideyipi pushed a commit to branch opc-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/opc-fix by this push:
new 5a7859bd371 fix
5a7859bd371 is described below
commit 5a7859bd37141463442df75fd40077bcbf9a7285
Author: Caideyipi <[email protected]>
AuthorDate: Tue Apr 21 17:33:14 2026 +0800
fix
---
.../protocol/opcua/client/IoTDBOpcUaClient.java | 65 +++++++++++++++++-----
1 file changed, 50 insertions(+), 15 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
index 2019c0fe833..7595b75747e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -56,9 +57,10 @@ import
org.eclipse.milo.opcua.stack.core.types.structured.VariableAttributes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.nio.file.Paths;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
@@ -130,7 +132,6 @@ public class IoTDBOpcUaClient {
StatusCode currentQuality = sink.getDefaultQuality();
Object value = null;
long timestamp = 0;
- NodeId nodeId = null;
NodeId opcDataType = null;
for (int i = 0; i < measurementSchemas.size(); ++i) {
@@ -153,17 +154,43 @@ public class IoTDBOpcUaClient {
"When the 'with-quality' mode is enabled, the measurement must be
either \"value-name\" or \"quality-name\"");
continue;
}
- nodeId = new NodeId(NAME_SPACE_INDEX, String.join("/", segments));
final long utcTimestamp =
timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
- value = values.get(i);
- timestamp = utcTimestamp;
- opcDataType = convertToOpcDataType(type);
+ if (Objects.isNull(sink.getValueName())) {
+ writeValue(
+ values.get(i),
+ utcTimestamp,
+ convertToOpcDataType(type),
+ currentQuality,
+ segments,
+ name);
+ } else {
+ value = values.get(i);
+ timestamp = utcTimestamp;
+ opcDataType = convertToOpcDataType(type);
+ }
}
if (Objects.isNull(value)) {
return;
}
+ writeValue(value, timestamp, opcDataType, currentQuality, segments, null);
+ }
+
+ private void writeValue(
+ final Object value,
+ final long timestamp,
+ final NodeId opcDataType,
+ final StatusCode currentQuality,
+ final String[] segments,
+ final @Nullable String name)
+ throws Exception {
+ final NodeId nodeId =
+ new NodeId(
+ NAME_SPACE_INDEX,
+ Objects.nonNull(name)
+ ? String.join("/", segments) + "/" + name
+ : String.join("/", segments));
final Variant variant = new Variant(value);
final DataValue dataValue =
new DataValue(variant, currentQuality, new DateTime(timestamp), new
DateTime());
@@ -171,36 +198,41 @@ public class IoTDBOpcUaClient {
if (writeStatus.getValue() == StatusCodes.Bad_NodeIdUnknown) {
final AddNodesResponse addStatus =
- client.addNodes(getNodesToAdd(segments, opcDataType, variant)).get();
+ client.addNodes(getNodesToAdd(segments, name, opcDataType,
variant)).get();
for (final AddNodesResult result : addStatus.getResults()) {
if (!result.getStatusCode().equals(StatusCode.GOOD)
&& !(result.getStatusCode().getValue() ==
StatusCodes.Bad_NodeIdExists)) {
throw new PipeException(
"Failed to create nodes after transfer data value, creation
status: "
+ addStatus
- + getErrorString(segments, opcDataType, value, writeStatus));
+ + getErrorString(segments, name, opcDataType, value,
writeStatus));
}
}
writeStatus = client.writeValue(nodeId, dataValue).get();
if (writeStatus.getValue() != StatusCode.GOOD.getValue()) {
throw new PipeException(
"Failed to transfer dataValue after successfully created nodes"
- + getErrorString(segments, opcDataType, value, writeStatus));
+ + getErrorString(segments, name, opcDataType, value,
writeStatus));
}
} else if (writeStatus.getValue() != StatusCode.GOOD.getValue()) {
throw new PipeException(
"Failed to transfer dataValue"
- + getErrorString(segments, opcDataType, value, writeStatus));
+ + getErrorString(segments, name, opcDataType, value,
writeStatus));
}
}
private static String getErrorString(
final String[] segments,
+ final @Nullable String name,
final NodeId dataType,
final Object value,
final StatusCode writeStatus) {
- return ", segments: "
- + Arrays.toString(segments)
+ return ", measurement: "
+ + (Objects.nonNull(name)
+ ? String.join(TsFileConstant.PATH_SEPARATOR, segments)
+ + TsFileConstant.PATH_SEPARATOR
+ + name
+ : String.join(TsFileConstant.PATH_SEPARATOR, segments))
+ ", dataType: "
+ dataType
+ ", value: "
@@ -210,7 +242,10 @@ public class IoTDBOpcUaClient {
}
public List<AddNodesItem> getNodesToAdd(
- final String[] segments, final NodeId opcDataType, final Variant
initialValue) {
+ final String[] segments,
+ final @Nullable String name,
+ final NodeId opcDataType,
+ final Variant initialValue) {
final List<AddNodesItem> addNodesItems = new ArrayList<>();
final StringBuilder sb = new StringBuilder(segments[0]);
ExpandedNodeId curNodeId = new NodeId(NAME_SPACE_INDEX,
segments[0]).expanded();
@@ -226,7 +261,7 @@ public class IoTDBOpcUaClient {
Identifiers.FolderType.expanded()));
// segments.length >= 3
- for (int i = 1; i < segments.length - 1; ++i) {
+ for (int i = 1; i < (Objects.nonNull(name) ? segments.length :
segments.length - 1); ++i) {
sb.append("/").append(segments[i]);
final ExpandedNodeId nextId = new NodeId(NAME_SPACE_INDEX,
sb.toString()).expanded();
addNodesItems.add(
@@ -242,7 +277,7 @@ public class IoTDBOpcUaClient {
curNodeId = nextId;
}
- final String measurementName = segments[segments.length - 1];
+ final String measurementName = Objects.nonNull(name) ? name :
segments[segments.length - 1];
sb.append("/").append(measurementName);
addNodesItems.add(
new AddNodesItem(