This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch opc-large-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/opc-large-fix by this push:
new 3386c4bfeab fix
3386c4bfeab is described below
commit 3386c4bfeabd4ddac8f0f5069fcd115d6c2e6cbd
Author: Caideyipi <[email protected]>
AuthorDate: Mon Mar 30 11:41:08 2026 +0800
fix
---
.../db/pipe/sink/protocol/opcua/OpcUaSink.java | 70 ++++++++++++++++++----
.../plugin/env/PipeTaskSinkRuntimeEnvironment.java | 10 ++++
2 files changed, 67 insertions(+), 13 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index 518142cd7e6..ac0c10d287f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.sink.protocol.opcua;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.DataRegionId;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnvironment;
import org.apache.iotdb.commons.utils.PathUtils;
@@ -61,6 +62,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.File;
+import java.time.ZoneId;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
@@ -148,6 +150,8 @@ public class OpcUaSink implements PipeConnector {
SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP = new
ConcurrentHashMap<>();
private static final Map<String, Pair<AtomicInteger, IoTDBOpcUaClient>>
CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP = new ConcurrentHashMap<>();
+ private static final SessionManager SESSION_MANAGER =
SessionManager.getInstance();
+ private static final String INITIALIZING_KEY = "initializing";
private String serverKey;
private String nodeUrl;
@@ -165,9 +169,7 @@ public class OpcUaSink implements PipeConnector {
private @Nullable IoTDBOpcUaClient client;
private boolean initialized;
private PipeTaskSinkRuntimeEnvironment environment;
-
- // To avoid potential multi thread issue
- private volatile boolean initializing;
+ private InternalClientSession session;
@Override
public void validate(final PipeParameterValidator validator) throws
Exception {
@@ -366,6 +368,8 @@ public class OpcUaSink implements PipeConnector {
SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP.get(serverKey).getLeft().incrementAndGet();
}
+ // PipeTaskTemporaryRuntimeEnvironment is only used for validation, so there is no need to
+ // trigger server initialization for it
if (environment instanceof PipeTaskSinkRuntimeEnvironment) {
this.environment = (PipeTaskSinkRuntimeEnvironment) environment;
}
@@ -377,13 +381,40 @@ public class OpcUaSink implements PipeConnector {
* Note that the first fetch may not be ok, thus it will be triggered per heartbeat
*/
private void initializeServer() {
- if (initialized || initializing || Objects.isNull(environment)) {
+ // The null checks must run before isInitializing(), which dereferences the environment.
+ // The environment field is only assigned when the runtime environment is a
+ // PipeTaskSinkRuntimeEnvironment, so it may legitimately still be null here.
+ if (initialized
+ || Objects.isNull(environment)
+ || Objects.isNull(environment.getSourceParameters())
+ || isInitializing()) {
return;
}
- initializing = true;
+ markInitializing(true);
- InternalClientSession session = null;
try {
+ initializeSession();
+
+ initialized = true;
+ if (Objects.nonNull(session)) {
+ SESSION_MANAGER.closeSession(
+ session, Coordinator.getInstance()::cleanupQueryExecution, false);
+ }
+ } catch (final Exception e) {
+ // Best effort: the failure is only logged and "initialized" stays false, so a later call
+ // can retry once the initializing marker has been cleared below.
+ LOGGER.warn(
+ "Failed to initialize OPC Server, maybe the cluster is not initialized, will retry later, message: {}",
+ e.getMessage());
+ } finally {
+ markInitializing(false);
+ }
+ }
+
+ private void initializeSession() {
+ final PipeParameters sourceParameters = environment.getSourceParameters();
+ final String usernameString =
+ sourceParameters.getStringByKeys(
+ CONNECTOR_IOTDB_USER_KEY,
+ SINK_IOTDB_USER_KEY,
+ CONNECTOR_IOTDB_USERNAME_KEY,
+ SINK_IOTDB_USERNAME_KEY);
+ if (Objects.isNull(session)) {
session =
new InternalClientSession(
String.format(
@@ -393,13 +424,26 @@ public class OpcUaSink implements PipeConnector {
environment.getCreationTime(),
environment.getRegionId(),
WriteBackSink.id.getAndIncrement()));
- initialized = true;
- } finally {
- initializing = false;
- if (Objects.nonNull(session)) {
- SessionManager.getInstance()
- .closeSession(session,
Coordinator.getInstance()::cleanupQueryExecution, false);
- }
+ session.setClientVersion(IoTDBConstant.ClientVersion.V_1_0);
+ session.setZoneId(ZoneId.systemDefault());
+ session.setUsername(usernameString);
+ }
+
+ if (SESSION_MANAGER.getCurrSession() == null) {
+ SESSION_MANAGER.registerSession(session);
+ }
+ // The session login is skipped here because it was already done when customizing the source
+ }
+
+ /**
+ * Tells whether the environment's public parameters currently carry the initializing marker.
+ *
+ * <p>Null-safe: the environment field is only assigned when the runtime environment is a
+ * PipeTaskSinkRuntimeEnvironment, so a missing environment is reported as "not initializing"
+ * instead of throwing a NullPointerException.
+ *
+ * @return true if an environment is present and the marker key is set in its public parameters
+ */
+ private boolean isInitializing() {
+ return Objects.nonNull(environment)
+ && environment.getPublicParameters().containsKey(INITIALIZING_KEY);
+ }
+
+ /**
+ * Sets or clears the initializing marker in the environment's public parameters.
+ *
+ * @param initializing true to put the marker, false to remove it
+ */
+ private void markInitializing(final boolean initializing) {
+ final Map<String, Object> sharedParameters = environment.getPublicParameters();
+ if (!initializing) {
+ sharedParameters.remove(INITIALIZING_KEY);
+ } else {
+ sharedParameters.put(INITIALIZING_KEY, true);
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java
index 7c6783bbbfc..d3d48fac607 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskSinkRuntimeEnvironment.java
@@ -21,10 +21,16 @@ package org.apache.iotdb.commons.pipe.config.plugin.env;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
public class PipeTaskSinkRuntimeEnvironment extends PipeTaskRuntimeEnvironment
{
private String attributeSortedString;
private PipeParameters sourceParameters;
+ // For reused sinks, this can be used to share information
+ private final Map<String, Object> publicParameters = new
ConcurrentHashMap<>();
+
public PipeTaskSinkRuntimeEnvironment(
final String pipeName, final long creationTime, final int regionId) {
super(pipeName, creationTime, regionId);
@@ -45,4 +51,8 @@ public class PipeTaskSinkRuntimeEnvironment extends
PipeTaskRuntimeEnvironment {
public void setSourceParameters(final PipeParameters sourceParameters) {
this.sourceParameters = sourceParameters;
}
+
+ /**
+ * Returns the public parameter map held by this environment.
+ *
+ * <p>The internal map itself is returned, not a copy, so changes made by a caller are visible
+ * to every other holder of this environment.
+ *
+ * @return the map backing this environment's public parameters
+ */
+ public Map<String, Object> getPublicParameters() {
+ return publicParameters;
+ }
}