This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d6723f4d54 [IoTDB-4118] leverage client RPC to do the retry logic rather than threadPool (#6978)
d6723f4d54 is described below

commit d6723f4d5453049c59d9c59070b73af10d2037f3
Author: Zhang.Jinrui <[email protected]>
AuthorDate: Fri Aug 12 17:40:43 2022 +0800

    [IoTDB-4118] leverage client RPC to do the retry logic rather than threadPool (#6978)
---
 .../apache/iotdb/db/mpp/execution/QueryState.java  |  2 +-
 .../iotdb/db/mpp/execution/QueryStateMachine.java  |  8 ++++++--
 .../db/mpp/plan/execution/QueryExecution.java      | 23 ++++++++++++++--------
 .../db/mpp/plan/scheduler/ClusterScheduler.java    |  2 +-
 4 files changed, 23 insertions(+), 12 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
index d4ed15d9ea..8cd69763c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
@@ -28,7 +28,7 @@ public enum QueryState {
   QUEUED(false),
   PLANNED(false),
   DISPATCHING(false),
-  RETRYING(false),
+  PENDING_RETRY(false),
   RUNNING(false),
   FINISHED(true),
   CANCELED(true),
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 44f16e2ad9..703255afe7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -92,6 +92,10 @@ public class QueryStateMachine {
     return queryState.get();
   }
 
+  public void transitionToQueued() {
+    queryState.set(QueryState.QUEUED);
+  }
+
   public void transitionToPlanned() {
     queryState.set(QueryState.PLANNED);
   }
@@ -100,12 +104,12 @@ public class QueryStateMachine {
     queryState.set(QueryState.DISPATCHING);
   }
 
-  public void transitionToRetrying(TSStatus failureStatus) {
+  public void transitionToPendingRetry(TSStatus failureStatus) {
     if (queryState.get().isDone()) {
       return;
     }
     this.failureStatus = failureStatus;
-    queryState.set(QueryState.RETRYING);
+    queryState.set(QueryState.PENDING_RETRY);
   }
 
   public void transitionToRunning() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 842352a7b6..4a6ac5fdc0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -144,10 +144,6 @@ public class QueryExecution implements IQueryExecution {
     stateMachine.addStateChangeListener(
         state -> {
           try (SetThreadName queryName = new SetThreadName(context.getQueryId().getId())) {
-            if (state == QueryState.RETRYING) {
-              retry();
-              return;
-            }
             if (!state.isDone()) {
               return;
             }
@@ -179,17 +175,18 @@ public class QueryExecution implements IQueryExecution {
 
     doLogicalPlan();
     doDistributedPlan();
+    stateMachine.transitionToPlanned();
     if (context.getQueryType() == QueryType.READ) {
       initResultHandle();
     }
     schedule();
   }
 
-  private void retry() {
+  private ExecutionResult retry() {
     if (retryCount >= MAX_RETRY_COUNT) {
       logger.error("reach max retry count. transit query to failed");
       stateMachine.transitionToFailed();
-      return;
+      return getStatus();
     }
     logger.warn("error when executing query. {}", stateMachine.getFailureMessage());
     // stop and clean up resources the QueryExecution used
@@ -203,13 +200,14 @@ public class QueryExecution implements IQueryExecution {
     }
     retryCount++;
     logger.info("start to retry. Retry count is: {}", retryCount);
-
+    stateMachine.transitionToQueued();
     // force invalid PartitionCache
     partitionFetcher.invalidAllCache();
     // re-analyze the query
     this.analysis = analyze(rawStatement, context, partitionFetcher, schemaFetcher);
     // re-start the QueryExecution
     this.start();
+    return getStatus();
   }
 
   private boolean skipExecute() {
@@ -407,11 +405,20 @@ public class QueryExecution implements IQueryExecution {
       SettableFuture<QueryState> future = SettableFuture.create();
       stateMachine.addStateChangeListener(
           state -> {
-            if (state == QueryState.RUNNING || state.isDone()) {
+            if (state == QueryState.RUNNING
+                || state.isDone()
+                || state == QueryState.PENDING_RETRY) {
               future.set(state);
             }
           });
       QueryState state = future.get();
+      if (state == QueryState.PENDING_RETRY) {
+        // That we put retry() here is aimed to leverage the ClientRPC thread rather than
+        // create another new thread to do the retry() logic.
+        // This way will lead to recursive call because retry() calls getStatus() inside.
+        // The max depths of recursive call is equal to the max retry count.
+        return retry();
+      }
       // TODO: (xingtanzjr) use more TSStatusCode if the QueryState isn't FINISHED
       return getExecutionResult(state);
     } catch (InterruptedException | ExecutionException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index e9829cc637..417b6b5e18 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -106,7 +106,7 @@ public class ClusterScheduler implements IScheduler {
       FragInstanceDispatchResult result = dispatchResultFuture.get();
       if (!result.isSuccessful()) {
         if (needRetry(result.getFailureStatus())) {
-          stateMachine.transitionToRetrying(result.getFailureStatus());
+          stateMachine.transitionToPendingRetry(result.getFailureStatus());
         } else {
           stateMachine.transitionToFailed(result.getFailureStatus());
         }

Reply via email to