This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c31a35933b Add query retry logic so that a query won't fail when
some DataNodes shut down (#6717)
c31a35933b is described below
commit c31a35933baef77bfc751eaebc51ed94b9adba01
Author: Zhang.Jinrui <[email protected]>
AuthorDate: Wed Jul 20 10:18:33 2022 +0800
Add query retry logic so that a query won't fail when some DataNodes
shut down (#6717)
---
.../apache/iotdb/db/mpp/execution/QueryState.java | 1 +
.../iotdb/db/mpp/execution/QueryStateMachine.java | 15 ++++
.../db/mpp/plan/execution/QueryExecution.java | 79 +++++++++++++++-------
.../db/mpp/plan/scheduler/ClusterScheduler.java | 4 +-
4 files changed, 74 insertions(+), 25 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
index 383875a022..d4ed15d9ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryState.java
@@ -28,6 +28,7 @@ public enum QueryState {
QUEUED(false),
PLANNED(false),
DISPATCHING(false),
+ RETRYING(false),
RUNNING(false),
FINISHED(true),
CANCELED(true),
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 801a1d3df2..42e793b41f 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -100,6 +100,14 @@ public class QueryStateMachine {
queryState.set(QueryState.DISPATCHING);
}
+ public void transitionToRetrying(Throwable throwable) {
+ if (queryState.get().isDone()) {
+ return;
+ }
+ queryState.set(QueryState.RETRYING);
+ this.failureException = throwable;
+ }
+
public void transitionToRunning() {
queryState.set(QueryState.RUNNING);
}
@@ -125,6 +133,13 @@ public class QueryStateMachine {
queryState.set(QueryState.ABORTED);
}
+ public void transitionToFailed() {
+ if (queryState.get().isDone()) {
+ return;
+ }
+ queryState.set(QueryState.FAILED);
+ }
+
public void transitionToFailed(Throwable throwable) {
if (queryState.get().isDone()) {
return;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index fe6d48afa2..9c9d290052 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -88,14 +88,17 @@ public class QueryExecution implements IQueryExecution {
private static final Logger logger =
LoggerFactory.getLogger(QueryExecution.class);
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
-
+ private static final int MAX_RETRY_COUNT = 3;
+ private static final long RETRY_INTERVAL_IN_MS = 2000;
+ private int retryCount = 0;
private final MPPQueryContext context;
private IScheduler scheduler;
private final QueryStateMachine stateMachine;
private final List<PlanOptimizer> planOptimizers;
- private final Analysis analysis;
+ private Statement rawStatement;
+ private Analysis analysis;
private LogicalQueryPlan logicalPlan;
private DistributedQueryPlan distributedPlan;
@@ -123,6 +126,7 @@ public class QueryExecution implements IQueryExecution {
IPartitionFetcher partitionFetcher,
ISchemaFetcher schemaFetcher,
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager) {
+ this.rawStatement = statement;
this.executor = executor;
this.writeOperationExecutor = writeOperationExecutor;
this.scheduledExecutor = scheduledExecutor;
@@ -139,6 +143,10 @@ public class QueryExecution implements IQueryExecution {
stateMachine.addStateChangeListener(
state -> {
try (SetThreadName queryName = new
SetThreadName(context.getQueryId().getId())) {
+ if (state == QueryState.RETRYING) {
+ retry();
+ return;
+ }
if (!state.isDone()) {
return;
}
@@ -170,6 +178,33 @@ public class QueryExecution implements IQueryExecution {
schedule();
}
+ private void retry() {
+ if (retryCount >= MAX_RETRY_COUNT) {
+ logger.error("reach max retry count. transit query to failed");
+ stateMachine.transitionToFailed();
+ return;
+ }
+ logger.warn("error when executing query. {}",
stateMachine.getFailureMessage());
+ // stop and clean up resources the QueryExecution used
+ this.stopAndCleanup();
+ logger.info("wait {}ms before retry...", RETRY_INTERVAL_IN_MS);
+ try {
+ Thread.sleep(RETRY_INTERVAL_IN_MS);
+ } catch (InterruptedException e) {
+ logger.error("interrupted when waiting retry");
+ Thread.currentThread().interrupt();
+ }
+ retryCount++;
+ logger.info("start to retry. Retry count is: {}", retryCount);
+
+ // force invalid PartitionCache
+ partitionFetcher.invalidAllCache();
+ // re-analyze the query
+ this.analysis = analyze(rawStatement, context, partitionFetcher,
schemaFetcher);
+ // re-start the QueryExecution
+ this.start();
+ }
+
private boolean skipExecute() {
return analysis.isFinishQueryAfterAnalyze()
|| (context.getQueryType() == QueryType.READ &&
!analysis.hasDataSource());
@@ -388,27 +423,25 @@ public class QueryExecution implements IQueryExecution {
}
private void initResultHandle() {
- if (this.resultHandle == null) {
- TEndPoint upstreamEndPoint =
context.getResultNodeContext().getUpStreamEndpoint();
-
- this.resultHandle =
- isSameNode(upstreamEndPoint)
- ? MPPDataExchangeService.getInstance()
- .getMPPDataExchangeManager()
- .createLocalSourceHandle(
-
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
-
context.getResultNodeContext().getVirtualResultNodeId().getId(),
-
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
- stateMachine::transitionToFailed)
- : MPPDataExchangeService.getInstance()
- .getMPPDataExchangeManager()
- .createSourceHandle(
-
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
-
context.getResultNodeContext().getVirtualResultNodeId().getId(),
- upstreamEndPoint,
-
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
- stateMachine::transitionToFailed);
- }
+ TEndPoint upstreamEndPoint =
context.getResultNodeContext().getUpStreamEndpoint();
+
+ this.resultHandle =
+ isSameNode(upstreamEndPoint)
+ ? MPPDataExchangeService.getInstance()
+ .getMPPDataExchangeManager()
+ .createLocalSourceHandle(
+
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
+
context.getResultNodeContext().getVirtualResultNodeId().getId(),
+
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
+ stateMachine::transitionToFailed)
+ : MPPDataExchangeService.getInstance()
+ .getMPPDataExchangeManager()
+ .createSourceHandle(
+
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
+
context.getResultNodeContext().getVirtualResultNodeId().getId(),
+ upstreamEndPoint,
+
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
+ stateMachine::transitionToFailed);
}
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index 9563357d12..bd0e166c36 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -109,11 +109,11 @@ public class ClusterScheduler implements IScheduler {
return;
}
} catch (InterruptedException | ExecutionException e) {
- // If the dispatch failed, we make the QueryState as failed, and return.
+ // If the dispatch request cannot be sent or TException is caught, we
will retry this query.
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
- stateMachine.transitionToFailed(e);
+ stateMachine.transitionToRetrying(e);
return;
}