This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/query_retry in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit be53e3a8a92e5adecb37497a7b1605ec0dab6c53
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Wed Jul 13 17:21:23 2022 +0800

    add retry logic to QueryExecution
---
 .../apache/iotdb/db/mpp/execution/QueryState.java  |  1 +
 .../iotdb/db/mpp/execution/QueryStateMachine.java  | 15 ++++++++++++
 .../db/mpp/plan/execution/QueryExecution.java      | 28 ++++++++++++++++++++--
 .../db/mpp/plan/scheduler/ClusterScheduler.java    |  4 ++--
 4 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
index 383875a022..d4ed15d9ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
@@ -28,6 +28,7 @@ public enum QueryState {
   QUEUED(false),
   PLANNED(false),
   DISPATCHING(false),
+  RETRYING(false),
   RUNNING(false),
   FINISHED(true),
   CANCELED(true),
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 801a1d3df2..42e793b41f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -100,6 +100,14 @@ public class QueryStateMachine {
     queryState.set(QueryState.DISPATCHING);
   }

+  public void transitionToRetrying(Throwable throwable) {
+    if (queryState.get().isDone()) {
+      return;
+    }
+    queryState.set(QueryState.RETRYING);
+    this.failureException = throwable;
+  }
+
   public void transitionToRunning() {
     queryState.set(QueryState.RUNNING);
   }
@@ -125,6 +133,13 @@ public class QueryStateMachine {
     queryState.set(QueryState.ABORTED);
   }

+  public void transitionToFailed() {
+    if (queryState.get().isDone()) {
+      return;
+    }
+    queryState.set(QueryState.FAILED);
+  }
+
   public void transitionToFailed(Throwable throwable) {
     if (queryState.get().isDone()) {
       return;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 4bae3eeafc..75ea799ac5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -87,14 +87,15 @@ public class QueryExecution implements IQueryExecution {
   private static final Logger logger = LoggerFactory.getLogger(QueryExecution.class);

   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-
+  private static final int MAX_RETRY_COUNT = 3;
+  private int retryCount = 0;
   private final MPPQueryContext context;
   private IScheduler scheduler;
   private final QueryStateMachine stateMachine;

   private final List<PlanOptimizer> planOptimizers;

-  private final Analysis analysis;
+  private Analysis analysis;
   private LogicalQueryPlan logicalPlan;
   private DistributedQueryPlan distributedPlan;

@@ -138,6 +139,10 @@ public class QueryExecution implements IQueryExecution {
     stateMachine.addStateChangeListener(
         state -> {
           try (SetThreadName queryName = new SetThreadName(context.getQueryId().getId())) {
+            if (state == QueryState.RETRYING) {
+              retry();
+              return;
+            }
             if (!state.isDone()) {
               return;
             }
@@ -169,6 +174,25 @@ public class QueryExecution implements IQueryExecution {
     schedule();
   }

+  private void retry() {
+    if (retryCount >= MAX_RETRY_COUNT) {
+      logger.error("reach max retry count. transit query to failed");
+      stateMachine.transitionToFailed();
+      return;
+    }
+    retryCount++;
+    logger.error("error when executing query. {}", stateMachine.getFailureMessage());
+    logger.warn("start to retry. Retry count is: {}", retryCount);
+    // stop and clean up resources the QueryExecution used
+    this.stopAndCleanup();
+    // force invalid PartitionCache
+    partitionFetcher.invalidAllCache();
+    // re-analyze the query
+    this.analysis = analyze(this.analysis.getStatement(), context, partitionFetcher, schemaFetcher);
+    // re-start the QueryExecution
+    this.start();
+  }
+
   private boolean skipExecute() {
     return analysis.isFinishQueryAfterAnalyze()
         || (context.getQueryType() == QueryType.READ && !analysis.hasDataSource());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index 0a6d51eb4a..bca37f6816 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -109,11 +109,11 @@ public class ClusterScheduler implements IScheduler {
         return;
       }
     } catch (InterruptedException | ExecutionException e) {
-      // If the dispatch failed, we make the QueryState as failed, and return.
+      // If the dispatch request cannot be sent or TException is caught, we will retry this query.
       if (e instanceof InterruptedException) {
         Thread.currentThread().interrupt();
       }
-      stateMachine.transitionToFailed(e);
+      stateMachine.transitionToRetrying(e);
       return;
     }
