This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/query_retry_redirect in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 204a445aed4a602dcf315f1fb73d383e0eb5b12f Author: Jinrui.Zhang <[email protected]> AuthorDate: Tue Aug 16 16:25:00 2022 +0800 add redirect available node policy when retry --- .../iotdb/db/mpp/common/MPPQueryContext.java | 22 ++++++++--- .../SimpleFragmentParallelPlanner.java | 46 +++++++++++++++++++--- .../db/mpp/plan/scheduler/ClusterScheduler.java | 6 ++- .../scheduler/FragmentInstanceDispatcherImpl.java | 5 +++ 4 files changed, 67 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java index 81e1330eca..08da8f11e5 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java @@ -21,6 +21,9 @@ package org.apache.iotdb.db.mpp.common; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.db.mpp.plan.analyze.QueryType; +import java.util.LinkedList; +import java.util.List; + /** * This class is used to record the context of a query including QueryId, query statement, session * info and so on @@ -37,8 +40,11 @@ public class MPPQueryContext { private TEndPoint localInternalEndpoint; private ResultNodeContext resultNodeContext; + private final List<TEndPoint> endPointBlackList; + public MPPQueryContext(QueryId queryId) { this.queryId = queryId; + this.endPointBlackList = new LinkedList<>(); } public MPPQueryContext( @@ -47,8 +53,8 @@ public class MPPQueryContext { SessionInfo session, TEndPoint localDataBlockEndpoint, TEndPoint localInternalEndpoint) { + this(queryId); this.sql = sql; - this.queryId = queryId; this.session = session; this.localDataBlockEndpoint = localDataBlockEndpoint; this.localInternalEndpoint = localInternalEndpoint; @@ -63,11 +69,7 @@ public class MPPQueryContext { TEndPoint localInternalEndpoint, long timeOut, long startTime) { - this.sql = sql; - this.queryId = queryId; - this.session = session; - this.localDataBlockEndpoint = localDataBlockEndpoint; - this.localInternalEndpoint = localInternalEndpoint; + this(sql, queryId, session, localDataBlockEndpoint, localInternalEndpoint); this.resultNodeContext = new ResultNodeContext(queryId); this.timeOut = timeOut; this.startTime = startTime; @@ -116,4 +118,12 @@ public class MPPQueryContext { public void setStartTime(long startTime) { this.startTime = startTime; } + + public void addFailedEndPoint(TEndPoint endPoint) { + this.endPointBlackList.add(endPoint); + } + + public List<TEndPoint> getEndPointBlackList() { + return endPointBlackList; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java index 854bb15467..7f00ffd9e4 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.mpp.plan.planner.distribution; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.mpp.common.MPPQueryContext; @@ -35,8 +36,12 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode; import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -45,6 +50,7 @@ import java.util.Map; * into only one FragmentInstance. */ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner { + private static final Logger logger = LoggerFactory.getLogger(SimpleFragmentParallelPlanner.class); private SubPlan subPlan; private Analysis analysis; @@ -122,16 +128,46 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner { // TODO: (Chen Rongzhao) need to make the values of ReadConsistencyLevel as static variable or // enums boolean selectRandomDataNode = "weak".equals(readConsistencyLevel); + + List<TDataNodeLocation> availableDataNodes = + filterAvailableTDataNode(regionReplicaSet.getDataNodeLocations()); + if (availableDataNodes.size() == 0) { + String errorMsg = + String.format( + "all replicas for region[%s] are not available in these DataNodes[%s]", + regionReplicaSet.getRegionId(), regionReplicaSet.getDataNodeLocations()); + throw new IllegalArgumentException(errorMsg); + } + if (regionReplicaSet.getDataNodeLocationsSize() != availableDataNodes.size()) { + logger.info("Available replicas: " + availableDataNodes); + } int targetIndex; if (!selectRandomDataNode || queryContext.getSession() == null) { targetIndex = 0; } else { - targetIndex = - (int) - (queryContext.getSession().getSessionId() - % regionReplicaSet.getDataNodeLocationsSize()); + targetIndex = (int) (queryContext.getSession().getSessionId() % availableDataNodes.size()); + } + return availableDataNodes.get(targetIndex); + } + + private List<TDataNodeLocation> filterAvailableTDataNode( + List<TDataNodeLocation> originalDataNodeList) { + List<TDataNodeLocation> result = new LinkedList<>(); + for (TDataNodeLocation dataNodeLocation : originalDataNodeList) { + if (isAvailableDataNode(dataNodeLocation)) { + result.add(dataNodeLocation); + } + } + return result; + } + + private boolean isAvailableDataNode(TDataNodeLocation dataNodeLocation) { + for (TEndPoint endPoint : queryContext.getEndPointBlackList()) { + if (endPoint.getIp().equals(dataNodeLocation.internalEndPoint.getIp())) { + return false; + } } - return regionReplicaSet.getDataNodeLocations().get(targetIndex); + return true; } private void calculateNodeTopologyBetweenInstance() { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java index 417b6b5e18..10c3139071 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java @@ -76,7 +76,11 @@ public class ClusterScheduler implements IScheduler { this.queryType = queryType; this.dispatcher = new FragmentInstanceDispatcherImpl( - queryType, executor, writeOperationExecutor, internalServiceClientManager); + queryType, + queryContext, + executor, + writeOperationExecutor, + internalServiceClientManager); if (queryType == QueryType.READ) { this.stateTracker = new FixedRateFragInsStateTracker( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java index f4a756d522..7849cc4201 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -32,6 +32,7 @@ import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl; import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException; import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.mpp.common.MPPQueryContext; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo; import org.apache.iotdb.db.mpp.plan.analyze.QueryType; import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator; @@ -66,6 +67,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { private final ExecutorService executor; private final ExecutorService writeOperationExecutor; private final QueryType type; + private final MPPQueryContext queryContext; private final String localhostIpAddr; private final int localhostInternalPort; private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> @@ -73,10 +75,12 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { public FragmentInstanceDispatcherImpl( QueryType type, + MPPQueryContext queryContext, ExecutorService executor, ExecutorService writeOperationExecutor, IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) { this.type = type; + this.queryContext = queryContext; this.executor = executor; this.writeOperationExecutor = writeOperationExecutor; this.internalServiceClientManager = internalServiceClientManager; @@ -187,6 +191,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { TSStatus status = new TSStatus(); status.setCode(TSStatusCode.SYNC_CONNECTION_EXCEPTION.getStatusCode()); status.setMessage("can't connect to node {}" + endPoint); + queryContext.addFailedEndPoint(endPoint); throw new FragmentInstanceDispatchException(status); } }
