This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 12f655ff763 Pipe: Optimized the OPC UA logic to avoid potential bugs
(#17309)
12f655ff763 is described below
commit 12f655ff763de846d9985844b0d52881eab97c52
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 18 17:31:49 2026 +0800
Pipe: Optimized the OPC UA logic to avoid potential bugs (#17309)
* fix
* fix
---
.../agent/task/subtask/sink/PipeSinkSubtask.java | 12 +-
.../task/subtask/sink/PipeSinkSubtaskManager.java | 65 +++---
.../metric/sink/PipeDataRegionSinkMetrics.java | 241 ++++++++++-----------
.../sink/protocol/opcua/server/OpcUaNameSpace.java | 25 ++-
.../pipe/config/constant/PipeSinkConstant.java | 8 +
5 files changed, 183 insertions(+), 168 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 91bccc6cbda..c6ab2946005 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -56,7 +56,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
// Record these variables to provide corresponding value to tag key of
monitoring metrics
private final String attributeSortedString;
- private final int connectorIndex;
+ private final int sinkIndex;
// Now parallel connectors run the same time, thus the heartbeat events are
not sure
// to trigger the general event transfer function, causing potentially such
as
@@ -68,12 +68,12 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
final String taskID,
final long creationTime,
final String attributeSortedString,
- final int connectorIndex,
+ final int sinkIndex,
final UnboundedBlockingPendingQueue<Event> inputPendingQueue,
final PipeConnector outputPipeConnector) {
super(taskID, creationTime, outputPipeConnector);
this.attributeSortedString = attributeSortedString;
- this.connectorIndex = connectorIndex;
+ this.sinkIndex = sinkIndex;
this.inputPendingQueue = inputPendingQueue;
if (!attributeSortedString.startsWith("schema_")) {
@@ -256,8 +256,8 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
return attributeSortedString;
}
- public int getConnectorIndex() {
- return connectorIndex;
+ public int getSinkIndex() {
+ return sinkIndex;
}
public int getTsFileInsertionEventCount() {
@@ -275,7 +275,7 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
+ (lastEvent instanceof PipeHeartbeatEvent ? 1 : 0);
}
- public int getAsyncConnectorRetryEventQueueSize() {
+ public int getAsyncSinkRetryEventQueueSize() {
return outputPipeSink instanceof IoTDBDataRegionAsyncSink
? ((IoTDBDataRegionAsyncSink) outputPipeSink).getRetryEventQueueSize()
: 0;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index 3a461fe0395..9138a075918 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -64,10 +64,10 @@ public class PipeSinkSubtaskManager {
public synchronized String register(
final Supplier<? extends PipeSinkSubtaskExecutor> executorSupplier,
- final PipeParameters pipeConnectorParameters,
+ final PipeParameters pipeSinkParameters,
final PipeTaskSinkRuntimeEnvironment environment) {
final String connectorKey =
- pipeConnectorParameters
+ pipeSinkParameters
.getStringOrDefault(
Arrays.asList(PipeSinkConstant.CONNECTOR_KEY,
PipeSinkConstant.SINK_KEY),
BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
@@ -81,24 +81,32 @@ public class PipeSinkSubtaskManager {
environment.getRegionId(),
connectorKey);
- final boolean isDataRegionConnector =
+ final boolean isDataRegionSink =
StorageEngine.getInstance()
.getAllDataRegionIds()
.contains(new DataRegionId(environment.getRegionId()))
|| PipeRuntimeMeta.isSourceExternal(environment.getRegionId());
- final int connectorNum;
+ final int sinkNum;
boolean realTimeFirst = false;
- String attributeSortedString =
generateAttributeSortedString(pipeConnectorParameters);
- if (isDataRegionConnector) {
- connectorNum =
- pipeConnectorParameters.getIntOrDefault(
+ String attributeSortedString =
generateAttributeSortedString(pipeSinkParameters);
+ if (isDataRegionSink) {
+ sinkNum =
+ pipeSinkParameters.getIntOrDefault(
Arrays.asList(
PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
PipeSinkConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
- PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
+ PipeSinkConstant.SINGLE_THREAD_DEFAULT_SINK.contains(
+ pipeSinkParameters
+ .getStringOrDefault(
+ Arrays.asList(
+ PipeSinkConstant.CONNECTOR_KEY,
PipeSinkConstant.SINK_KEY),
+
BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName())
+ .toLowerCase())
+ ? 1
+ :
PipeSinkConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
realTimeFirst =
- pipeConnectorParameters.getBooleanOrDefault(
+ pipeSinkParameters.getBooleanOrDefault(
Arrays.asList(
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
PipeSinkConstant.SINK_REALTIME_FIRST_KEY),
@@ -107,15 +115,14 @@ public class PipeSinkSubtaskManager {
} else {
// Do not allow parallel tasks for schema region connectors
// to avoid the potential disorder of the schema region data transfer
- connectorNum = 1;
+ sinkNum = 1;
attributeSortedString = "schema_" + attributeSortedString;
}
environment.setAttributeSortedString(attributeSortedString);
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
final PipeSinkSubtaskExecutor executor = executorSupplier.get();
- final List<PipeSinkSubtaskLifeCycle> pipeSinkSubtaskLifeCycleList =
- new ArrayList<>(connectorNum);
+ final List<PipeSinkSubtaskLifeCycle> pipeSinkSubtaskLifeCycleList = new
ArrayList<>(sinkNum);
AtomicInteger counter = new AtomicInteger(0);
// Shared pending queue for all subtasks
@@ -128,24 +135,23 @@ public class PipeSinkSubtaskManager {
((PipeRealtimePriorityBlockingQueue)
pendingQueue).setOfferTsFileCounter(counter);
}
- for (int connectorIndex = 0; connectorIndex < connectorNum;
connectorIndex++) {
- final PipeConnector pipeConnector =
- isDataRegionConnector
- ?
PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeConnectorParameters)
- :
PipeDataNodeAgent.plugin().schemaRegion().reflectSink(pipeConnectorParameters);
+ for (int sinkIndex = 0; sinkIndex < sinkNum; sinkIndex++) {
+ final PipeConnector pipeSink =
+ isDataRegionSink
+ ?
PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeSinkParameters)
+ :
PipeDataNodeAgent.plugin().schemaRegion().reflectSink(pipeSinkParameters);
// 1. Construct, validate and customize PipeConnector, and then
handshake (create
// connection) with the target
try {
- if (pipeConnector instanceof IoTDBDataRegionAsyncSink) {
- ((IoTDBDataRegionAsyncSink)
pipeConnector).setTransferTsFileCounter(counter);
+ if (pipeSink instanceof IoTDBDataRegionAsyncSink) {
+ ((IoTDBDataRegionAsyncSink)
pipeSink).setTransferTsFileCounter(counter);
}
- pipeConnector.validate(new
PipeParameterValidator(pipeConnectorParameters));
- pipeConnector.customize(
- pipeConnectorParameters, new
PipeTaskRuntimeConfiguration(environment));
- pipeConnector.handshake();
+ pipeSink.validate(new PipeParameterValidator(pipeSinkParameters));
+ pipeSink.customize(pipeSinkParameters, new
PipeTaskRuntimeConfiguration(environment));
+ pipeSink.handshake();
} catch (final Exception e) {
try {
- pipeConnector.close();
+ pipeSink.close();
} catch (final Exception closeException) {
LOGGER.warn(
"Failed to close connector after failed to initialize
connector. "
@@ -160,20 +166,19 @@ public class PipeSinkSubtaskManager {
final PipeSinkSubtask pipeSinkSubtask =
new PipeSinkSubtask(
String.format(
- "%s_%s_%s",
- attributeSortedString, environment.getCreationTime(),
connectorIndex),
+ "%s_%s_%s", attributeSortedString,
environment.getCreationTime(), sinkIndex),
environment.getCreationTime(),
attributeSortedString,
- connectorIndex,
+ sinkIndex,
pendingQueue,
- pipeConnector);
+ pipeSink);
final PipeSinkSubtaskLifeCycle pipeSinkSubtaskLifeCycle =
new PipeSinkSubtaskLifeCycle(executor, pipeSinkSubtask,
pendingQueue);
pipeSinkSubtaskLifeCycleList.add(pipeSinkSubtaskLifeCycle);
}
LOGGER.info(
- "Pipe connector subtasks with attributes {} is bounded with
connectorExecutor {} and callbackExecutor {}.",
+ "Pipe sink subtasks with attributes {} is bounded with sinkExecutor
{} and callbackExecutor {}.",
attributeSortedString,
executor.getWorkingThreadName(),
executor.getCallbackThreadName());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
index 7eef4a03aa5..2502c385e5c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
@@ -46,7 +46,7 @@ public class PipeDataRegionSinkMetrics implements IMetricSet {
@SuppressWarnings("java:S3077")
private volatile AbstractMetricService metricService;
- private final Map<String, PipeSinkSubtask> connectorMap = new HashMap<>();
+ private final Map<String, PipeSinkSubtask> sinkMap = new HashMap<>();
private final Map<String, Rate> tabletRateMap = new ConcurrentHashMap<>();
@@ -61,7 +61,7 @@ public class PipeDataRegionSinkMetrics implements IMetricSet {
@Override
public void bindTo(final AbstractMetricService metricService) {
this.metricService = metricService;
- final ImmutableSet<String> taskIDs =
ImmutableSet.copyOf(connectorMap.keySet());
+ final ImmutableSet<String> taskIDs = ImmutableSet.copyOf(sinkMap.keySet());
for (String taskID : taskIDs) {
createMetrics(taskID);
}
@@ -75,91 +75,91 @@ public class PipeDataRegionSinkMetrics implements
IMetricSet {
}
private void createAutoGauge(final String taskID) {
- final PipeSinkSubtask connector = connectorMap.get(taskID);
+ final PipeSinkSubtask sink = sinkMap.get(taskID);
// Pending event count
metricService.createAutoGauge(
Metric.UNTRANSFERRED_TABLET_COUNT.toString(),
MetricLevel.IMPORTANT,
- connector,
+ sink,
PipeSinkSubtask::getTabletInsertionEventCount,
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.INDEX.toString(),
- String.valueOf(connector.getConnectorIndex()),
+ String.valueOf(sink.getSinkIndex()),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
+ String.valueOf(sink.getCreationTime()));
metricService.createAutoGauge(
Metric.UNTRANSFERRED_TSFILE_COUNT.toString(),
MetricLevel.IMPORTANT,
- connector,
+ sink,
PipeSinkSubtask::getTsFileInsertionEventCount,
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.INDEX.toString(),
- String.valueOf(connector.getConnectorIndex()),
+ String.valueOf(sink.getSinkIndex()),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
+ String.valueOf(sink.getCreationTime()));
metricService.createAutoGauge(
Metric.UNTRANSFERRED_HEARTBEAT_COUNT.toString(),
MetricLevel.IMPORTANT,
- connector,
+ sink,
PipeSinkSubtask::getPipeHeartbeatEventCount,
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.INDEX.toString(),
- String.valueOf(connector.getConnectorIndex()),
+ String.valueOf(sink.getSinkIndex()),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
- // Metrics related to IoTDBThriftAsyncConnector
+ String.valueOf(sink.getCreationTime()));
+ // Metrics related to IoTDBThriftAsyncSink
metricService.createAutoGauge(
Metric.PIPE_ASYNC_CONNECTOR_RETRY_EVENT_QUEUE_SIZE.toString(),
MetricLevel.IMPORTANT,
- connector,
- PipeSinkSubtask::getAsyncConnectorRetryEventQueueSize,
+ sink,
+ PipeSinkSubtask::getAsyncSinkRetryEventQueueSize,
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.INDEX.toString(),
- String.valueOf(connector.getConnectorIndex()),
+ String.valueOf(sink.getSinkIndex()),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
+ String.valueOf(sink.getCreationTime()));
metricService.createAutoGauge(
Metric.PIPE_PENDING_HANDLERS_SIZE.toString(),
MetricLevel.IMPORTANT,
- connector,
+ sink,
PipeSinkSubtask::getPendingHandlersSize,
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.INDEX.toString(),
- String.valueOf(connector.getConnectorIndex()),
+ String.valueOf(sink.getSinkIndex()),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
- // Metrics related to IoTDB connector
+ String.valueOf(sink.getCreationTime()));
+ // Metrics related to IoTDB sink
metricService.createAutoGauge(
Metric.PIPE_TOTAL_UNCOMPRESSED_SIZE.toString(),
MetricLevel.IMPORTANT,
- connector,
+ sink,
PipeSinkSubtask::getTotalUncompressedSize,
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.INDEX.toString(),
- String.valueOf(connector.getConnectorIndex()),
+ String.valueOf(sink.getSinkIndex()),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
+ String.valueOf(sink.getCreationTime()));
metricService.createAutoGauge(
Metric.PIPE_TOTAL_COMPRESSED_SIZE.toString(),
MetricLevel.IMPORTANT,
- connector,
+ sink,
PipeSinkSubtask::getTotalCompressedSize,
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.INDEX.toString(),
- String.valueOf(connector.getConnectorIndex()),
+ String.valueOf(sink.getSinkIndex()),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
+ String.valueOf(sink.getCreationTime()));
}
private void createRate(final String taskID) {
- final PipeSinkSubtask connector = connectorMap.get(taskID);
+ final PipeSinkSubtask sink = sinkMap.get(taskID);
// Transfer event rate
tabletRateMap.put(
taskID,
@@ -167,109 +167,108 @@ public class PipeDataRegionSinkMetrics implements
IMetricSet {
Metric.PIPE_CONNECTOR_TABLET_TRANSFER.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.INDEX.toString(),
- String.valueOf(connector.getConnectorIndex()),
+ String.valueOf(sink.getSinkIndex()),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime())));
+ String.valueOf(sink.getCreationTime())));
tsFileRateMap.put(
taskID,
metricService.getOrCreateRate(
Metric.PIPE_CONNECTOR_TSFILE_TRANSFER.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.INDEX.toString(),
- String.valueOf(connector.getConnectorIndex()),
+ String.valueOf(sink.getSinkIndex()),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime())));
+ String.valueOf(sink.getCreationTime())));
pipeHeartbeatRateMap.put(
taskID,
metricService.getOrCreateRate(
Metric.PIPE_CONNECTOR_HEARTBEAT_TRANSFER.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.INDEX.toString(),
- String.valueOf(connector.getConnectorIndex()),
+ String.valueOf(sink.getSinkIndex()),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime())));
+ String.valueOf(sink.getCreationTime())));
}
private void createTimer(final String taskID) {
- final PipeSinkSubtask connector = connectorMap.get(taskID);
+ final PipeSinkSubtask sink = sinkMap.get(taskID);
compressionTimerMap.putIfAbsent(
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
metricService.getOrCreateTimer(
Metric.PIPE_COMPRESSION_TIME.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime())));
+ String.valueOf(sink.getCreationTime())));
}
private void createHistogram(final String taskID) {
- final PipeSinkSubtask connector = connectorMap.get(taskID);
+ final PipeSinkSubtask sink = sinkMap.get(taskID);
final Histogram tabletBatchSizeHistogram =
metricService.getOrCreateHistogram(
Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
- connector.setTabletBatchSizeHistogram(tabletBatchSizeHistogram);
+ String.valueOf(sink.getCreationTime()));
+ sink.setTabletBatchSizeHistogram(tabletBatchSizeHistogram);
final Histogram tsFileBatchSizeHistogram =
metricService.getOrCreateHistogram(
Metric.PIPE_TSFILE_BATCH_SIZE.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
- connector.setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram);
+ String.valueOf(sink.getCreationTime()));
+ sink.setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram);
final Histogram tabletBatchTimeIntervalHistogram =
metricService.getOrCreateHistogram(
Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
-
connector.setTabletBatchTimeIntervalHistogram(tabletBatchTimeIntervalHistogram);
+ String.valueOf(sink.getCreationTime()));
+ sink.setTabletBatchTimeIntervalHistogram(tabletBatchTimeIntervalHistogram);
final Histogram tsFileBatchTimeIntervalHistogram =
metricService.getOrCreateHistogram(
Metric.PIPE_TSFILE_BATCH_TIME_COST.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
-
connector.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
+ String.valueOf(sink.getCreationTime()));
+ sink.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
Histogram eventSizeHistogram =
metricService.getOrCreateHistogram(
Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
- connector.getAttributeSortedString());
- connector.setEventSizeHistogram(eventSizeHistogram);
+ sink.getAttributeSortedString());
+ sink.setEventSizeHistogram(eventSizeHistogram);
}
@Override
public void unbindFrom(final AbstractMetricService metricService) {
- final ImmutableSet<String> taskIDs =
ImmutableSet.copyOf(connectorMap.keySet());
+ final ImmutableSet<String> taskIDs = ImmutableSet.copyOf(sinkMap.keySet());
for (final String taskID : taskIDs) {
deregister(taskID);
}
- if (!connectorMap.isEmpty()) {
- LOGGER.warn(
- "Failed to unbind from pipe data region connector metrics, connector
map not empty");
+ if (!sinkMap.isEmpty()) {
+ LOGGER.warn("Failed to unbind from pipe data region sink metrics, sink
map not empty");
}
}
@@ -281,181 +280,181 @@ public class PipeDataRegionSinkMetrics implements
IMetricSet {
}
private void removeAutoGauge(final String taskID) {
- final PipeSinkSubtask connector = connectorMap.get(taskID);
+ final PipeSinkSubtask sink = sinkMap.get(taskID);
// Pending event count
metricService.remove(
MetricType.AUTO_GAUGE,
Metric.UNTRANSFERRED_TABLET_COUNT.toString(),
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.INDEX.toString(),
- String.valueOf(connector.getConnectorIndex()),
+ String.valueOf(sink.getSinkIndex()),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
+ String.valueOf(sink.getCreationTime()));
metricService.remove(
MetricType.AUTO_GAUGE,
Metric.UNTRANSFERRED_TSFILE_COUNT.toString(),
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.INDEX.toString(),
- String.valueOf(connector.getConnectorIndex()),
+ String.valueOf(sink.getSinkIndex()),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
+ String.valueOf(sink.getCreationTime()));
metricService.remove(
MetricType.AUTO_GAUGE,
Metric.UNTRANSFERRED_HEARTBEAT_COUNT.toString(),
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.INDEX.toString(),
- String.valueOf(connector.getConnectorIndex()),
+ String.valueOf(sink.getSinkIndex()),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
- // Metrics related to IoTDBThriftAsyncConnector
+ String.valueOf(sink.getCreationTime()));
+ // Metrics related to IoTDBThriftAsyncSink
metricService.remove(
MetricType.AUTO_GAUGE,
Metric.PIPE_ASYNC_CONNECTOR_RETRY_EVENT_QUEUE_SIZE.toString(),
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.INDEX.toString(),
- String.valueOf(connector.getConnectorIndex()),
+ String.valueOf(sink.getSinkIndex()),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
+ String.valueOf(sink.getCreationTime()));
metricService.remove(
MetricType.AUTO_GAUGE,
Metric.PIPE_PENDING_HANDLERS_SIZE.toString(),
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.INDEX.toString(),
- String.valueOf(connector.getConnectorIndex()),
+ String.valueOf(sink.getSinkIndex()),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
- // Metrics related to IoTDB connector
+ String.valueOf(sink.getCreationTime()));
+ // Metrics related to IoTDB sink
metricService.remove(
MetricType.AUTO_GAUGE,
Metric.PIPE_TOTAL_UNCOMPRESSED_SIZE.toString(),
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.INDEX.toString(),
- String.valueOf(connector.getConnectorIndex()),
+ String.valueOf(sink.getSinkIndex()),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
+ String.valueOf(sink.getCreationTime()));
metricService.remove(
MetricType.AUTO_GAUGE,
Metric.PIPE_TOTAL_COMPRESSED_SIZE.toString(),
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.INDEX.toString(),
- String.valueOf(connector.getConnectorIndex()),
+ String.valueOf(sink.getSinkIndex()),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
+ String.valueOf(sink.getCreationTime()));
}
private void removeRate(final String taskID) {
- final PipeSinkSubtask connector = connectorMap.get(taskID);
+ final PipeSinkSubtask sink = sinkMap.get(taskID);
// Transfer event rate
metricService.remove(
MetricType.RATE,
Metric.PIPE_CONNECTOR_TABLET_TRANSFER.toString(),
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.INDEX.toString(),
- String.valueOf(connector.getConnectorIndex()),
+ String.valueOf(sink.getSinkIndex()),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
+ String.valueOf(sink.getCreationTime()));
metricService.remove(
MetricType.RATE,
Metric.PIPE_CONNECTOR_TSFILE_TRANSFER.toString(),
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.INDEX.toString(),
- String.valueOf(connector.getConnectorIndex()),
+ String.valueOf(sink.getSinkIndex()),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
+ String.valueOf(sink.getCreationTime()));
metricService.remove(
MetricType.RATE,
Metric.PIPE_CONNECTOR_HEARTBEAT_TRANSFER.toString(),
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.INDEX.toString(),
- String.valueOf(connector.getConnectorIndex()),
+ String.valueOf(sink.getSinkIndex()),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
+ String.valueOf(sink.getCreationTime()));
tabletRateMap.remove(taskID);
tsFileRateMap.remove(taskID);
pipeHeartbeatRateMap.remove(taskID);
}
private void removeTimer(final String taskID) {
- final PipeSinkSubtask connector = connectorMap.get(taskID);
+ final PipeSinkSubtask sink = sinkMap.get(taskID);
metricService.remove(
MetricType.TIMER,
Metric.PIPE_COMPRESSION_TIME.toString(),
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
- compressionTimerMap.remove(connector.getAttributeSortedString());
+ String.valueOf(sink.getCreationTime()));
+ compressionTimerMap.remove(sink.getAttributeSortedString());
}
private void removeHistogram(final String taskID) {
- final PipeSinkSubtask connector = connectorMap.get(taskID);
+ final PipeSinkSubtask sink = sinkMap.get(taskID);
metricService.remove(
MetricType.HISTOGRAM,
Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString(),
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
+ String.valueOf(sink.getCreationTime()));
metricService.remove(
MetricType.HISTOGRAM,
Metric.PIPE_TSFILE_BATCH_SIZE.toString(),
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
+ String.valueOf(sink.getCreationTime()));
metricService.remove(
MetricType.HISTOGRAM,
Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString(),
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
+ String.valueOf(sink.getCreationTime()));
metricService.remove(
MetricType.HISTOGRAM,
Metric.PIPE_TSFILE_BATCH_TIME_COST.toString(),
Tag.NAME.toString(),
- connector.getAttributeSortedString(),
+ sink.getAttributeSortedString(),
Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
+ String.valueOf(sink.getCreationTime()));
metricService.remove(
MetricType.HISTOGRAM,
Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
Tag.NAME.toString(),
- connector.getAttributeSortedString());
+ sink.getAttributeSortedString());
}
//////////////////////////// register & deregister (pipe integration)
////////////////////////////
public void register(final PipeSinkSubtask pipeSinkSubtask) {
final String taskID = pipeSinkSubtask.getTaskID();
- connectorMap.putIfAbsent(taskID, pipeSinkSubtask);
+ sinkMap.putIfAbsent(taskID, pipeSinkSubtask);
if (Objects.nonNull(metricService)) {
createMetrics(taskID);
}
}
public void deregister(final String taskID) {
- if (!connectorMap.containsKey(taskID)) {
+ if (!sinkMap.containsKey(taskID)) {
LOGGER.warn(
- "Failed to deregister pipe data region connector metrics,
PipeConnectorSubtask({}) does not exist",
+ "Failed to deregister pipe data region sink metrics,
PipeSinkSubtask({}) does not exist",
taskID);
return;
}
if (Objects.nonNull(metricService)) {
removeMetrics(taskID);
}
- connectorMap.remove(taskID);
+ sinkMap.remove(taskID);
}
public void markTabletEvent(final String taskID) {
@@ -465,7 +464,7 @@ public class PipeDataRegionSinkMetrics implements
IMetricSet {
final Rate rate = tabletRateMap.get(taskID);
if (rate == null) {
LOGGER.info(
- "Failed to mark pipe data region connector tablet event,
PipeConnectorSubtask({}) does not exist",
+ "Failed to mark pipe data region sink tablet event,
PipeSinkSubtask({}) does not exist",
taskID);
return;
}
@@ -479,7 +478,7 @@ public class PipeDataRegionSinkMetrics implements
IMetricSet {
final Rate rate = tsFileRateMap.get(taskID);
if (rate == null) {
LOGGER.info(
- "Failed to mark pipe data region connector tsfile event,
PipeConnectorSubtask({}) does not exist",
+ "Failed to mark pipe data region sink tsfile event,
PipeSinkSubtask({}) does not exist",
taskID);
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index 56a90680185..2c2e860af7b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -278,6 +278,14 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
Objects.isNull(sink.getValueName()) ? name :
segments[segments.length - 1];
final NodeId nodeId = newNodeId(currentFolder + nodeName);
final UaVariableNode measurementNode;
+ final long utcTimestamp =
timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
+ final DataValue dataValue =
+ new DataValue(
+ new Variant(values.get(i)),
+ currentQuality,
+ new DateTime(utcTimestamp),
+ new DateTime());
+
if (!getNodeManager().containsNode(nodeId)) {
measurementNode =
new UaVariableNode.UaVariableNodeBuilder(getNodeContext())
@@ -288,6 +296,7 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
.setDisplayName(LocalizedText.english(nodeName))
.setDataType(convertToOpcDataType(type))
.setTypeDefinition(Identifiers.BaseDataVariableType)
+ .setValue(dataValue)
.build();
getNodeManager().addNode(measurementNode);
if (Objects.nonNull(folderNode)) {
@@ -311,17 +320,11 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
String.format("The Node %s does not exist.",
nodeId)));
}
- final long utcTimestamp =
timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
if (Objects.isNull(sink.getValueName())) {
if (Objects.isNull(measurementNode.getValue())
- ||
Objects.requireNonNull(measurementNode.getValue().getSourceTime()).getUtcTime()
- < utcTimestamp) {
- measurementNode.setValue(
- new DataValue(
- new Variant(values.get(i)),
- currentQuality,
- new DateTime(utcTimestamp),
- new DateTime()));
+ || Objects.isNull(measurementNode.getValue().getSourceTime())
+ || measurementNode.getValue().getSourceTime().getUtcTime() <
utcTimestamp) {
+ measurementNode.setValue(dataValue);
}
} else {
valueNode = measurementNode;
@@ -331,8 +334,8 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
}
if (Objects.nonNull(valueNode)) {
if (Objects.isNull(valueNode.getValue())
- ||
Objects.requireNonNull(valueNode.getValue().getSourceTime()).getUtcTime()
- < timestamp) {
+ || Objects.isNull(valueNode.getValue().getSourceTime())
+ || valueNode.getValue().getSourceTime().getUtcTime() < timestamp) {
valueNode.setValue(
new DataValue(
new Variant(value), currentQuality, new DateTime(timestamp),
new DateTime()));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
index ec9afce7c6f..7e1e38e1490 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.commons.pipe.config.constant;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import com.github.luben.zstd.Zstd;
@@ -55,6 +56,13 @@ public class PipeSinkConstant {
public static final String SINK_IOTDB_PARALLEL_TASKS_KEY =
"sink.parallel.tasks";
public static final int CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE =
PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum();
+ public static final Set<String> SINGLE_THREAD_DEFAULT_SINK =
+ new HashSet<>(
+ Arrays.asList(
+ BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName(),
+ BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(),
+ BuiltinPipePlugin.OPC_DA_SINK.getPipePluginName(),
+ BuiltinPipePlugin.OPC_DA_CONNECTOR.getPipePluginName()));
public static final String CONNECTOR_REALTIME_FIRST_KEY =
"connector.realtime-first";
public static final String SINK_REALTIME_FIRST_KEY = "sink.realtime-first";