This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 72d4d2b3918 [To dev/1.3] Pipe: Optimized the OPC UA logic to avoid
potential bugs (#17310)
72d4d2b3918 is described below
commit 72d4d2b39183f9dda3eaddac022e2a2e21778858
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 18 17:30:09 2026 +0800
[To dev/1.3] Pipe: Optimized the OPC UA logic to avoid potential bugs
(#17310)
* fix
* fix
---
.../task/subtask/sink/PipeSinkSubtaskManager.java | 45 +++++++++++--------
.../pipe/sink/protocol/opcua/OpcUaNameSpace.java | 50 +++++++++++++---------
.../pipe/config/constant/PipeSinkConstant.java | 8 ++++
3 files changed, 64 insertions(+), 39 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index 69ec2d00295..caff425f790 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -62,10 +62,10 @@ public class PipeSinkSubtaskManager {
public synchronized String register(
final Supplier<? extends PipeSinkSubtaskExecutor> executorSupplier,
- final PipeParameters pipeConnectorParameters,
+ final PipeParameters pipeSinkParameters,
final PipeTaskSinkRuntimeEnvironment environment) {
final String connectorKey =
- pipeConnectorParameters
+ pipeSinkParameters
.getStringOrDefault(
Arrays.asList(PipeSinkConstant.CONNECTOR_KEY,
PipeSinkConstant.SINK_KEY),
BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
@@ -79,23 +79,31 @@ public class PipeSinkSubtaskManager {
environment.getRegionId(),
connectorKey);
- final boolean isDataRegionConnector =
+ final boolean isDataSinkConnector =
StorageEngine.getInstance()
.getAllDataRegionIds()
.contains(new DataRegionId(environment.getRegionId()));
- final int connectorNum;
+ final int sinkNum;
boolean realTimeFirst = false;
- String attributeSortedString =
generateAttributeSortedString(pipeConnectorParameters);
- if (isDataRegionConnector) {
- connectorNum =
- pipeConnectorParameters.getIntOrDefault(
+ String attributeSortedString =
generateAttributeSortedString(pipeSinkParameters);
+ if (isDataSinkConnector) {
+ sinkNum =
+ pipeSinkParameters.getIntOrDefault(
Arrays.asList(
PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
- PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
+ PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(
+ pipeSinkParameters
+ .getStringOrDefault(
+ Arrays.asList(
+ PipeSinkConstant.CONNECTOR_KEY,
PipeSinkConstant.SINK_KEY),
+
BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName())
+ .toLowerCase())
+ ? 1
+ :
PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
realTimeFirst =
- pipeConnectorParameters.getBooleanOrDefault(
+ pipeSinkParameters.getBooleanOrDefault(
Arrays.asList(
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
PipeSinkConstant.SINK_REALTIME_FIRST_KEY),
@@ -104,7 +112,7 @@ public class PipeSinkSubtaskManager {
} else {
// Do not allow parallel tasks for schema region connectors
// to avoid the potential disorder of the schema region data transfer
- connectorNum = 1;
+ sinkNum = 1;
attributeSortedString = "schema_" + attributeSortedString;
}
environment.setAttributeSortedString(attributeSortedString);
@@ -112,8 +120,7 @@ public class PipeSinkSubtaskManager {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
final PipeSinkSubtaskExecutor executor = executorSupplier.get();
- final List<PipeSinkSubtaskLifeCycle> pipeSinkSubtaskLifeCycleList =
- new ArrayList<>(connectorNum);
+ final List<PipeSinkSubtaskLifeCycle> pipeSinkSubtaskLifeCycleList = new
ArrayList<>(sinkNum);
AtomicInteger counter = new AtomicInteger(0);
// Shared pending queue for all subtasks
@@ -126,20 +133,20 @@ public class PipeSinkSubtaskManager {
((PipeRealtimePriorityBlockingQueue)
pendingQueue).setOfferTsFileCounter(counter);
}
- for (int connectorIndex = 0; connectorIndex < connectorNum;
connectorIndex++) {
+ for (int connectorIndex = 0; connectorIndex < sinkNum; connectorIndex++)
{
final PipeConnector pipeConnector =
- isDataRegionConnector
- ?
PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeConnectorParameters)
- :
PipeDataNodeAgent.plugin().schemaRegion().reflectSink(pipeConnectorParameters);
+ isDataSinkConnector
+ ?
PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeSinkParameters)
+ :
PipeDataNodeAgent.plugin().schemaRegion().reflectSink(pipeSinkParameters);
// 1. Construct, validate and customize PipeConnector, and then
handshake (create
// connection) with the target
try {
if (pipeConnector instanceof IoTDBDataRegionAsyncSink) {
((IoTDBDataRegionAsyncSink)
pipeConnector).setTransferTsFileCounter(counter);
}
- pipeConnector.validate(new
PipeParameterValidator(pipeConnectorParameters));
+ pipeConnector.validate(new
PipeParameterValidator(pipeSinkParameters));
pipeConnector.customize(
- pipeConnectorParameters, new
PipeTaskRuntimeConfiguration(environment));
+ pipeSinkParameters, new
PipeTaskRuntimeConfiguration(environment));
pipeConnector.handshake();
} catch (final Exception e) {
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java
index f9fbb2d2ad4..a8edc941bb4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java
@@ -156,6 +156,26 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
final TSDataType type = measurementSchema.getType();
final NodeId nodeId = newNodeId(currentFolder + name);
final UaVariableNode measurementNode;
+
+ int lastNonnullIndex = -1;
+ for (int j = tablet.rowSize - 1; j >= 0; --j) {
+ if (!tablet.bitMaps[i].isMarked(j)) {
+ lastNonnullIndex = j;
+ break;
+ }
+ }
+
+ if (lastNonnullIndex == -1) {
+ continue;
+ }
+
+ final long utcTimestamp =
timestampToUtc(tablet.timestamps[lastNonnullIndex]);
+ final DataValue value =
+ new DataValue(
+ new Variant(getTabletObjectValue4Opc(tablet.values[i],
lastNonnullIndex, type)),
+ StatusCode.GOOD,
+ new DateTime(utcTimestamp),
+ new DateTime());
if (!getNodeManager().containsNode(nodeId)) {
measurementNode =
new UaVariableNode.UaVariableNodeBuilder(getNodeContext())
@@ -166,6 +186,7 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
.setDisplayName(LocalizedText.english(name))
.setDataType(convertToOpcDataType(type))
.setTypeDefinition(Identifiers.BaseDataVariableType)
+ .setValue(value)
.build();
getNodeManager().addNode(measurementNode);
folderNode.addOrganizes(measurementNode);
@@ -181,26 +202,15 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
String.format("The Node %s does not exist.",
nodeId)));
}
- int lastNonnullIndex = -1;
- for (int j = tablet.rowSize - 1; j >= 0; --j) {
- if (!tablet.bitMaps[i].isMarked(j)) {
- lastNonnullIndex = j;
- break;
- }
- }
-
- if (lastNonnullIndex != -1) {
- final long utcTimestamp =
timestampToUtc(tablet.timestamps[lastNonnullIndex]);
- if (Objects.isNull(measurementNode.getValue())
- ||
Objects.requireNonNull(measurementNode.getValue().getSourceTime()).getUtcTime()
- < utcTimestamp) {
- measurementNode.setValue(
- new DataValue(
- new Variant(getTabletObjectValue4Opc(tablet.values[i],
lastNonnullIndex, type)),
- StatusCode.GOOD,
- new DateTime(utcTimestamp),
- new DateTime()));
- }
+ if (Objects.isNull(measurementNode.getValue())
+ || Objects.isNull(measurementNode.getValue().getSourceTime())
+ || measurementNode.getValue().getSourceTime().getUtcTime() <
utcTimestamp) {
+ measurementNode.setValue(
+ new DataValue(
+ new Variant(getTabletObjectValue4Opc(tablet.values[i],
lastNonnullIndex, type)),
+ StatusCode.GOOD,
+ new DateTime(utcTimestamp),
+ new DateTime()));
}
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
index 451cd965268..1cd9cfb33c7 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.commons.pipe.config.constant;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import com.github.luben.zstd.Zstd;
@@ -54,6 +55,13 @@ public class PipeSinkConstant {
public static final String SINK_IOTDB_PARALLEL_TASKS_KEY =
"sink.parallel.tasks";
public static final int CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE =
PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum();
+ public static final Set<String> SINGLE_THREAD_DEFAULT_SINK =
+ new HashSet<>(
+ Arrays.asList(
+ BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName(),
+ BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(),
+ BuiltinPipePlugin.OPC_DA_SINK.getPipePluginName(),
+ BuiltinPipePlugin.OPC_DA_CONNECTOR.getPipePluginName()));
public static final String CONNECTOR_REALTIME_FIRST_KEY =
"connector.realtime-first";
public static final String SINK_REALTIME_FIRST_KEY = "sink.realtime-first";