This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch opc-large-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/opc-large-fix by this push:
     new 3386c4bfeab fix
3386c4bfeab is described below

commit 3386c4bfeabd4ddac8f0f5069fcd115d6c2e6cbd
Author: Caideyipi <[email protected]>
AuthorDate: Mon Mar 30 11:41:08 2026 +0800

    fix
---
 .../db/pipe/sink/protocol/opcua/OpcUaSink.java     | 70 ++++++++++++++++++----
 .../plugin/env/PipeTaskSinkRuntimeEnvironment.java | 10 ++++
 2 files changed, 67 insertions(+), 13 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index 518142cd7e6..ac0c10d287f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.sink.protocol.opcua;
 
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnvironment;
 import org.apache.iotdb.commons.utils.PathUtils;
@@ -61,6 +62,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.File;
+import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Objects;
@@ -148,6 +150,8 @@ public class OpcUaSink implements PipeConnector {
       SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP = new 
ConcurrentHashMap<>();
   private static final Map<String, Pair<AtomicInteger, IoTDBOpcUaClient>>
       CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP = new ConcurrentHashMap<>();
+  private static final SessionManager SESSION_MANAGER = 
SessionManager.getInstance();
+  private static final String INITIALIZING_KEY = "initializing";
 
   private String serverKey;
   private String nodeUrl;
@@ -165,9 +169,7 @@ public class OpcUaSink implements PipeConnector {
   private @Nullable IoTDBOpcUaClient client;
   private boolean initialized;
   private PipeTaskSinkRuntimeEnvironment environment;
-
-  // To avoid potential multi thread issue
-  private volatile boolean initializing;
+  private InternalClientSession session;
 
   @Override
   public void validate(final PipeParameterValidator validator) throws 
Exception {
@@ -366,6 +368,8 @@ public class OpcUaSink implements PipeConnector {
       
SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP.get(serverKey).getLeft().incrementAndGet();
     }
 
+    // PipeTaskTemporaryRuntimeEnvironment is for validation, no need to 
trigger server
+    // initialization
     if (environment instanceof PipeTaskSinkRuntimeEnvironment) {
       this.environment = (PipeTaskSinkRuntimeEnvironment) environment;
     }
@@ -377,13 +381,40 @@ public class OpcUaSink implements PipeConnector {
    * Note that the first fetch may not be ok, thus it will be triggered per 
heartbeat
    */
   private void initializeServer() {
-    if (initialized || initializing || Objects.isNull(environment)) {
+    if (initialized
+        || isInitializing()
+        || Objects.isNull(environment)
+        || Objects.isNull(environment.getSourceParameters())) {
       return;
     }
-    initializing = true;
+    markInitializing(true);
 
-    InternalClientSession session = null;
     try {
+      initializeSession();
+
+      initialized = true;
+      if (Objects.nonNull(session)) {
+        SESSION_MANAGER.closeSession(
+            session, Coordinator.getInstance()::cleanupQueryExecution, false);
+      }
+    } catch (final Exception e) {
+      LOGGER.warn(
+          "Failed to initialize OPC Server, maybe the cluster is not 
initialized, will retry later, message: {}",
+          e.getMessage());
+    } finally {
+      markInitializing(false);
+    }
+  }
+
+  private void initializeSession() {
+    final PipeParameters sourceParameters = environment.getSourceParameters();
+    final String usernameString =
+        sourceParameters.getStringByKeys(
+            CONNECTOR_IOTDB_USER_KEY,
+            SINK_IOTDB_USER_KEY,
+            CONNECTOR_IOTDB_USERNAME_KEY,
+            SINK_IOTDB_USERNAME_KEY);
+    if (Objects.isNull(session)) {
       session =
           new InternalClientSession(
               String.format(
@@ -393,13 +424,26 @@ public class OpcUaSink implements PipeConnector {
                   environment.getCreationTime(),
                   environment.getRegionId(),
                   WriteBackSink.id.getAndIncrement()));
-      initialized = true;
-    } finally {
-      initializing = false;
-      if (Objects.nonNull(session)) {
-        SessionManager.getInstance()
-            .closeSession(session, 
Coordinator.getInstance()::cleanupQueryExecution, false);
-      }
+      session.setClientVersion(IoTDBConstant.ClientVersion.V_1_0);
+      session.setZoneId(ZoneId.systemDefault());
+      session.setUsername(usernameString);
+    }
+
+    if (SESSION_MANAGER.getCurrSession() == null) {
+      SESSION_MANAGER.registerSession(session);
+    }
+    // We skip the session login here because it's already done in source 
customizing
+  }
+
+  private boolean isInitializing() {
+    return environment.getPublicParameters().containsKey(INITIALIZING_KEY);
+  }
+
+  private void markInitializing(final boolean initializing) {
+    if (initializing) {
+      environment.getPublicParameters().put(INITIALIZING_KEY, true);
+    } else {
+      environment.getPublicParameters().remove(INITIALIZING_KEY);
     }
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java
index 7c6783bbbfc..d3d48fac607 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java
@@ -21,10 +21,16 @@ package org.apache.iotdb.commons.pipe.config.plugin.env;
 
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 public class PipeTaskSinkRuntimeEnvironment extends PipeTaskRuntimeEnvironment 
{
   private String attributeSortedString;
   private PipeParameters sourceParameters;
 
+  // For reused sinks, this can be used to share information
+  private final Map<String, Object> publicParameters = new 
ConcurrentHashMap<>();
+
   public PipeTaskSinkRuntimeEnvironment(
       final String pipeName, final long creationTime, final int regionId) {
     super(pipeName, creationTime, regionId);
@@ -45,4 +51,8 @@ public class PipeTaskSinkRuntimeEnvironment extends 
PipeTaskRuntimeEnvironment {
   public void setSourceParameters(final PipeParameters sourceParameters) {
     this.sourceParameters = sourceParameters;
   }
+
+  public Map<String, Object> getPublicParameters() {
+    return publicParameters;
+  }
 }

Reply via email to