This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3a67962016 change state tracker from RPC to method call locally (#6898)
3a67962016 is described below
commit 3a679620165962a30fc0af63c42368189ab394fd
Author: Zhang.Jinrui <[email protected]>
AuthorDate: Fri Aug 5 09:00:09 2022 +0800
change state tracker from RPC to method call locally (#6898)
---
.../scheduler/AbstractFragInsStateTracker.java | 24 +++++++++++++++++-----
1 file changed, 19 insertions(+), 5 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
index 50031dc9b3..eb77ef7587 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
@@ -22,7 +22,9 @@ package org.apache.iotdb.db.mpp.plan.scheduler;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
@@ -40,6 +42,8 @@ public abstract class AbstractFragInsStateTracker implements
IFragInstanceStateT
protected QueryStateMachine stateMachine;
protected ScheduledExecutorService scheduledExecutor;
protected List<FragmentInstance> instances;
+ protected final String localhostIpAddr;
+ protected final int localhostInternalPort;
private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager;
@@ -53,6 +57,8 @@ public abstract class AbstractFragInsStateTracker implements
IFragInstanceStateT
this.scheduledExecutor = scheduledExecutor;
this.instances = instances;
this.internalServiceClientManager = internalServiceClientManager;
+ this.localhostIpAddr =
IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
+ this.localhostInternalPort =
IoTDBDescriptor.getInstance().getConfig().getInternalPort();
}
public abstract void start();
@@ -62,14 +68,22 @@ public abstract class AbstractFragInsStateTracker
implements IFragInstanceStateT
protected FragmentInstanceState fetchState(FragmentInstance instance)
throws TException, IOException {
TEndPoint endPoint = instance.getHostDataNode().internalEndPoint;
- try (SyncDataNodeInternalServiceClient client =
- internalServiceClientManager.borrowClient(endPoint)) {
- TFragmentInstanceStateResp resp =
- client.fetchFragmentInstanceState(new
TFetchFragmentInstanceStateReq(getTId(instance)));
- return FragmentInstanceState.valueOf(resp.state);
+ if (isInstanceRunningLocally(endPoint)) {
+ return
FragmentInstanceManager.getInstance().getInstanceInfo(instance.getId()).getState();
+ } else {
+ try (SyncDataNodeInternalServiceClient client =
+ internalServiceClientManager.borrowClient(endPoint)) {
+ TFragmentInstanceStateResp resp =
+ client.fetchFragmentInstanceState(new
TFetchFragmentInstanceStateReq(getTId(instance)));
+ return FragmentInstanceState.valueOf(resp.state);
+ }
}
}
+ private boolean isInstanceRunningLocally(TEndPoint endPoint) {
+ return this.localhostIpAddr.equals(endPoint.getIp()) &&
localhostInternalPort == endPoint.port;
+ }
+
private TFragmentInstanceId getTId(FragmentInstance instance) {
return new TFragmentInstanceId(
instance.getId().getQueryId().getId(),