This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch opc-large-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/opc-large-fix by this push:
     new df223451d2c initial
df223451d2c is described below

commit df223451d2cc94608cf5402142f84e4f1f63330a
Author: Caideyipi <[email protected]>
AuthorDate: Mon Mar 30 11:03:50 2026 +0800

    initial
---
 .../task/builder/PipeDataNodeTaskBuilder.java      |  1 +
 .../pipe/agent/task/stage/PipeTaskSinkStage.java   | 14 ++++--
 .../task/subtask/sink/PipeSinkSubtaskManager.java  |  2 +
 .../db/pipe/sink/protocol/opcua/OpcUaSink.java     | 54 ++++++++++++++++++++--
 .../sink/protocol/writeback/WriteBackSink.java     |  6 +--
 .../task/stage/SubscriptionTaskSinkStage.java      |  2 +-
 .../plugin/env/PipeTaskSinkRuntimeEnvironment.java | 13 +++++-
 7 files changed, 79 insertions(+), 13 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
index d66a4f14a79..a4fa8ca95b6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
@@ -113,6 +113,7 @@ public class PipeDataNodeTaskBuilder {
               pipeStaticMeta.getPipeName(),
               pipeStaticMeta.getCreationTime(),
               sinkParameters,
+              sourceParameters,
               regionId,
               pipeType.equals(PipeType.USER)
                   ? 
PipeSubtaskExecutorManager.getInstance().getConnectorExecutorSupplier()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
index a22fbb536d7..494494e3cf9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskSinkStage.java
@@ -35,20 +35,23 @@ public class PipeTaskSinkStage extends PipeTaskStage {
   protected final String pipeName;
   protected final long creationTime;
   protected final PipeParameters pipeSinkParameters;
+  protected final PipeParameters pipeSourceParameters;
   protected final int regionId;
   protected final Supplier<? extends PipeSinkSubtaskExecutor> executor;
 
   protected String connectorSubtaskId;
 
   public PipeTaskSinkStage(
-      String pipeName,
-      long creationTime,
-      PipeParameters pipeSinkParameters,
-      int regionId,
-      Supplier<? extends PipeSinkSubtaskExecutor> executor) {
+      final String pipeName,
+      final long creationTime,
+      final PipeParameters pipeSinkParameters,
+      final PipeParameters pipeSourceParameters,
+      final int regionId,
+      final Supplier<? extends PipeSinkSubtaskExecutor> executor) {
     this.pipeName = pipeName;
     this.creationTime = creationTime;
     this.pipeSinkParameters = pipeSinkParameters;
+    this.pipeSourceParameters = pipeSourceParameters;
     this.regionId = regionId;
     this.executor = executor;
 
@@ -61,6 +64,7 @@ public class PipeTaskSinkStage extends PipeTaskStage {
             .register(
                 executor,
                 pipeSinkParameters,
+                pipeSourceParameters,
                 new PipeTaskSinkRuntimeEnvironment(pipeName, creationTime, 
regionId));
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index 9138a075918..61a02f5f6db 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -65,6 +65,7 @@ public class PipeSinkSubtaskManager {
   public synchronized String register(
       final Supplier<? extends PipeSinkSubtaskExecutor> executorSupplier,
       final PipeParameters pipeSinkParameters,
+      final PipeParameters pipeSourceParameters,
       final PipeTaskSinkRuntimeEnvironment environment) {
     final String connectorKey =
         pipeSinkParameters
@@ -119,6 +120,7 @@ public class PipeSinkSubtaskManager {
       attributeSortedString = "schema_" + attributeSortedString;
     }
     environment.setAttributeSortedString(attributeSortedString);
+    environment.setSourceParameters(pipeSourceParameters);
 
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       final PipeSinkSubtaskExecutor executor = executorSupplier.get();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index 554ac3b0df4..518142cd7e6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.sink.protocol.opcua;
 
 import org.apache.iotdb.commons.consensus.DataRegionId;
+import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnvironment;
 import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
@@ -28,12 +29,17 @@ import 
org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner;
 import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
 import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
 import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaServerBuilder;
+import org.apache.iotdb.db.pipe.sink.protocol.writeback.WriteBackSink;
+import org.apache.iotdb.db.protocol.session.InternalClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.annotation.TableModel;
 import org.apache.iotdb.pipe.api.annotation.TreeModel;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -157,6 +163,11 @@ public class OpcUaSink implements PipeConnector {
 
   // Outer server
   private @Nullable IoTDBOpcUaClient client;
+  private boolean initialized;
+  private PipeTaskSinkRuntimeEnvironment environment;
+
+  // To avoid potential multi-threading issues
+  private volatile boolean initializing;
 
   @Override
   public void validate(final PipeParameterValidator validator) throws 
Exception {
@@ -246,7 +257,7 @@ public class OpcUaSink implements PipeConnector {
 
     nodeUrl = parameters.getStringByKeys(CONNECTOR_OPC_UA_NODE_URL_KEY, 
SINK_OPC_UA_NODE_URL_KEY);
     if (Objects.isNull(nodeUrl)) {
-      customizeServer(parameters);
+      customizeServer(parameters, configuration.getRuntimeEnvironment());
     } else {
       if (PathUtils.isTableModelDatabase(databaseName)) {
         throw new PipeException(
@@ -256,7 +267,8 @@ public class OpcUaSink implements PipeConnector {
     }
   }
 
-  private void customizeServer(final PipeParameters parameters) {
+  private void customizeServer(
+      final PipeParameters parameters, final PipeRuntimeEnvironment 
environment) {
     final int tcpBindPort =
         parameters.getIntOrDefault(
             Arrays.asList(CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY, 
SINK_OPC_UA_TCP_BIND_PORT_KEY),
@@ -353,6 +365,42 @@ public class OpcUaSink implements PipeConnector {
               .getRight();
       
SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP.get(serverKey).getLeft().incrementAndGet();
     }
+
+    if (environment instanceof PipeTaskSinkRuntimeEnvironment) {
+      this.environment = (PipeTaskSinkRuntimeEnvironment) environment;
+    }
+    initializeServer();
+  }
+
+  /**
+   * Pushes the latest data to the OPC server, to initialize it on first 
start or after a restart.
+   * Note that the first fetch may fail, so initialization is retried on every 
heartbeat.
+   */
+  private void initializeServer() {
+    if (initialized || initializing || Objects.isNull(environment)) {
+      return;
+    }
+    initializing = true;
+
+    InternalClientSession session = null;
+    try {
+      session =
+          new InternalClientSession(
+              String.format(
+                  "%s_%s_%s_%s_%s",
+                  WriteBackSink.class.getSimpleName(),
+                  environment.getPipeName(),
+                  environment.getCreationTime(),
+                  environment.getRegionId(),
+                  WriteBackSink.id.getAndIncrement()));
+      initialized = true;
+    } finally {
+      initializing = false;
+      if (Objects.nonNull(session)) {
+        SessionManager.getInstance()
+            .closeSession(session, 
Coordinator.getInstance()::cleanupQueryExecution, false);
+      }
+    }
   }
 
   private void customizeClient(final PipeParameters parameters) {
@@ -460,7 +508,7 @@ public class OpcUaSink implements PipeConnector {
 
   @Override
   public void heartbeat() throws Exception {
-    // Server side, do nothing
+    initializeServer();
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
index b5dccb55c88..c11498c18fe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
@@ -169,14 +169,14 @@ public class WriteBackSink implements PipeConnector {
     session.setClientVersion(IoTDBConstant.ClientVersion.V_1_0);
     session.setZoneId(ZoneId.systemDefault());
 
-    final String connectorSkipIfValue =
+    final String sinkSkipIfValue =
         parameters
             .getStringOrDefault(
                 Arrays.asList(CONNECTOR_SKIP_IF_KEY, SINK_SKIP_IF_KEY),
                 WRITE_BACK_CONNECTOR_SKIP_IF_DEFAULT_VALUE)
             .trim();
     final Set<String> skipIfOptionSet =
-        Arrays.stream(connectorSkipIfValue.split(","))
+        Arrays.stream(sinkSkipIfValue.split(","))
             .map(String::trim)
             .filter(s -> !s.isEmpty())
             .map(String::toLowerCase)
@@ -262,7 +262,7 @@ public class WriteBackSink implements PipeConnector {
 
   private void doTransfer(
       final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent)
-      throws PipeException, WALPipeException, IOException {
+      throws PipeException {
     final InsertNode insertNode = 
pipeInsertNodeTabletInsertionEvent.getInsertNode();
     final String dataBaseName =
         pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
index 73fca57a1fd..74a1280843c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java
@@ -36,7 +36,7 @@ public class SubscriptionTaskSinkStage extends 
PipeTaskSinkStage {
       final PipeParameters pipeSinkParameters,
       final int regionId,
       final PipeSinkSubtaskExecutor executor) {
-    super(pipeName, creationTime, pipeSinkParameters, regionId, () -> 
executor);
+    super(pipeName, creationTime, pipeSinkParameters, null, regionId, () -> 
executor);
   }
 
   @Override
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java
index b8382891348..7c6783bbbfc 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java
@@ -19,8 +19,11 @@
 
 package org.apache.iotdb.commons.pipe.config.plugin.env;
 
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
 public class PipeTaskSinkRuntimeEnvironment extends PipeTaskRuntimeEnvironment 
{
   private String attributeSortedString;
+  private PipeParameters sourceParameters;
 
   public PipeTaskSinkRuntimeEnvironment(
       final String pipeName, final long creationTime, final int regionId) {
@@ -31,7 +34,15 @@ public class PipeTaskSinkRuntimeEnvironment extends 
PipeTaskRuntimeEnvironment {
     return attributeSortedString;
   }
 
-  public void setAttributeSortedString(String attributeSortedString) {
+  public void setAttributeSortedString(final String attributeSortedString) {
     this.attributeSortedString = attributeSortedString;
   }
+
+  public PipeParameters getSourceParameters() {
+    return sourceParameters;
+  }
+
+  public void setSourceParameters(final PipeParameters sourceParameters) {
+    this.sourceParameters = sourceParameters;
+  }
 }

Reply via email to