This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 470d992b48 [IOTDB-4178] Stop StandaloneScheduler from continuing to run when
dispatching fails (#7159)
470d992b48 is described below
commit 470d992b489a0183098077061bcc94f71d14dba3
Author: Jackie Tien <[email protected]>
AuthorDate: Mon Aug 29 19:20:16 2022 +0800
[IOTDB-4178] Stop StandaloneScheduler from continuing to run when dispatching
fails (#7159)
---
.../mpp/FragmentInstanceDispatchException.java | 1 +
.../fragment/FragmentInstanceContext.java | 12 ++++++
.../fragment/FragmentInstanceExecution.java | 8 ----
.../fragment/FragmentInstanceManager.java | 43 +++++++++-------------
.../db/mpp/plan/scheduler/StandaloneScheduler.java | 19 ++++++++--
5 files changed, 45 insertions(+), 38 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java
b/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java
index d9e760e728..d9b9045729 100644
---
a/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java
+++
b/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java
@@ -30,6 +30,7 @@ public class FragmentInstanceDispatchException extends
Exception {
}
public FragmentInstanceDispatchException(TSStatus failureStatus) {
+ super(failureStatus.getMessage());
this.failureStatus = failureStatus;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 4d0f9d9ffd..593947d080 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -176,10 +176,22 @@ public class FragmentInstanceContext extends QueryContext
{
stateMachine.transitionToFlushing();
}
+ public void cancel() {
+ stateMachine.cancel();
+ }
+
+ public void abort() {
+ stateMachine.abort();
+ }
+
public long getEndTime() {
return executionEndTime.get();
}
+ public FragmentInstanceInfo getInstanceInfo() {
+ return new FragmentInstanceInfo(stateMachine.getState(), getEndTime(),
getFailedCause());
+ }
+
public FragmentInstanceStateMachine getStateMachine() {
return stateMachine;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index 767487fc67..df99f50d58 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -89,14 +89,6 @@ public class FragmentInstanceExecution {
stateMachine.getState(), context.getEndTime(),
context.getFailedCause());
}
- public void cancel() {
- stateMachine.cancel();
- }
-
- public void abort() {
- stateMachine.abort();
- }
-
// this is a separate method to ensure that the `this` reference is not
leaked during construction
private void initialize(CounterStat failedInstances, IDriverScheduler
scheduler) {
requireNonNull(failedInstances, "failedInstances is null");
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 6506e7f619..f6ebb1b42e 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -162,26 +162,27 @@ public class FragmentInstanceManager {
return execution != null ? execution.getInstanceInfo() :
createFailedInstanceInfo(instanceId);
}
- /** Aborts a FragmentInstance. */
+ /** Aborts a FragmentInstance. keep FragmentInstanceContext for later state
tracking */
public FragmentInstanceInfo abortFragmentInstance(FragmentInstanceId
fragmentInstanceId) {
- FragmentInstanceExecution execution =
instanceExecution.remove(fragmentInstanceId);
- if (execution != null) {
- instanceContext.remove(fragmentInstanceId);
- execution.abort();
- return execution.getInstanceInfo();
+ instanceExecution.remove(fragmentInstanceId);
+ FragmentInstanceContext context = instanceContext.get(fragmentInstanceId);
+ if (context != null) {
+ context.abort();
+ return context.getInstanceInfo();
}
return null;
}
/** Cancels a FragmentInstance. */
public FragmentInstanceInfo cancelTask(FragmentInstanceId instanceId) {
+ logger.error("cancelTask");
requireNonNull(instanceId, "taskId is null");
- FragmentInstanceExecution execution = instanceExecution.remove(instanceId);
- if (execution != null) {
- instanceContext.remove(instanceId);
- execution.cancel();
- return execution.getInstanceInfo();
+ FragmentInstanceContext context = instanceContext.remove(instanceId);
+ if (context != null) {
+ instanceExecution.remove(instanceId);
+ context.cancel();
+ return context.getInstanceInfo();
}
return null;
}
@@ -194,11 +195,11 @@ public class FragmentInstanceManager {
*/
public FragmentInstanceInfo getInstanceInfo(FragmentInstanceId instanceId) {
requireNonNull(instanceId, "instanceId is null");
- FragmentInstanceExecution execution = instanceExecution.get(instanceId);
- if (execution == null) {
+ FragmentInstanceContext context = instanceContext.get(instanceId);
+ if (context == null) {
return null;
}
- return execution.getInstanceInfo();
+ return context.getInstanceInfo();
}
public CounterStat getFailedInstances() {
@@ -217,18 +218,8 @@ public class FragmentInstanceManager {
.entrySet()
.removeIf(
entry -> {
- FragmentInstanceId instanceId = entry.getKey();
- FragmentInstanceExecution execution =
instanceExecution.get(instanceId);
- if (execution == null) {
- return true;
- }
- long endTime = execution.getInstanceInfo().getEndTime();
- if (endTime != -1 && endTime <= oldestAllowedInstance) {
- instanceContext.remove(instanceId);
- return true;
- } else {
- return false;
- }
+ long endTime = entry.getValue().getEndTime();
+ return endTime != -1 && endTime <= oldestAllowedInstance;
});
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
index cf2ad5ca47..fb48c43e04 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
@@ -36,12 +36,14 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInfo;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import io.airlift.units.Duration;
@@ -100,8 +102,15 @@ public class StandaloneScheduler implements IScheduler {
if (groupId instanceof DataRegionId) {
DataRegion region =
StorageEngineV2.getInstance().getDataRegion((DataRegionId)
groupId);
- FragmentInstanceManager.getInstance()
- .execDataQueryFragmentInstance(fragmentInstance, region);
+ FragmentInstanceInfo info =
+ FragmentInstanceManager.getInstance()
+ .execDataQueryFragmentInstance(fragmentInstance, region);
+ // query dispatch failed
+ if (info.getState().isFailed()) {
+ stateMachine.transitionToFailed(
+ RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
info.getMessage()));
+ return;
+ }
} else {
ISchemaRegion region =
SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId)
groupId);
@@ -111,12 +120,14 @@ public class StandaloneScheduler implements IScheduler {
}
} catch (Exception e) {
stateMachine.transitionToFailed(e);
+ LOGGER.info("transit to FAILED");
+ return;
}
// The FragmentInstances has been dispatched successfully to
corresponding host, we mark the
stateMachine.transitionToRunning();
- LOGGER.info("{} transit to RUNNING", getLogHeader());
+ LOGGER.info("transit to RUNNING");
this.stateTracker.start();
- LOGGER.info("{} state tracker starts", getLogHeader());
+ LOGGER.info("state tracker starts");
break;
case WRITE:
// reject non-query operations when system is read-only