This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new 6f68ab5f16f Avoid rpc invoking for SimpleQueryTerminator when endpoint is local address
6f68ab5f16f is described below

commit 6f68ab5f16fb7b13b45bfb459642b03827ab1e63
Author: Beyyes <[email protected]>
AuthorDate: Thu May 25 16:44:20 2023 +0800

    Avoid rpc invoking for SimpleQueryTerminator when endpoint is local address
---
 .../scheduler/FixedRateFragInsStateTracker.java    | 14 +++---
 .../plan/scheduler/IFragInstanceStateTracker.java  |  4 +-
 .../mpp/plan/scheduler/SimpleQueryTerminator.java  | 52 ++++++++++++++++++----
 3 files changed, 52 insertions(+), 18 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
index 8ad594abcc7..2f389ec1b1a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.utils.SetThreadName;
-import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -81,18 +80,17 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
   }
 
   @Override
-  public synchronized List<TFragmentInstanceId> filterUnFinishedFIs(
-      List<TFragmentInstanceId> instanceIds) {
-    List<TFragmentInstanceId> res = new ArrayList<>();
+  public synchronized List<FragmentInstanceId> filterUnFinishedFIs(
+      List<FragmentInstanceId> instanceIds) {
+    List<FragmentInstanceId> res = new ArrayList<>();
     if (instanceIds == null) {
       return res;
     }
-    for (TFragmentInstanceId tFragmentInstanceId : instanceIds) {
-      InstanceStateMetrics stateMetrics =
-          instanceStateMap.get(FragmentInstanceId.fromThrift(tFragmentInstanceId));
+    for (FragmentInstanceId fragmentInstanceId : instanceIds) {
+      InstanceStateMetrics stateMetrics = instanceStateMap.get(fragmentInstanceId);
       if (stateMetrics != null
           && (stateMetrics.lastState == null || !stateMetrics.lastState.isDone())) {
-        res.add(tFragmentInstanceId);
+        res.add(fragmentInstanceId);
       }
     }
     return res;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IFragInstanceStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IFragInstanceStateTracker.java
index 796af2e5753..25fd262d80a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IFragInstanceStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IFragInstanceStateTracker.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.mpp.plan.scheduler;
 
-import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 
 import java.util.List;
 
@@ -28,5 +28,5 @@ public interface IFragInstanceStateTracker {
 
   void abort();
 
-  List<TFragmentInstanceId> filterUnFinishedFIs(List<TFragmentInstanceId> instanceIds);
+  List<FragmentInstanceId> filterUnFinishedFIs(List<FragmentInstanceId> instanceIds);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
index 841abf4deae..8e8594c22d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
@@ -23,11 +23,13 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
-import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -50,7 +52,7 @@ public class SimpleQueryTerminator implements IQueryTerminator {
 
   private final IFragInstanceStateTracker stateTracker;
   private List<TEndPoint> relatedHost;
-  private Map<TEndPoint, List<TFragmentInstanceId>> ownedFragmentInstance;
+  private Map<TEndPoint, List<FragmentInstanceId>> ownedFragmentInstance;
 
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
       internalServiceClientManager;
@@ -96,14 +98,31 @@ public class SimpleQueryTerminator implements IQueryTerminator {
     boolean succeed = true;
     for (TEndPoint endPoint : relatedHost) {
       // we only send cancel query request if there is remaining unfinished FI in that node
-      List<TFragmentInstanceId> unfinishedFIs =
+      List<FragmentInstanceId> unfinishedFIs =
           stateTracker.filterUnFinishedFIs(ownedFragmentInstance.get(endPoint));
       if (unfinishedFIs.isEmpty()) {
         continue;
       }
+
+      String internalAddress = IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
+      int internalPort = IoTDBDescriptor.getInstance().getConfig().getInternalPort();
+      if (internalAddress.equalsIgnoreCase(endPoint.getIp())
+          && internalPort == endPoint.getPort()) {
+        for (FragmentInstanceId insId : unfinishedFIs) {
+          FragmentInstanceManager.getInstance().cancelTask(insId, false);
+        }
+        continue;
+      }
+
       try (SyncDataNodeInternalServiceClient client =
           internalServiceClientManager.borrowClient(endPoint)) {
-        client.cancelQuery(new TCancelQueryReq(queryId.getId(), unfinishedFIs, false));
+        client.cancelQuery(
+            new TCancelQueryReq(
+                queryId.getId(),
+                unfinishedFIs.stream()
+                    .map(FragmentInstanceId::toThrift)
+                    .collect(Collectors.toList()),
+                false));
       } catch (ClientManagerException e) {
         logger.warn("can't connect to node {}", endPoint, e);
         // we shouldn't return here and need to cancel queryTasks in other nodes
@@ -121,14 +140,31 @@ public class SimpleQueryTerminator implements IQueryTerminator {
     boolean succeed = true;
     for (TEndPoint endPoint : relatedHost) {
       // we only send cancel query request if there is remaining unfinished FI in that node
-      List<TFragmentInstanceId> unfinishedFIs =
+      List<FragmentInstanceId> unfinishedFIs =
           stateTracker.filterUnFinishedFIs(ownedFragmentInstance.get(endPoint));
       if (unfinishedFIs.isEmpty()) {
         continue;
       }
+
+      String internalAddress = IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
+      int internalPort = IoTDBDescriptor.getInstance().getConfig().getInternalPort();
+      if (internalAddress.equalsIgnoreCase(endPoint.getIp())
+          && internalPort == endPoint.getPort()) {
+        for (FragmentInstanceId insId : unfinishedFIs) {
+          FragmentInstanceManager.getInstance().cancelTask(insId, true);
+        }
+        continue;
+      }
+
       try (SyncDataNodeInternalServiceClient client =
           internalServiceClientManager.borrowClient(endPoint)) {
-        client.cancelQuery(new TCancelQueryReq(queryId.getId(), unfinishedFIs, true));
+        client.cancelQuery(
+            new TCancelQueryReq(
+                queryId.getId(),
+                unfinishedFIs.stream()
+                    .map(FragmentInstanceId::toThrift)
+                    .collect(Collectors.toList()),
+                true));
       } catch (ClientManagerException e) {
         logger.warn("can't connect to node {}", endPoint, e);
         // we shouldn't return here and need to cancel queryTasks in other nodes
@@ -149,11 +185,11 @@ public class SimpleQueryTerminator implements IQueryTerminator {
         .collect(Collectors.toList());
   }
 
-  private List<TFragmentInstanceId> getRelatedFragmentInstances(
+  private List<FragmentInstanceId> getRelatedFragmentInstances(
       TEndPoint endPoint, List<FragmentInstance> fragmentInstances) {
     return fragmentInstances.stream()
         .filter(instance -> instance.getHostDataNode().internalEndPoint.equals(endPoint))
-        .map(instance -> instance.getId().toThrift())
+        .map(FragmentInstance::getId)
         .collect(Collectors.toList());
   }
 }

Reply via email to