This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch opitimize_query_terminator_in_local in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit af7974ef0e44df9bc55105ee469b7813df3c2076 Author: Beyyes <[email protected]> AuthorDate: Wed May 24 10:20:13 2023 +0800 Avoid rpc invoking for SimpleQueryTerminator when endpoint is local address --- .../mpp/plan/scheduler/SimpleQueryTerminator.java | 27 ++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java index 841abf4deae..be1712c6c6f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java @@ -23,8 +23,11 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.common.MPPQueryContext; import org.apache.iotdb.db.mpp.common.QueryId; +import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager; import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; @@ -101,6 +104,18 @@ public class SimpleQueryTerminator implements IQueryTerminator { if (unfinishedFIs.isEmpty()) { continue; } + + String internalAddress = IoTDBDescriptor.getInstance().getConfig().getInternalAddress(); + int internalPort = IoTDBDescriptor.getInstance().getConfig().getInternalPort(); + if (internalAddress.equalsIgnoreCase(endPoint.getIp()) + && internalPort == endPoint.getPort()) { + for (TFragmentInstanceId insId : unfinishedFIs) { + FragmentInstanceManager.getInstance() + .cancelTask(FragmentInstanceId.fromThrift(insId), false); + } + return true; + } + try (SyncDataNodeInternalServiceClient client = internalServiceClientManager.borrowClient(endPoint)) { client.cancelQuery(new TCancelQueryReq(queryId.getId(), unfinishedFIs, false)); @@ -126,6 +141,18 @@ public class SimpleQueryTerminator implements IQueryTerminator { if (unfinishedFIs.isEmpty()) { continue; } + + String internalAddress = IoTDBDescriptor.getInstance().getConfig().getInternalAddress(); + int internalPort = IoTDBDescriptor.getInstance().getConfig().getInternalPort(); + if (internalAddress.equalsIgnoreCase(endPoint.getIp()) + && internalPort == endPoint.getPort()) { + for (TFragmentInstanceId insId : unfinishedFIs) { + FragmentInstanceManager.getInstance() + .cancelTask(FragmentInstanceId.fromThrift(insId), true); + } + return true; + } + try (SyncDataNodeInternalServiceClient client = internalServiceClientManager.borrowClient(endPoint)) { client.cancelQuery(new TCancelQueryReq(queryId.getId(), unfinishedFIs, true));
