This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c31a35933b Add query retry logic to make the query won't fail when 
some DataNode shutdown (#6717)
c31a35933b is described below

commit c31a35933baef77bfc751eaebc51ed94b9adba01
Author: Zhang.Jinrui <[email protected]>
AuthorDate: Wed Jul 20 10:18:33 2022 +0800

    Add query retry logic to make the query won't fail when some DataNode 
shutdown (#6717)
---
 .../apache/iotdb/db/mpp/execution/QueryState.java  |  1 +
 .../iotdb/db/mpp/execution/QueryStateMachine.java  | 15 ++++
 .../db/mpp/plan/execution/QueryExecution.java      | 79 +++++++++++++++-------
 .../db/mpp/plan/scheduler/ClusterScheduler.java    |  4 +-
 4 files changed, 74 insertions(+), 25 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
index 383875a022..d4ed15d9ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
@@ -28,6 +28,7 @@ public enum QueryState {
   QUEUED(false),
   PLANNED(false),
   DISPATCHING(false),
+  RETRYING(false),
   RUNNING(false),
   FINISHED(true),
   CANCELED(true),
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 801a1d3df2..42e793b41f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -100,6 +100,14 @@ public class QueryStateMachine {
     queryState.set(QueryState.DISPATCHING);
   }
 
+  public void transitionToRetrying(Throwable throwable) { // move a live query to RETRYING and remember why
+    if (queryState.get().isDone()) {
+      return; // a query that is already done is never moved back to RETRYING
+    }
+    this.failureException = throwable; // store the cause first so a listener reacting to RETRYING can read it
+    queryState.set(QueryState.RETRYING);
+  }
+
   public void transitionToRunning() {
     queryState.set(QueryState.RUNNING);
   }
@@ -125,6 +133,13 @@ public class QueryStateMachine {
     queryState.set(QueryState.ABORTED);
   }
 
+  public void transitionToFailed() {
+    // Cause-less variant: failureException is left as previously recorded.
+    if (!queryState.get().isDone()) {
+      queryState.set(QueryState.FAILED);
+    }
+  }
+
   public void transitionToFailed(Throwable throwable) {
     if (queryState.get().isDone()) {
       return;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index fe6d48afa2..9c9d290052 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -88,14 +88,17 @@ public class QueryExecution implements IQueryExecution {
   private static final Logger logger = 
LoggerFactory.getLogger(QueryExecution.class);
 
   private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
-
+  private static final int MAX_RETRY_COUNT = 3;
+  private static final long RETRY_INTERVAL_IN_MS = 2000;
+  private int retryCount = 0;
   private final MPPQueryContext context;
   private IScheduler scheduler;
   private final QueryStateMachine stateMachine;
 
   private final List<PlanOptimizer> planOptimizers;
 
-  private final Analysis analysis;
+  private Statement rawStatement;
+  private Analysis analysis;
   private LogicalQueryPlan logicalPlan;
   private DistributedQueryPlan distributedPlan;
 
@@ -123,6 +126,7 @@ public class QueryExecution implements IQueryExecution {
       IPartitionFetcher partitionFetcher,
       ISchemaFetcher schemaFetcher,
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
internalServiceClientManager) {
+    this.rawStatement = statement;
     this.executor = executor;
     this.writeOperationExecutor = writeOperationExecutor;
     this.scheduledExecutor = scheduledExecutor;
@@ -139,6 +143,10 @@ public class QueryExecution implements IQueryExecution {
     stateMachine.addStateChangeListener(
         state -> {
           try (SetThreadName queryName = new 
SetThreadName(context.getQueryId().getId())) {
+            if (state == QueryState.RETRYING) {
+              retry();
+              return;
+            }
             if (!state.isDone()) {
               return;
             }
@@ -170,6 +178,33 @@ public class QueryExecution implements IQueryExecution {
     schedule();
   }
 
+  private void retry() { // re-analyze and restart the query after a RETRYING transition, at most MAX_RETRY_COUNT times
+    if (retryCount >= MAX_RETRY_COUNT) {
+      logger.error("reach max retry count. transit query to failed");
+      stateMachine.transitionToFailed(); // no-arg variant: keeps the cause recorded by transitionToRetrying
+      return;
+    }
+    logger.warn("error when executing query. {}", 
stateMachine.getFailureMessage());
+    // Release the resources held by the failed attempt before re-planning.
+    this.stopAndCleanup();
+    logger.info("wait {}ms before retry...", RETRY_INTERVAL_IN_MS);
+    try {
+      Thread.sleep(RETRY_INTERVAL_IN_MS); // NOTE(review): blocks whichever thread delivers the RETRYING state change — confirm acceptable
+    } catch (InterruptedException e) {
+      logger.error("interrupted when waiting retry");
+      Thread.currentThread().interrupt(); // NOTE(review): the retry below still proceeds with the interrupt flag set — confirm intended
+    }
+    retryCount++;
+    logger.info("start to retry. Retry count is: {}", retryCount);
+
+    // Drop every cached partition entry so re-analysis does not reuse cached partition info.
+    partitionFetcher.invalidAllCache();
+    // Re-analyze the original statement (rawStatement, saved in the constructor).
+    this.analysis = analyze(rawStatement, context, partitionFetcher, 
schemaFetcher);
+    // Restart execution; if dispatch fails again, ClusterScheduler moves the query back to RETRYING, re-entering this method.
+    this.start();
+  }
+
   private boolean skipExecute() {
     return analysis.isFinishQueryAfterAnalyze()
         || (context.getQueryType() == QueryType.READ && 
!analysis.hasDataSource());
@@ -388,27 +423,25 @@ public class QueryExecution implements IQueryExecution {
   }
 
   private void initResultHandle() {
-    if (this.resultHandle == null) {
-      TEndPoint upstreamEndPoint = 
context.getResultNodeContext().getUpStreamEndpoint();
-
-      this.resultHandle =
-          isSameNode(upstreamEndPoint)
-              ? MPPDataExchangeService.getInstance()
-                  .getMPPDataExchangeManager()
-                  .createLocalSourceHandle(
-                      
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
-                      
context.getResultNodeContext().getVirtualResultNodeId().getId(),
-                      
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
-                      stateMachine::transitionToFailed)
-              : MPPDataExchangeService.getInstance()
-                  .getMPPDataExchangeManager()
-                  .createSourceHandle(
-                      
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
-                      
context.getResultNodeContext().getVirtualResultNodeId().getId(),
-                      upstreamEndPoint,
-                      
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
-                      stateMachine::transitionToFailed);
-    }
+    TEndPoint upstreamEndPoint = 
context.getResultNodeContext().getUpStreamEndpoint();
+
+    this.resultHandle =
+        isSameNode(upstreamEndPoint)
+            ? MPPDataExchangeService.getInstance()
+                .getMPPDataExchangeManager()
+                .createLocalSourceHandle(
+                    
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
+                    
context.getResultNodeContext().getVirtualResultNodeId().getId(),
+                    
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
+                    stateMachine::transitionToFailed)
+            : MPPDataExchangeService.getInstance()
+                .getMPPDataExchangeManager()
+                .createSourceHandle(
+                    
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
+                    
context.getResultNodeContext().getVirtualResultNodeId().getId(),
+                    upstreamEndPoint,
+                    
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
+                    stateMachine::transitionToFailed);
   }
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index 9563357d12..bd0e166c36 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -109,11 +109,11 @@ public class ClusterScheduler implements IScheduler {
         return;
       }
     } catch (InterruptedException | ExecutionException e) {
-      // If the dispatch failed, we make the QueryState as failed, and return.
+      // If the dispatch request cannot be sent or TException is caught, we 
will retry this query.
       if (e instanceof InterruptedException) {
         Thread.currentThread().interrupt();
       }
-      stateMachine.transitionToFailed(e);
+      stateMachine.transitionToRetrying(e);
       return;
     }
 

Reply via email to