This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch opc-large-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/opc-large-fix by this push:
new df223451d2c initial
df223451d2c is described below
commit df223451d2cc94608cf5402142f84e4f1f63330a
Author: Caideyipi <[email protected]>
AuthorDate: Mon Mar 30 11:03:50 2026 +0800
initial
---
.../task/builder/PipeDataNodeTaskBuilder.java | 1 +
.../pipe/agent/task/stage/PipeTaskSinkStage.java | 14 ++++--
.../task/subtask/sink/PipeSinkSubtaskManager.java | 2 +
.../db/pipe/sink/protocol/opcua/OpcUaSink.java | 54 ++++++++++++++++++++--
.../sink/protocol/writeback/WriteBackSink.java | 6 +--
.../task/stage/SubscriptionTaskSinkStage.java | 2 +-
.../plugin/env/PipeTaskSinkRuntimeEnvironment.java | 13 +++++-
7 files changed, 79 insertions(+), 13 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
index d66a4f14a79..a4fa8ca95b6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
@@ -113,6 +113,7 @@ public class PipeDataNodeTaskBuilder {
pipeStaticMeta.getPipeName(),
pipeStaticMeta.getCreationTime(),
sinkParameters,
+ sourceParameters,
regionId,
pipeType.equals(PipeType.USER)
?
PipeSubtaskExecutorManager.getInstance().getConnectorExecutorSupplier()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
index a22fbb536d7..494494e3cf9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
@@ -35,20 +35,23 @@ public class PipeTaskSinkStage extends PipeTaskStage {
protected final String pipeName;
protected final long creationTime;
protected final PipeParameters pipeSinkParameters;
+ protected final PipeParameters pipeSourceParameters;
protected final int regionId;
protected final Supplier<? extends PipeSinkSubtaskExecutor> executor;
protected String connectorSubtaskId;
public PipeTaskSinkStage(
- String pipeName,
- long creationTime,
- PipeParameters pipeSinkParameters,
- int regionId,
- Supplier<? extends PipeSinkSubtaskExecutor> executor) {
+ final String pipeName,
+ final long creationTime,
+ final PipeParameters pipeSinkParameters,
+ final PipeParameters pipeSourceParameters,
+ final int regionId,
+ final Supplier<? extends PipeSinkSubtaskExecutor> executor) {
this.pipeName = pipeName;
this.creationTime = creationTime;
this.pipeSinkParameters = pipeSinkParameters;
+ this.pipeSourceParameters = pipeSourceParameters;
this.regionId = regionId;
this.executor = executor;
@@ -61,6 +64,7 @@ public class PipeTaskSinkStage extends PipeTaskStage {
.register(
executor,
pipeSinkParameters,
+ pipeSourceParameters,
new PipeTaskSinkRuntimeEnvironment(pipeName, creationTime,
regionId));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index 9138a075918..61a02f5f6db 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -65,6 +65,7 @@ public class PipeSinkSubtaskManager {
public synchronized String register(
final Supplier<? extends PipeSinkSubtaskExecutor> executorSupplier,
final PipeParameters pipeSinkParameters,
+ final PipeParameters pipeSourceParameters,
final PipeTaskSinkRuntimeEnvironment environment) {
final String connectorKey =
pipeSinkParameters
@@ -119,6 +120,7 @@ public class PipeSinkSubtaskManager {
attributeSortedString = "schema_" + attributeSortedString;
}
environment.setAttributeSortedString(attributeSortedString);
+ environment.setSourceParameters(pipeSourceParameters);
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
final PipeSinkSubtaskExecutor executor = executorSupplier.get();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index 554ac3b0df4..518142cd7e6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.sink.protocol.opcua;
import org.apache.iotdb.commons.consensus.DataRegionId;
+import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnvironment;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
@@ -28,12 +29,17 @@ import
org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner;
import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaServerBuilder;
+import org.apache.iotdb.db.pipe.sink.protocol.writeback.WriteBackSink;
+import org.apache.iotdb.db.protocol.session.InternalClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.annotation.TableModel;
import org.apache.iotdb.pipe.api.annotation.TreeModel;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
@@ -157,6 +163,11 @@ public class OpcUaSink implements PipeConnector {
// Outer server
private @Nullable IoTDBOpcUaClient client;
+ private boolean initialized;
+ private PipeTaskSinkRuntimeEnvironment environment;
+
+ // To avoid potential multi thread issue
+ private volatile boolean initializing;
@Override
public void validate(final PipeParameterValidator validator) throws
Exception {
@@ -246,7 +257,7 @@ public class OpcUaSink implements PipeConnector {
nodeUrl = parameters.getStringByKeys(CONNECTOR_OPC_UA_NODE_URL_KEY,
SINK_OPC_UA_NODE_URL_KEY);
if (Objects.isNull(nodeUrl)) {
- customizeServer(parameters);
+ customizeServer(parameters, configuration.getRuntimeEnvironment());
} else {
if (PathUtils.isTableModelDatabase(databaseName)) {
throw new PipeException(
@@ -256,7 +267,8 @@ public class OpcUaSink implements PipeConnector {
}
}
- private void customizeServer(final PipeParameters parameters) {
+ private void customizeServer(
+ final PipeParameters parameters, final PipeRuntimeEnvironment
environment) {
final int tcpBindPort =
parameters.getIntOrDefault(
Arrays.asList(CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY,
SINK_OPC_UA_TCP_BIND_PORT_KEY),
@@ -353,6 +365,42 @@ public class OpcUaSink implements PipeConnector {
.getRight();
SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP.get(serverKey).getLeft().incrementAndGet();
}
+
+ if (environment instanceof PipeTaskSinkRuntimeEnvironment) {
+ this.environment = (PipeTaskSinkRuntimeEnvironment) environment;
+ }
+ initializeServer();
+ }
+
+ /**
+ * This function pushes the last data to opc server, to initialize first
time or after restart.
+ * Note that the first fetch may not be ok, thus it will be triggered per
heartbeat
+ */
+ private void initializeServer() {
+ if (initialized || initializing || Objects.isNull(environment)) {
+ return;
+ }
+ initializing = true;
+
+ InternalClientSession session = null;
+ try {
+ session =
+ new InternalClientSession(
+ String.format(
+ "%s_%s_%s_%s_%s",
+ WriteBackSink.class.getSimpleName(),
+ environment.getPipeName(),
+ environment.getCreationTime(),
+ environment.getRegionId(),
+ WriteBackSink.id.getAndIncrement()));
+ initialized = true;
+ } finally {
+ initializing = false;
+ if (Objects.nonNull(session)) {
+ SessionManager.getInstance()
+ .closeSession(session,
Coordinator.getInstance()::cleanupQueryExecution, false);
+ }
+ }
}
private void customizeClient(final PipeParameters parameters) {
@@ -460,7 +508,7 @@ public class OpcUaSink implements PipeConnector {
@Override
public void heartbeat() throws Exception {
- // Server side, do nothing
+ initializeServer();
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
index b5dccb55c88..c11498c18fe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
@@ -169,14 +169,14 @@ public class WriteBackSink implements PipeConnector {
session.setClientVersion(IoTDBConstant.ClientVersion.V_1_0);
session.setZoneId(ZoneId.systemDefault());
- final String connectorSkipIfValue =
+ final String sinkSkipIfValue =
parameters
.getStringOrDefault(
Arrays.asList(CONNECTOR_SKIP_IF_KEY, SINK_SKIP_IF_KEY),
WRITE_BACK_CONNECTOR_SKIP_IF_DEFAULT_VALUE)
.trim();
final Set<String> skipIfOptionSet =
- Arrays.stream(connectorSkipIfValue.split(","))
+ Arrays.stream(sinkSkipIfValue.split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.map(String::toLowerCase)
@@ -262,7 +262,7 @@ public class WriteBackSink implements PipeConnector {
private void doTransfer(
final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent)
- throws PipeException, WALPipeException, IOException {
+ throws PipeException {
final InsertNode insertNode =
pipeInsertNodeTabletInsertionEvent.getInsertNode();
final String dataBaseName =
pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
index 73fca57a1fd..74a1280843c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
@@ -36,7 +36,7 @@ public class SubscriptionTaskSinkStage extends
PipeTaskSinkStage {
final PipeParameters pipeSinkParameters,
final int regionId,
final PipeSinkSubtaskExecutor executor) {
- super(pipeName, creationTime, pipeSinkParameters, regionId, () ->
executor);
+ super(pipeName, creationTime, pipeSinkParameters, null, regionId, () ->
executor);
}
@Override
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java
index b8382891348..7c6783bbbfc 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java
@@ -19,8 +19,11 @@
package org.apache.iotdb.commons.pipe.config.plugin.env;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
public class PipeTaskSinkRuntimeEnvironment extends PipeTaskRuntimeEnvironment
{
private String attributeSortedString;
+ private PipeParameters sourceParameters;
public PipeTaskSinkRuntimeEnvironment(
final String pipeName, final long creationTime, final int regionId) {
@@ -31,7 +34,15 @@ public class PipeTaskSinkRuntimeEnvironment extends
PipeTaskRuntimeEnvironment {
return attributeSortedString;
}
- public void setAttributeSortedString(String attributeSortedString) {
+ public void setAttributeSortedString(final String attributeSortedString) {
this.attributeSortedString = attributeSortedString;
}
+
+ public PipeParameters getSourceParameters() {
+ return sourceParameters;
+ }
+
+ public void setSourceParameters(final PipeParameters sourceParameters) {
+ this.sourceParameters = sourceParameters;
+ }
}