This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch rel/1.1_local in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a242691f46325651f1e7ce6e53e7fc4e9e4f8072 Author: Beyyes <[email protected]> AuthorDate: Wed May 24 19:40:10 2023 +0800 Avoid rpc invoking for SimpleQueryTerminator when endpoint is local address --- .../scheduler/FixedRateFragInsStateTracker.java | 14 +++--- .../plan/scheduler/IFragInstanceStateTracker.java | 4 +- .../mpp/plan/scheduler/SimpleQueryTerminator.java | 52 ++++++++++++++++++---- 3 files changed, 52 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java index 8ad594abcc7..2f389ec1b1a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java @@ -30,7 +30,6 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState; import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.utils.SetThreadName; -import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -81,18 +80,17 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker { } @Override - public synchronized List<TFragmentInstanceId> filterUnFinishedFIs( - List<TFragmentInstanceId> instanceIds) { - List<TFragmentInstanceId> res = new ArrayList<>(); + public synchronized List<FragmentInstanceId> filterUnFinishedFIs( + List<FragmentInstanceId> instanceIds) { + List<FragmentInstanceId> res = new ArrayList<>(); if (instanceIds == null) { return res; } - for (TFragmentInstanceId tFragmentInstanceId : instanceIds) { - InstanceStateMetrics stateMetrics = - instanceStateMap.get(FragmentInstanceId.fromThrift(tFragmentInstanceId)); + for (FragmentInstanceId fragmentInstanceId : instanceIds) { + InstanceStateMetrics stateMetrics = instanceStateMap.get(fragmentInstanceId); if (stateMetrics != null && (stateMetrics.lastState == null || !stateMetrics.lastState.isDone())) { - res.add(tFragmentInstanceId); + res.add(fragmentInstanceId); } } return res; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IFragInstanceStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IFragInstanceStateTracker.java index 796af2e5753..25fd262d80a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IFragInstanceStateTracker.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IFragInstanceStateTracker.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler; -import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; +import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import java.util.List; @@ -28,5 +28,5 @@ public interface IFragInstanceStateTracker { void abort(); - List<TFragmentInstanceId> filterUnFinishedFIs(List<TFragmentInstanceId> instanceIds); + List<FragmentInstanceId> filterUnFinishedFIs(List<FragmentInstanceId> instanceIds); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java index 841abf4deae..8e8594c22d0 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java @@ -23,11 +23,13 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.common.MPPQueryContext; import org.apache.iotdb.db.mpp.common.QueryId; +import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager; import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq; -import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -50,7 +52,7 @@ public class SimpleQueryTerminator implements IQueryTerminator { private final IFragInstanceStateTracker stateTracker; private List<TEndPoint> relatedHost; - private Map<TEndPoint, List<TFragmentInstanceId>> ownedFragmentInstance; + private Map<TEndPoint, List<FragmentInstanceId>> ownedFragmentInstance; private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager; @@ -96,14 +98,31 @@ public class SimpleQueryTerminator implements IQueryTerminator { boolean succeed = true; for (TEndPoint endPoint : relatedHost) { // we only send cancel query request if there is remaining unfinished FI in that node - List<TFragmentInstanceId> unfinishedFIs = + List<FragmentInstanceId> unfinishedFIs = stateTracker.filterUnFinishedFIs(ownedFragmentInstance.get(endPoint)); if (unfinishedFIs.isEmpty()) { continue; } + + String internalAddress = IoTDBDescriptor.getInstance().getConfig().getInternalAddress(); + int internalPort = IoTDBDescriptor.getInstance().getConfig().getInternalPort(); + if (internalAddress.equalsIgnoreCase(endPoint.getIp()) + && internalPort == endPoint.getPort()) { + for (FragmentInstanceId insId : unfinishedFIs) { + FragmentInstanceManager.getInstance().cancelTask(insId, false); + } + continue; + } + try (SyncDataNodeInternalServiceClient client = internalServiceClientManager.borrowClient(endPoint)) { - client.cancelQuery(new TCancelQueryReq(queryId.getId(), unfinishedFIs, false)); + client.cancelQuery( + new TCancelQueryReq( + queryId.getId(), + unfinishedFIs.stream() + .map(FragmentInstanceId::toThrift) + .collect(Collectors.toList()), + false)); } catch (ClientManagerException e) { logger.warn("can't connect to node {}", endPoint, e); // we shouldn't return here and need to cancel queryTasks in other nodes @@ -121,14 +140,31 @@ public class SimpleQueryTerminator implements IQueryTerminator { boolean succeed = true; for (TEndPoint endPoint : relatedHost) { // we only send cancel query request if there is remaining unfinished FI in that node - List<TFragmentInstanceId> unfinishedFIs = + List<FragmentInstanceId> unfinishedFIs = stateTracker.filterUnFinishedFIs(ownedFragmentInstance.get(endPoint)); if (unfinishedFIs.isEmpty()) { continue; } + + String internalAddress = IoTDBDescriptor.getInstance().getConfig().getInternalAddress(); + int internalPort = IoTDBDescriptor.getInstance().getConfig().getInternalPort(); + if (internalAddress.equalsIgnoreCase(endPoint.getIp()) + && internalPort == endPoint.getPort()) { + for (FragmentInstanceId insId : unfinishedFIs) { + FragmentInstanceManager.getInstance().cancelTask(insId, true); + } + continue; + } + try (SyncDataNodeInternalServiceClient client = internalServiceClientManager.borrowClient(endPoint)) { - client.cancelQuery(new TCancelQueryReq(queryId.getId(), unfinishedFIs, true)); + client.cancelQuery( + new TCancelQueryReq( + queryId.getId(), + unfinishedFIs.stream() + .map(FragmentInstanceId::toThrift) + .collect(Collectors.toList()), + true)); } catch (ClientManagerException e) { logger.warn("can't connect to node {}", endPoint, e); // we shouldn't return here and need to cancel queryTasks in other nodes @@ -149,11 +185,11 @@ public class SimpleQueryTerminator implements IQueryTerminator { .collect(Collectors.toList()); } - private List<TFragmentInstanceId> getRelatedFragmentInstances( + private List<FragmentInstanceId> getRelatedFragmentInstances( TEndPoint endPoint, List<FragmentInstance> fragmentInstances) { return fragmentInstances.stream() .filter(instance -> instance.getHostDataNode().internalEndPoint.equals(endPoint)) - .map(instance -> instance.getId().toThrift()) + .map(FragmentInstance::getId) .collect(Collectors.toList()); } }
