This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new 6f68ab5f16f Avoid RPC invocation in SimpleQueryTerminator when the
endpoint is the local address
6f68ab5f16f is described below
commit 6f68ab5f16fb7b13b45bfb459642b03827ab1e63
Author: Beyyes <[email protected]>
AuthorDate: Thu May 25 16:44:20 2023 +0800
Avoid RPC invocation in SimpleQueryTerminator when the endpoint is the local address
---
.../scheduler/FixedRateFragInsStateTracker.java | 14 +++---
.../plan/scheduler/IFragInstanceStateTracker.java | 4 +-
.../mpp/plan/scheduler/SimpleQueryTerminator.java | 52 ++++++++++++++++++----
3 files changed, 52 insertions(+), 18 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
index 8ad594abcc7..2f389ec1b1a 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -30,7 +30,6 @@ import
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.utils.SetThreadName;
-import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -81,18 +80,17 @@ public class FixedRateFragInsStateTracker extends
AbstractFragInsStateTracker {
}
@Override
- public synchronized List<TFragmentInstanceId> filterUnFinishedFIs(
- List<TFragmentInstanceId> instanceIds) {
- List<TFragmentInstanceId> res = new ArrayList<>();
+ public synchronized List<FragmentInstanceId> filterUnFinishedFIs(
+ List<FragmentInstanceId> instanceIds) {
+ List<FragmentInstanceId> res = new ArrayList<>();
if (instanceIds == null) {
return res;
}
- for (TFragmentInstanceId tFragmentInstanceId : instanceIds) {
- InstanceStateMetrics stateMetrics =
-
instanceStateMap.get(FragmentInstanceId.fromThrift(tFragmentInstanceId));
+ for (FragmentInstanceId fragmentInstanceId : instanceIds) {
+ InstanceStateMetrics stateMetrics =
instanceStateMap.get(fragmentInstanceId);
if (stateMetrics != null
&& (stateMetrics.lastState == null ||
!stateMetrics.lastState.isDone())) {
- res.add(tFragmentInstanceId);
+ res.add(fragmentInstanceId);
}
}
return res;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IFragInstanceStateTracker.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IFragInstanceStateTracker.java
index 796af2e5753..25fd262d80a 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IFragInstanceStateTracker.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IFragInstanceStateTracker.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.mpp.plan.scheduler;
-import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import java.util.List;
@@ -28,5 +28,5 @@ public interface IFragInstanceStateTracker {
void abort();
- List<TFragmentInstanceId> filterUnFinishedFIs(List<TFragmentInstanceId>
instanceIds);
+ List<FragmentInstanceId> filterUnFinishedFIs(List<FragmentInstanceId>
instanceIds);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
index 841abf4deae..8e8594c22d0 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
@@ -23,11 +23,13 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
-import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -50,7 +52,7 @@ public class SimpleQueryTerminator implements
IQueryTerminator {
private final IFragInstanceStateTracker stateTracker;
private List<TEndPoint> relatedHost;
- private Map<TEndPoint, List<TFragmentInstanceId>> ownedFragmentInstance;
+ private Map<TEndPoint, List<FragmentInstanceId>> ownedFragmentInstance;
private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager;
@@ -96,14 +98,31 @@ public class SimpleQueryTerminator implements
IQueryTerminator {
boolean succeed = true;
for (TEndPoint endPoint : relatedHost) {
// we only send cancel query request if there is remaining unfinished FI
in that node
- List<TFragmentInstanceId> unfinishedFIs =
+ List<FragmentInstanceId> unfinishedFIs =
stateTracker.filterUnFinishedFIs(ownedFragmentInstance.get(endPoint));
if (unfinishedFIs.isEmpty()) {
continue;
}
+
+ String internalAddress =
IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
+ int internalPort =
IoTDBDescriptor.getInstance().getConfig().getInternalPort();
+ if (internalAddress.equalsIgnoreCase(endPoint.getIp())
+ && internalPort == endPoint.getPort()) {
+ for (FragmentInstanceId insId : unfinishedFIs) {
+ FragmentInstanceManager.getInstance().cancelTask(insId, false);
+ }
+ continue;
+ }
+
try (SyncDataNodeInternalServiceClient client =
internalServiceClientManager.borrowClient(endPoint)) {
- client.cancelQuery(new TCancelQueryReq(queryId.getId(), unfinishedFIs,
false));
+ client.cancelQuery(
+ new TCancelQueryReq(
+ queryId.getId(),
+ unfinishedFIs.stream()
+ .map(FragmentInstanceId::toThrift)
+ .collect(Collectors.toList()),
+ false));
} catch (ClientManagerException e) {
logger.warn("can't connect to node {}", endPoint, e);
// we shouldn't return here and need to cancel queryTasks in other
nodes
@@ -121,14 +140,31 @@ public class SimpleQueryTerminator implements
IQueryTerminator {
boolean succeed = true;
for (TEndPoint endPoint : relatedHost) {
// we only send cancel query request if there is remaining unfinished FI
in that node
- List<TFragmentInstanceId> unfinishedFIs =
+ List<FragmentInstanceId> unfinishedFIs =
stateTracker.filterUnFinishedFIs(ownedFragmentInstance.get(endPoint));
if (unfinishedFIs.isEmpty()) {
continue;
}
+
+ String internalAddress =
IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
+ int internalPort =
IoTDBDescriptor.getInstance().getConfig().getInternalPort();
+ if (internalAddress.equalsIgnoreCase(endPoint.getIp())
+ && internalPort == endPoint.getPort()) {
+ for (FragmentInstanceId insId : unfinishedFIs) {
+ FragmentInstanceManager.getInstance().cancelTask(insId, true);
+ }
+ continue;
+ }
+
try (SyncDataNodeInternalServiceClient client =
internalServiceClientManager.borrowClient(endPoint)) {
- client.cancelQuery(new TCancelQueryReq(queryId.getId(), unfinishedFIs,
true));
+ client.cancelQuery(
+ new TCancelQueryReq(
+ queryId.getId(),
+ unfinishedFIs.stream()
+ .map(FragmentInstanceId::toThrift)
+ .collect(Collectors.toList()),
+ true));
} catch (ClientManagerException e) {
logger.warn("can't connect to node {}", endPoint, e);
// we shouldn't return here and need to cancel queryTasks in other
nodes
@@ -149,11 +185,11 @@ public class SimpleQueryTerminator implements
IQueryTerminator {
.collect(Collectors.toList());
}
- private List<TFragmentInstanceId> getRelatedFragmentInstances(
+ private List<FragmentInstanceId> getRelatedFragmentInstances(
TEndPoint endPoint, List<FragmentInstance> fragmentInstances) {
return fragmentInstances.stream()
.filter(instance ->
instance.getHostDataNode().internalEndPoint.equals(endPoint))
- .map(instance -> instance.getId().toThrift())
+ .map(FragmentInstance::getId)
.collect(Collectors.toList());
}
}