This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/discard_rpc_new_server in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit de1d165b4838fd87c49d330e55b1af6fe3f484c7 Author: Jinrui.Zhang <[email protected]> AuthorDate: Thu Aug 4 23:30:16 2022 +0800 change state tracker from RPC to method call locally --- .../scheduler/AbstractFragInsStateTracker.java | 24 +++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java index 50031dc9b3..eb77ef7587 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java @@ -22,7 +22,9 @@ package org.apache.iotdb.db.mpp.plan.scheduler; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.mpp.execution.QueryStateMachine; +import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState; import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq; @@ -40,6 +42,8 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT protected QueryStateMachine stateMachine; protected ScheduledExecutorService scheduledExecutor; protected List<FragmentInstance> instances; + protected final String localhostIpAddr; + protected final int localhostInternalPort; private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager; @@ -53,6 +57,8 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT this.scheduledExecutor = scheduledExecutor; this.instances = instances; this.internalServiceClientManager = internalServiceClientManager; + this.localhostIpAddr = IoTDBDescriptor.getInstance().getConfig().getInternalAddress(); + this.localhostInternalPort = IoTDBDescriptor.getInstance().getConfig().getInternalPort(); } public abstract void start(); @@ -62,14 +68,22 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT protected FragmentInstanceState fetchState(FragmentInstance instance) throws TException, IOException { TEndPoint endPoint = instance.getHostDataNode().internalEndPoint; - try (SyncDataNodeInternalServiceClient client = - internalServiceClientManager.borrowClient(endPoint)) { - TFragmentInstanceStateResp resp = - client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance))); - return FragmentInstanceState.valueOf(resp.state); + if (isInstanceRunningLocally(endPoint)) { + return FragmentInstanceManager.getInstance().getInstanceInfo(instance.getId()).getState(); + } else { + try (SyncDataNodeInternalServiceClient client = + internalServiceClientManager.borrowClient(endPoint)) { + TFragmentInstanceStateResp resp = + client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance))); + return FragmentInstanceState.valueOf(resp.state); + } } } + private boolean isInstanceRunningLocally(TEndPoint endPoint) { + return this.localhostIpAddr.equals(endPoint.getIp()) && localhostInternalPort == endPoint.port; + } + private TFragmentInstanceId getTId(FragmentInstance instance) { return new TFragmentInstanceId( instance.getId().getQueryId().getId(),
