This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/fix_retry_deadlock in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bffa8d12dc43feb0471c5b707c730149bcb2edbc Author: Jinrui.Zhang <[email protected]> AuthorDate: Fri Aug 12 14:09:41 2022 +0800 leverage client RPC to do the retry logic rather than threadPool --- .../apache/iotdb/db/mpp/execution/QueryState.java | 2 +- .../iotdb/db/mpp/execution/QueryStateMachine.java | 8 ++++++-- .../db/mpp/plan/execution/QueryExecution.java | 23 ++++++++++++++-------- .../db/mpp/plan/scheduler/ClusterScheduler.java | 2 +- 4 files changed, 23 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java index d4ed15d9ea..8cd69763c2 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java @@ -28,7 +28,7 @@ public enum QueryState { QUEUED(false), PLANNED(false), DISPATCHING(false), - RETRYING(false), + PENDING_RETRY(false), RUNNING(false), FINISHED(true), CANCELED(true), diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java index 44f16e2ad9..703255afe7 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java @@ -92,6 +92,10 @@ public class QueryStateMachine { return queryState.get(); } + public void transitionToQueued() { + queryState.set(QueryState.QUEUED); + } + public void transitionToPlanned() { queryState.set(QueryState.PLANNED); } @@ -100,12 +104,12 @@ public class QueryStateMachine { queryState.set(QueryState.DISPATCHING); } - public void transitionToRetrying(TSStatus failureStatus) { + public void transitionToPendingRetry(TSStatus failureStatus) { if (queryState.get().isDone()) { return; } this.failureStatus = failureStatus; - queryState.set(QueryState.RETRYING); + 
queryState.set(QueryState.PENDING_RETRY); } public void transitionToRunning() { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java index 842352a7b6..4a6ac5fdc0 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java @@ -144,10 +144,6 @@ public class QueryExecution implements IQueryExecution { stateMachine.addStateChangeListener( state -> { try (SetThreadName queryName = new SetThreadName(context.getQueryId().getId())) { - if (state == QueryState.RETRYING) { - retry(); - return; - } if (!state.isDone()) { return; } @@ -179,17 +175,18 @@ public class QueryExecution implements IQueryExecution { doLogicalPlan(); doDistributedPlan(); + stateMachine.transitionToPlanned(); if (context.getQueryType() == QueryType.READ) { initResultHandle(); } schedule(); } - private void retry() { + private ExecutionResult retry() { if (retryCount >= MAX_RETRY_COUNT) { logger.error("reach max retry count. transit query to failed"); stateMachine.transitionToFailed(); - return; + return getStatus(); } logger.warn("error when executing query. {}", stateMachine.getFailureMessage()); // stop and clean up resources the QueryExecution used @@ -203,13 +200,14 @@ public class QueryExecution implements IQueryExecution { } retryCount++; logger.info("start to retry. 
Retry count is: {}", retryCount); - + stateMachine.transitionToQueued(); // force invalid PartitionCache partitionFetcher.invalidAllCache(); // re-analyze the query this.analysis = analyze(rawStatement, context, partitionFetcher, schemaFetcher); // re-start the QueryExecution this.start(); + return getStatus(); } private boolean skipExecute() { @@ -407,11 +405,20 @@ public class QueryExecution implements IQueryExecution { SettableFuture<QueryState> future = SettableFuture.create(); stateMachine.addStateChangeListener( state -> { - if (state == QueryState.RUNNING || state.isDone()) { + if (state == QueryState.RUNNING + || state.isDone() + || state == QueryState.PENDING_RETRY) { future.set(state); } }); QueryState state = future.get(); + if (state == QueryState.PENDING_RETRY) { + // Calling retry() here reuses the client RPC thread instead of creating + // a new thread to run the retry logic. + // This makes the call recursive, because retry() calls getStatus() internally. + // The recursion depth is therefore bounded by the max retry count.
+ return retry(); + } // TODO: (xingtanzjr) use more TSStatusCode if the QueryState isn't FINISHED return getExecutionResult(state); } catch (InterruptedException | ExecutionException e) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java index e9829cc637..417b6b5e18 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java @@ -106,7 +106,7 @@ public class ClusterScheduler implements IScheduler { FragInstanceDispatchResult result = dispatchResultFuture.get(); if (!result.isSuccessful()) { if (needRetry(result.getFailureStatus())) { - stateMachine.transitionToRetrying(result.getFailureStatus()); + stateMachine.transitionToPendingRetry(result.getFailureStatus()); } else { stateMachine.transitionToFailed(result.getFailureStatus()); }
