This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 68f93800607 [IOTDB-5939] Correct Flushing Task Timeout Detect Thread's timeout (#9988)
68f93800607 is described below
commit 68f93800607e029067efbb1fb7111d176668791c
Author: Jackie Tien <[email protected]>
AuthorDate: Tue May 30 20:28:50 2023 +0800
[IOTDB-5939] Correct Flushing Task Timeout Detect Thread's timeout (#9988)
---
.../mpp/execution/fragment/FragmentInstanceExecution.java | 14 +++++++++++---
.../db/mpp/execution/fragment/FragmentInstanceManager.java | 11 ++++++-----
2 files changed, 17 insertions(+), 8 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index b68ec33f096..4cb85b0ae93 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -53,6 +53,8 @@ public class FragmentInstanceExecution {
private final FragmentInstanceStateMachine stateMachine;
+ private final long timeoutInMs;
+
private long lastHeartbeat;
public static FragmentInstanceExecution createFragmentInstanceExecution(
@@ -66,9 +68,9 @@ public class FragmentInstanceExecution {
long timeOut)
throws CpuNotEnoughException, MemoryNotEnoughException {
FragmentInstanceExecution execution =
- new FragmentInstanceExecution(instanceId, context, drivers, sinkHandle, stateMachine);
+ new FragmentInstanceExecution(
+ instanceId, context, drivers, sinkHandle, stateMachine, timeOut);
execution.initialize(failedInstances, scheduler);
- LOGGER.debug("timeout is {}ms.", timeOut);
scheduler.submitDrivers(instanceId.getQueryId(), drivers, timeOut, context.getSessionInfo());
return execution;
}
@@ -78,12 +80,14 @@ public class FragmentInstanceExecution {
FragmentInstanceContext context,
List<IDriver> drivers,
ISink sink,
- FragmentInstanceStateMachine stateMachine) {
+ FragmentInstanceStateMachine stateMachine,
+ long timeoutInMs) {
this.instanceId = instanceId;
this.context = context;
this.drivers = drivers;
this.sink = sink;
this.stateMachine = stateMachine;
+ this.timeoutInMs = timeoutInMs;
}
public void recordHeartbeat() {
@@ -110,6 +114,10 @@ public class FragmentInstanceExecution {
return context.getStartTime();
}
+ public long getTimeoutInMs() {
+ return timeoutInMs;
+ }
+
public FragmentInstanceStateMachine getStateMachine() {
return stateMachine;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 1cf2718e9fe..b0f816f21e0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -70,9 +70,6 @@ public class FragmentInstanceManager {
// record failed instances count
private final CounterStat failedInstances = new CounterStat();
- private static final long QUERY_TIMEOUT_MS =
- IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
-
private final ExecutorService intoOperationExecutor;
private static final QueryExecutionMetricSet QUERY_EXECUTION_METRIC_SET =
@@ -314,8 +311,12 @@ public class FragmentInstanceManager {
instanceExecution.forEach(
(key, execution) -> {
if (execution.getStateMachine().getState() == FragmentInstanceState.FLUSHING
- && (now - execution.getStartTime()) > QUERY_TIMEOUT_MS) {
- execution.getStateMachine().failed(new TimeoutException());
+ && (now - execution.getStartTime()) > execution.getTimeoutInMs()) {
+ execution
+ .getStateMachine()
+ .failed(
+ new TimeoutException(
+ "Query has executed more than " + execution.getTimeoutInMs() + "ms"));
}
});
}