This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch opc-large-fix in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 53ec096b457ac7f3cf266d7679c3d77ccd35b65d Author: Caideyipi <[email protected]> AuthorDate: Mon Mar 30 12:20:06 2026 +0800 partial --- .../apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java | 13 +++++++++++++ .../org/apache/iotdb/db/service/ExternalRPCService.java | 4 ++++ 2 files changed, 17 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java index ac0c10d287f..91b959b229c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java @@ -31,9 +31,13 @@ import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient; import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace; import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaServerBuilder; import org.apache.iotdb.db.pipe.sink.protocol.writeback.WriteBackSink; +import org.apache.iotdb.db.protocol.session.IClientSession; import org.apache.iotdb.db.protocol.session.InternalClientSession; import org.apache.iotdb.db.protocol.session.SessionManager; +import org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl; +import org.apache.iotdb.db.protocol.thrift.impl.IClientRPCServiceWithHandler; import org.apache.iotdb.db.queryengine.plan.Coordinator; +import org.apache.iotdb.db.service.ExternalRPCService; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.pipe.api.PipeConnector; @@ -47,6 +51,7 @@ import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq; import 
org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.write.record.Tablet; @@ -64,6 +69,7 @@ import javax.annotation.Nullable; import java.io.File; import java.time.ZoneId; import java.util.Arrays; +import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -387,10 +393,16 @@ public class OpcUaSink implements PipeConnector { || Objects.isNull(environment.getSourceParameters())) { return; } + final IClientRPCServiceWithHandler clientRPCServiceImpl = + ExternalRPCService.getInstance().getImpl(); + if (!(clientRPCServiceImpl instanceof ClientRPCServiceImpl)) { + return; + } markInitializing(true); try { initializeSession(); + clientRPCServiceImpl.executeLastDataQueryV2(new TSLastDataQueryReq(session.getId(), Collections.emptyList(), )); initialized = true; if (Objects.nonNull(session)) { @@ -427,6 +439,7 @@ public class OpcUaSink implements PipeConnector { session.setClientVersion(IoTDBConstant.ClientVersion.V_1_0); session.setZoneId(ZoneId.systemDefault()); session.setUsername(usernameString); + session.setSqlDialect(IClientSession.SqlDialect.TREE); } if (SESSION_MANAGER.getCurrSession() == null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/ExternalRPCService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/ExternalRPCService.java index dcc8690de8a..b4d576874e0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/ExternalRPCService.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/ExternalRPCService.java @@ -118,6 +118,10 @@ public class ExternalRPCService extends ThriftService implements ExternalRPCServ return getBindPort(); } + public IClientRPCServiceWithHandler getImpl() { + return impl; + } + private static class RPCServiceHolder { private static final ExternalRPCService INSTANCE = new ExternalRPCService();
