This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e9e6505adf Add query redirect logic in planning side when retrying
(#7035)
e9e6505adf is described below
commit e9e6505adf55f9345c678cf88215a428dc77f43e
Author: Zhang.Jinrui <[email protected]>
AuthorDate: Thu Aug 18 14:40:49 2022 +0800
Add query redirect logic in planning side when retrying (#7035)
---
.../iotdb/db/mpp/common/MPPQueryContext.java | 25 ++++++++---
.../db/mpp/plan/analyze/ExpressionAnalyzer.java | 1 +
.../SimpleFragmentParallelPlanner.java | 48 +++++++++++++++++++---
.../db/mpp/plan/scheduler/ClusterScheduler.java | 11 ++---
.../scheduler/FragmentInstanceDispatcherImpl.java | 7 ++++
.../mpp/plan/scheduler/SimpleQueryTerminator.java | 12 +++++-
6 files changed, 86 insertions(+), 18 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index 81e1330eca..91d4d85f89 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.db.mpp.common;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import java.util.LinkedList;
+import java.util.List;
+
/**
* This class is used to record the context of a query including QueryId,
query statement, session
* info and so on
@@ -37,8 +40,14 @@ public class MPPQueryContext {
private TEndPoint localInternalEndpoint;
private ResultNodeContext resultNodeContext;
+ // When a DataNode cannot be connected, its endPoint is added to this
+ // list so that subsequent retries avoid planning fragments onto that
+ // node.
+ private final List<TEndPoint> endPointBlackList;
+
public MPPQueryContext(QueryId queryId) {
this.queryId = queryId;
+ this.endPointBlackList = new LinkedList<>();
}
public MPPQueryContext(
@@ -47,8 +56,8 @@ public class MPPQueryContext {
SessionInfo session,
TEndPoint localDataBlockEndpoint,
TEndPoint localInternalEndpoint) {
+ this(queryId);
this.sql = sql;
- this.queryId = queryId;
this.session = session;
this.localDataBlockEndpoint = localDataBlockEndpoint;
this.localInternalEndpoint = localInternalEndpoint;
@@ -63,11 +72,7 @@ public class MPPQueryContext {
TEndPoint localInternalEndpoint,
long timeOut,
long startTime) {
- this.sql = sql;
- this.queryId = queryId;
- this.session = session;
- this.localDataBlockEndpoint = localDataBlockEndpoint;
- this.localInternalEndpoint = localInternalEndpoint;
+ this(sql, queryId, session, localDataBlockEndpoint, localInternalEndpoint);
this.resultNodeContext = new ResultNodeContext(queryId);
this.timeOut = timeOut;
this.startTime = startTime;
@@ -116,4 +121,12 @@ public class MPPQueryContext {
public void setStartTime(long startTime) {
this.startTime = startTime;
}
+
+ public void addFailedEndPoint(TEndPoint endPoint) {
+ this.endPointBlackList.add(endPoint);
+ }
+
+ public List<TEndPoint> getEndPointBlackList() {
+ return endPointBlackList;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
index 9c46f1676d..84fa0e4f3e 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
@@ -264,6 +264,7 @@ public class ExpressionAnalyzer {
List<PartialPath> actualPaths = new ArrayList<>();
if (rawPath.getFullPath().startsWith(SQLConstant.ROOT +
TsFileConstant.PATH_SEPARATOR)) {
actualPaths.add(rawPath);
+ patternTree.appendPathPattern(rawPath);
} else {
for (PartialPath prefixPath : prefixPaths) {
PartialPath concatPath = prefixPath.concatPath(rawPath);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 854bb15467..e49b8b90e8 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.plan.planner.distribution;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
@@ -35,8 +36,12 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -45,6 +50,7 @@ import java.util.Map;
* into only one FragmentInstance.
*/
public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
+ private static final Logger logger =
LoggerFactory.getLogger(SimpleFragmentParallelPlanner.class);
private SubPlan subPlan;
private Analysis analysis;
@@ -122,16 +128,48 @@ public class SimpleFragmentParallelPlanner implements
IFragmentParallelPlaner {
// TODO: (Chen Rongzhao) need to make the values of ReadConsistencyLevel
as static variable or
// enums
boolean selectRandomDataNode = "weak".equals(readConsistencyLevel);
+
+ // When planning a fragment onto a specific DataNode, any DataNode whose endPoint is in the
+ // black list is not considered, because it may currently have a connection issue.
+ List<TDataNodeLocation> availableDataNodes =
+ filterAvailableTDataNode(regionReplicaSet.getDataNodeLocations());
+ if (availableDataNodes.size() == 0) {
+ String errorMsg =
+ String.format(
+ "all replicas for region[%s] are not available in these
DataNodes[%s]",
+ regionReplicaSet.getRegionId(),
regionReplicaSet.getDataNodeLocations());
+ throw new IllegalArgumentException(errorMsg);
+ }
+ if (regionReplicaSet.getDataNodeLocationsSize() !=
availableDataNodes.size()) {
+ logger.info("available replicas: " + availableDataNodes);
+ }
int targetIndex;
if (!selectRandomDataNode || queryContext.getSession() == null) {
targetIndex = 0;
} else {
- targetIndex =
- (int)
- (queryContext.getSession().getSessionId()
- % regionReplicaSet.getDataNodeLocationsSize());
+ targetIndex = (int) (queryContext.getSession().getSessionId() %
availableDataNodes.size());
+ }
+ return availableDataNodes.get(targetIndex);
+ }
+
+ private List<TDataNodeLocation> filterAvailableTDataNode(
+ List<TDataNodeLocation> originalDataNodeList) {
+ List<TDataNodeLocation> result = new LinkedList<>();
+ for (TDataNodeLocation dataNodeLocation : originalDataNodeList) {
+ if (isAvailableDataNode(dataNodeLocation)) {
+ result.add(dataNodeLocation);
+ }
+ }
+ return result;
+ }
+
+ private boolean isAvailableDataNode(TDataNodeLocation dataNodeLocation) {
+ for (TEndPoint endPoint : queryContext.getEndPointBlackList()) {
+ if (endPoint.getIp().equals(dataNodeLocation.internalEndPoint.getIp())) {
+ return false;
+ }
}
- return regionReplicaSet.getDataNodeLocations().get(targetIndex);
+ return true;
}
private void calculateNodeTopologyBetweenInstance() {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index 417b6b5e18..1df6c79d61 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -76,17 +76,18 @@ public class ClusterScheduler implements IScheduler {
this.queryType = queryType;
this.dispatcher =
new FragmentInstanceDispatcherImpl(
- queryType, executor, writeOperationExecutor,
internalServiceClientManager);
+ queryType,
+ queryContext,
+ executor,
+ writeOperationExecutor,
+ internalServiceClientManager);
if (queryType == QueryType.READ) {
this.stateTracker =
new FixedRateFragInsStateTracker(
stateMachine, scheduledExecutor, instances,
internalServiceClientManager);
this.queryTerminator =
new SimpleQueryTerminator(
- scheduledExecutor,
- queryContext.getQueryId(),
- instances,
- internalServiceClientManager);
+ scheduledExecutor, queryContext, instances,
internalServiceClientManager);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index f4a756d522..ab5b2e8909 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
@@ -66,6 +67,7 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
private final ExecutorService executor;
private final ExecutorService writeOperationExecutor;
private final QueryType type;
+ private final MPPQueryContext queryContext;
private final String localhostIpAddr;
private final int localhostInternalPort;
private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
@@ -73,10 +75,12 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
public FragmentInstanceDispatcherImpl(
QueryType type,
+ MPPQueryContext queryContext,
ExecutorService executor,
ExecutorService writeOperationExecutor,
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager) {
this.type = type;
+ this.queryContext = queryContext;
this.executor = executor;
this.writeOperationExecutor = writeOperationExecutor;
this.internalServiceClientManager = internalServiceClientManager;
@@ -187,6 +191,9 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
TSStatus status = new TSStatus();
status.setCode(TSStatusCode.SYNC_CONNECTION_EXCEPTION.getStatusCode());
status.setMessage("can't connect to node {}" + endPoint);
+ // If the DataNode cannot be connected, its endPoint is put into the black list
+ // so that subsequent retries avoid dispatching instances to this DataNode.
+ queryContext.addFailedEndPoint(endPoint);
throw new FragmentInstanceDispatchException(status);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
index 32b329d640..1bfd27438f 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
@@ -45,6 +46,7 @@ public class SimpleQueryTerminator implements
IQueryTerminator {
private static final long TERMINATION_GRACE_PERIOD_IN_MS = 1000L;
protected ScheduledExecutorService scheduledExecutor;
private final QueryId queryId;
+ private final MPPQueryContext queryContext;
private List<TEndPoint> relatedHost;
private Map<TEndPoint, List<TFragmentInstanceId>> ownedFragmentInstance;
@@ -53,11 +55,12 @@ public class SimpleQueryTerminator implements
IQueryTerminator {
public SimpleQueryTerminator(
ScheduledExecutorService scheduledExecutor,
- QueryId queryId,
+ MPPQueryContext queryContext,
List<FragmentInstance> fragmentInstances,
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager) {
this.scheduledExecutor = scheduledExecutor;
- this.queryId = queryId;
+ this.queryId = queryContext.getQueryId();
+ this.queryContext = queryContext;
this.internalServiceClientManager = internalServiceClientManager;
calculateParameter(fragmentInstances);
}
@@ -72,6 +75,11 @@ public class SimpleQueryTerminator implements
IQueryTerminator {
@Override
public Future<Boolean> terminate() {
+ // Endpoints in the black list (dispatch failed because of a connection issue) are excluded from termination.
+ this.relatedHost =
+ this.relatedHost.stream()
+ .filter(endPoint ->
!queryContext.getEndPointBlackList().contains(endPoint))
+ .collect(Collectors.toList());
return scheduledExecutor.schedule(
this::syncTerminate, TERMINATION_GRACE_PERIOD_IN_MS,
TimeUnit.MILLISECONDS);
}