This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e9e6505adf Add query redirect logic in planning side when retrying (#7035)
e9e6505adf is described below

commit e9e6505adf55f9345c678cf88215a428dc77f43e
Author: Zhang.Jinrui <[email protected]>
AuthorDate: Thu Aug 18 14:40:49 2022 +0800

    Add query redirect logic in planning side when retrying (#7035)
---
 .../iotdb/db/mpp/common/MPPQueryContext.java       | 25 ++++++++---
 .../db/mpp/plan/analyze/ExpressionAnalyzer.java    |  1 +
 .../SimpleFragmentParallelPlanner.java             | 48 +++++++++++++++++++---
 .../db/mpp/plan/scheduler/ClusterScheduler.java    | 11 ++---
 .../scheduler/FragmentInstanceDispatcherImpl.java  |  7 ++++
 .../mpp/plan/scheduler/SimpleQueryTerminator.java  | 12 +++++-
 6 files changed, 86 insertions(+), 18 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index 81e1330eca..91d4d85f89 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.db.mpp.common;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
 
+import java.util.LinkedList;
+import java.util.List;
+
 /**
  * This class is used to record the context of a query including QueryId, 
query statement, session
  * info and so on
@@ -37,8 +40,14 @@ public class MPPQueryContext {
   private TEndPoint localInternalEndpoint;
   private ResultNodeContext resultNodeContext;
 
+  // When some DataNode cannot be connected, its endPoint will be put
+  // in this list. And the following retry will avoid planning fragment
+  // onto this node.
+  private final List<TEndPoint> endPointBlackList;
+
   public MPPQueryContext(QueryId queryId) {
     this.queryId = queryId;
+    this.endPointBlackList = new LinkedList<>();
   }
 
   public MPPQueryContext(
@@ -47,8 +56,8 @@ public class MPPQueryContext {
       SessionInfo session,
       TEndPoint localDataBlockEndpoint,
       TEndPoint localInternalEndpoint) {
+    this(queryId);
     this.sql = sql;
-    this.queryId = queryId;
     this.session = session;
     this.localDataBlockEndpoint = localDataBlockEndpoint;
     this.localInternalEndpoint = localInternalEndpoint;
@@ -63,11 +72,7 @@ public class MPPQueryContext {
       TEndPoint localInternalEndpoint,
       long timeOut,
       long startTime) {
-    this.sql = sql;
-    this.queryId = queryId;
-    this.session = session;
-    this.localDataBlockEndpoint = localDataBlockEndpoint;
-    this.localInternalEndpoint = localInternalEndpoint;
+    this(sql, queryId, session, localDataBlockEndpoint, localInternalEndpoint);
     this.resultNodeContext = new ResultNodeContext(queryId);
     this.timeOut = timeOut;
     this.startTime = startTime;
@@ -116,4 +121,12 @@ public class MPPQueryContext {
   public void setStartTime(long startTime) {
     this.startTime = startTime;
   }
+
+  public void addFailedEndPoint(TEndPoint endPoint) {
+    this.endPointBlackList.add(endPoint);
+  }
+
+  public List<TEndPoint> getEndPointBlackList() {
+    return endPointBlackList;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
index 9c46f1676d..84fa0e4f3e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
@@ -264,6 +264,7 @@ public class ExpressionAnalyzer {
       List<PartialPath> actualPaths = new ArrayList<>();
       if (rawPath.getFullPath().startsWith(SQLConstant.ROOT + 
TsFileConstant.PATH_SEPARATOR)) {
         actualPaths.add(rawPath);
+        patternTree.appendPathPattern(rawPath);
       } else {
         for (PartialPath prefixPath : prefixPaths) {
           PartialPath concatPath = prefixPath.concatPath(rawPath);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 854bb15467..e49b8b90e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.plan.planner.distribution;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
@@ -35,8 +36,12 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -45,6 +50,7 @@ import java.util.Map;
  * into only one FragmentInstance.
  */
 public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
+  private static final Logger logger = 
LoggerFactory.getLogger(SimpleFragmentParallelPlanner.class);
 
   private SubPlan subPlan;
   private Analysis analysis;
@@ -122,16 +128,48 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
     // TODO: (Chen Rongzhao) need to make the values of ReadConsistencyLevel 
as static variable or
     // enums
     boolean selectRandomDataNode = "weak".equals(readConsistencyLevel);
+
+    // When planning fragment onto specific DataNode, the DataNode whose 
endPoint is in
+    // black list won't be considered because it may have connection issue now.
+    List<TDataNodeLocation> availableDataNodes =
+        filterAvailableTDataNode(regionReplicaSet.getDataNodeLocations());
+    if (availableDataNodes.size() == 0) {
+      String errorMsg =
+          String.format(
+              "all replicas for region[%s] are not available in these 
DataNodes[%s]",
+              regionReplicaSet.getRegionId(), 
regionReplicaSet.getDataNodeLocations());
+      throw new IllegalArgumentException(errorMsg);
+    }
+    if (regionReplicaSet.getDataNodeLocationsSize() != 
availableDataNodes.size()) {
+      logger.info("available replicas: " + availableDataNodes);
+    }
     int targetIndex;
     if (!selectRandomDataNode || queryContext.getSession() == null) {
       targetIndex = 0;
     } else {
-      targetIndex =
-          (int)
-              (queryContext.getSession().getSessionId()
-                  % regionReplicaSet.getDataNodeLocationsSize());
+      targetIndex = (int) (queryContext.getSession().getSessionId() % 
availableDataNodes.size());
+    }
+    return availableDataNodes.get(targetIndex);
+  }
+
+  private List<TDataNodeLocation> filterAvailableTDataNode(
+      List<TDataNodeLocation> originalDataNodeList) {
+    List<TDataNodeLocation> result = new LinkedList<>();
+    for (TDataNodeLocation dataNodeLocation : originalDataNodeList) {
+      if (isAvailableDataNode(dataNodeLocation)) {
+        result.add(dataNodeLocation);
+      }
+    }
+    return result;
+  }
+
+  private boolean isAvailableDataNode(TDataNodeLocation dataNodeLocation) {
+    for (TEndPoint endPoint : queryContext.getEndPointBlackList()) {
+      if (endPoint.getIp().equals(dataNodeLocation.internalEndPoint.getIp())) {
+        return false;
+      }
     }
-    return regionReplicaSet.getDataNodeLocations().get(targetIndex);
+    return true;
   }
 
   private void calculateNodeTopologyBetweenInstance() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index 417b6b5e18..1df6c79d61 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -76,17 +76,18 @@ public class ClusterScheduler implements IScheduler {
     this.queryType = queryType;
     this.dispatcher =
         new FragmentInstanceDispatcherImpl(
-            queryType, executor, writeOperationExecutor, 
internalServiceClientManager);
+            queryType,
+            queryContext,
+            executor,
+            writeOperationExecutor,
+            internalServiceClientManager);
     if (queryType == QueryType.READ) {
       this.stateTracker =
           new FixedRateFragInsStateTracker(
               stateMachine, scheduledExecutor, instances, 
internalServiceClientManager);
       this.queryTerminator =
           new SimpleQueryTerminator(
-              scheduledExecutor,
-              queryContext.getQueryId(),
-              instances,
-              internalServiceClientManager);
+              scheduledExecutor, queryContext, instances, 
internalServiceClientManager);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index f4a756d522..ab5b2e8909 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
 import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
@@ -66,6 +67,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
   private final ExecutorService executor;
   private final ExecutorService writeOperationExecutor;
   private final QueryType type;
+  private final MPPQueryContext queryContext;
   private final String localhostIpAddr;
   private final int localhostInternalPort;
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
@@ -73,10 +75,12 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
 
   public FragmentInstanceDispatcherImpl(
       QueryType type,
+      MPPQueryContext queryContext,
       ExecutorService executor,
       ExecutorService writeOperationExecutor,
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
internalServiceClientManager) {
     this.type = type;
+    this.queryContext = queryContext;
     this.executor = executor;
     this.writeOperationExecutor = writeOperationExecutor;
     this.internalServiceClientManager = internalServiceClientManager;
@@ -187,6 +191,9 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
       TSStatus status = new TSStatus();
       status.setCode(TSStatusCode.SYNC_CONNECTION_EXCEPTION.getStatusCode());
       status.setMessage("can't connect to node {}" + endPoint);
+      // If the DataNode cannot be connected, its endPoint will be put into 
black list
+      // so that the following retry will avoid dispatching instance towards 
this DataNode.
+      queryContext.addFailedEndPoint(endPoint);
       throw new FragmentInstanceDispatchException(status);
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
index 32b329d640..1bfd27438f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
@@ -45,6 +46,7 @@ public class SimpleQueryTerminator implements IQueryTerminator {
   private static final long TERMINATION_GRACE_PERIOD_IN_MS = 1000L;
   protected ScheduledExecutorService scheduledExecutor;
   private final QueryId queryId;
+  private final MPPQueryContext queryContext;
   private List<TEndPoint> relatedHost;
   private Map<TEndPoint, List<TFragmentInstanceId>> ownedFragmentInstance;
 
@@ -53,11 +55,12 @@ public class SimpleQueryTerminator implements IQueryTerminator {
 
   public SimpleQueryTerminator(
       ScheduledExecutorService scheduledExecutor,
-      QueryId queryId,
+      MPPQueryContext queryContext,
       List<FragmentInstance> fragmentInstances,
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
internalServiceClientManager) {
     this.scheduledExecutor = scheduledExecutor;
-    this.queryId = queryId;
+    this.queryId = queryContext.getQueryId();
+    this.queryContext = queryContext;
     this.internalServiceClientManager = internalServiceClientManager;
     calculateParameter(fragmentInstances);
   }
@@ -72,6 +75,11 @@ public class SimpleQueryTerminator implements IQueryTerminator {
 
   @Override
   public Future<Boolean> terminate() {
+    // For the failure dispatch, the termination should not be triggered 
because of connection issue
+    this.relatedHost =
+        this.relatedHost.stream()
+            .filter(endPoint -> 
!queryContext.getEndPointBlackList().contains(endPoint))
+            .collect(Collectors.toList());
     return scheduledExecutor.schedule(
         this::syncTerminate, TERMINATION_GRACE_PERIOD_IN_MS, 
TimeUnit.MILLISECONDS);
   }

Reply via email to