This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/query_retry_condition in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8069b4a54c7bcf60a4eafb1dbaca03f6182abc2a Author: Jinrui.Zhang <[email protected]> AuthorDate: Wed Jul 20 18:47:47 2022 +0800 change the condition to trigger retry in QueryExecution --- .../iotdb/db/mpp/execution/QueryStateMachine.java | 4 ++-- .../iotdb/db/mpp/plan/scheduler/ClusterScheduler.java | 17 +++++++++++------ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java index 42e793b41f..44f16e2ad9 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java @@ -100,12 +100,12 @@ public class QueryStateMachine { queryState.set(QueryState.DISPATCHING); } - public void transitionToRetrying(Throwable throwable) { + public void transitionToRetrying(TSStatus failureStatus) { if (queryState.get().isDone()) { return; } + this.failureStatus = failureStatus; queryState.set(QueryState.RETRYING); - this.failureException = throwable; } public void transitionToRunning() { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java index bd0e166c36..f2d36f093d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.db.mpp.common.FragmentInstanceId; @@ -29,6 +30,7 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInfo; 
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState; import org.apache.iotdb.db.mpp.plan.analyze.QueryType; import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; +import org.apache.iotdb.rpc.TSStatusCode; import io.airlift.units.Duration; import org.slf4j.Logger; @@ -89,6 +91,11 @@ public class ClusterScheduler implements IScheduler { } } + private boolean needRetry(TSStatus failureStatus) { + return failureStatus != null + && failureStatus.getCode() == TSStatusCode.SYNC_CONNECTION_EXCEPTION.getStatusCode(); + } + @Override public void start() { stateMachine.transitionToDispatching(); @@ -99,12 +106,10 @@ public class ClusterScheduler implements IScheduler { try { FragInstanceDispatchResult result = dispatchResultFuture.get(); if (!result.isSuccessful()) { - if (result.getFailureStatus() != null) { - stateMachine.transitionToFailed(result.getFailureStatus()); + if (needRetry(result.getFailureStatus())) { + stateMachine.transitionToRetrying(result.getFailureStatus()); } else { - // won't get into here - stateMachine.transitionToFailed( - new IllegalStateException("Fragment cannot be dispatched")); + stateMachine.transitionToFailed(result.getFailureStatus()); } return; } @@ -113,7 +118,7 @@ public class ClusterScheduler implements IScheduler { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } - stateMachine.transitionToRetrying(e); + stateMachine.transitionToFailed(e); return; }
