This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch IOTDB-4178 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 418791a77f69779e3a1e91ed64c59e8e14a70467 Author: JackieTien97 <[email protected]> AuthorDate: Mon Aug 29 14:32:20 2022 +0800 [IOTDB-4178] Stop StandaloneScheduler keep running while dispatching failed --- .../mpp/FragmentInstanceDispatchException.java | 1 + .../fragment/FragmentInstanceContext.java | 12 ++++++ .../fragment/FragmentInstanceExecution.java | 8 ---- .../fragment/FragmentInstanceManager.java | 43 +++++++++------------- .../db/mpp/plan/scheduler/StandaloneScheduler.java | 19 ++++++++-- 5 files changed, 45 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java b/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java index d9e760e728..d9b9045729 100644 --- a/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java +++ b/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java @@ -30,6 +30,7 @@ public class FragmentInstanceDispatchException extends Exception { } public FragmentInstanceDispatchException(TSStatus failureStatus) { + super(failureStatus.getMessage()); this.failureStatus = failureStatus; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java index 4d0f9d9ffd..593947d080 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java @@ -176,10 +176,22 @@ public class FragmentInstanceContext extends QueryContext { stateMachine.transitionToFlushing(); } + public void cancel() { + stateMachine.cancel(); + } + + public void abort() { + stateMachine.abort(); + } + public long getEndTime() { return executionEndTime.get(); } + public FragmentInstanceInfo getInstanceInfo() { + return new FragmentInstanceInfo(stateMachine.getState(), getEndTime(), getFailedCause()); + } + public FragmentInstanceStateMachine getStateMachine() { return stateMachine; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java index 767487fc67..df99f50d58 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java @@ -89,14 +89,6 @@ public class FragmentInstanceExecution { stateMachine.getState(), context.getEndTime(), context.getFailedCause()); } - public void cancel() { - stateMachine.cancel(); - } - - public void abort() { - stateMachine.abort(); - } - // this is a separate method to ensure that the `this` reference is not leaked during construction private void initialize(CounterStat failedInstances, IDriverScheduler scheduler) { requireNonNull(failedInstances, "failedInstances is null"); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java index 6506e7f619..f6ebb1b42e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java @@ -162,26 +162,27 @@ public class FragmentInstanceManager { return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId); } - /** Aborts a FragmentInstance. */ + /** Aborts a FragmentInstance. keep FragmentInstanceContext for later state tracking */ public FragmentInstanceInfo abortFragmentInstance(FragmentInstanceId fragmentInstanceId) { - FragmentInstanceExecution execution = instanceExecution.remove(fragmentInstanceId); - if (execution != null) { - instanceContext.remove(fragmentInstanceId); - execution.abort(); - return execution.getInstanceInfo(); + instanceExecution.remove(fragmentInstanceId); + FragmentInstanceContext context = instanceContext.get(fragmentInstanceId); + if (context != null) { + context.abort(); + return context.getInstanceInfo(); } return null; } /** Cancels a FragmentInstance. */ public FragmentInstanceInfo cancelTask(FragmentInstanceId instanceId) { + logger.error("cancelTask"); requireNonNull(instanceId, "taskId is null"); - FragmentInstanceExecution execution = instanceExecution.remove(instanceId); - if (execution != null) { - instanceContext.remove(instanceId); - execution.cancel(); - return execution.getInstanceInfo(); + FragmentInstanceContext context = instanceContext.remove(instanceId); + if (context != null) { + instanceExecution.remove(instanceId); + context.cancel(); + return context.getInstanceInfo(); } return null; } @@ -194,11 +195,11 @@ public class FragmentInstanceManager { */ public FragmentInstanceInfo getInstanceInfo(FragmentInstanceId instanceId) { requireNonNull(instanceId, "instanceId is null"); - FragmentInstanceExecution execution = instanceExecution.get(instanceId); - if (execution == null) { + FragmentInstanceContext context = instanceContext.get(instanceId); + if (context == null) { return null; } - return execution.getInstanceInfo(); + return context.getInstanceInfo(); } public CounterStat getFailedInstances() { @@ -217,18 +218,8 @@ public class FragmentInstanceManager { .entrySet() .removeIf( entry -> { - FragmentInstanceId instanceId = entry.getKey(); - FragmentInstanceExecution execution = instanceExecution.get(instanceId); - if (execution == null) { - return true; - } - long endTime = execution.getInstanceInfo().getEndTime(); - if (endTime != -1 && endTime <= oldestAllowedInstance) { - instanceContext.remove(instanceId); - return true; - } else { - return false; - } + long endTime = entry.getValue().getEndTime(); + return endTime != -1 && endTime <= oldestAllowedInstance; }); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java index cf2ad5ca47..fb48c43e04 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java @@ -36,12 +36,14 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext; import org.apache.iotdb.db.mpp.common.PlanFragmentId; import org.apache.iotdb.db.mpp.execution.QueryStateMachine; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInfo; +import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager; import org.apache.iotdb.db.mpp.plan.analyze.QueryType; import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator; import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode; +import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import io.airlift.units.Duration; @@ -100,8 +102,15 @@ public class StandaloneScheduler implements IScheduler { if (groupId instanceof DataRegionId) { DataRegion region = StorageEngineV2.getInstance().getDataRegion((DataRegionId) groupId); - FragmentInstanceManager.getInstance() - .execDataQueryFragmentInstance(fragmentInstance, region); + FragmentInstanceInfo info = + FragmentInstanceManager.getInstance() + .execDataQueryFragmentInstance(fragmentInstance, region); + // query dispatch failed + if (info.getState().isFailed()) { + stateMachine.transitionToFailed( + RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, info.getMessage())); + return; + } } else { ISchemaRegion region = SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) groupId); @@ -111,12 +120,14 @@ public class StandaloneScheduler implements IScheduler { } } catch (Exception e) { stateMachine.transitionToFailed(e); + LOGGER.info("transit to FAILED"); + return; } // The FragmentInstances has been dispatched successfully to corresponding host, we mark the stateMachine.transitionToRunning(); - LOGGER.info("{} transit to RUNNING", getLogHeader()); + LOGGER.info("transit to RUNNING"); this.stateTracker.start(); - LOGGER.info("{} state tracker starts", getLogHeader()); + LOGGER.info("state tracker starts"); break; case WRITE: // reject non-query operations when system is read-only
