This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new 43a50b1fd17 [To rel/1.1] [IOTDB-5939] Correct Flusing Task Timeout
Detect Thread's timeout
43a50b1fd17 is described below
commit 43a50b1fd177c2a270729af7127bbe665f505a91
Author: JackieTien97 <[email protected]>
AuthorDate: Tue May 30 16:44:14 2023 +0800
[To rel/1.1] [IOTDB-5939] Correct Flusing Task Timeout Detect Thread's
timeout
---
.../mpp/execution/fragment/FragmentInstanceExecution.java | 14 +++++++++++---
.../db/mpp/execution/fragment/FragmentInstanceManager.java | 11 ++++++-----
2 files changed, 17 insertions(+), 8 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index 765837b33e3..35a20199a27 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -47,6 +47,8 @@ public class FragmentInstanceExecution {
private final FragmentInstanceStateMachine stateMachine;
+ private final long timeoutInMs;
+
private long lastHeartbeat;
public static FragmentInstanceExecution createFragmentInstanceExecution(
@@ -59,9 +61,9 @@ public class FragmentInstanceExecution {
CounterStat failedInstances,
long timeOut) {
FragmentInstanceExecution execution =
- new FragmentInstanceExecution(instanceId, context, drivers,
sinkHandle, stateMachine);
+ new FragmentInstanceExecution(
+ instanceId, context, drivers, sinkHandle, stateMachine, timeOut);
execution.initialize(failedInstances, scheduler);
- LOGGER.debug("timeout is {}ms.", timeOut);
scheduler.submitDrivers(instanceId.getQueryId(), drivers, timeOut);
return execution;
}
@@ -71,12 +73,14 @@ public class FragmentInstanceExecution {
FragmentInstanceContext context,
List<IDriver> drivers,
ISink sink,
- FragmentInstanceStateMachine stateMachine) {
+ FragmentInstanceStateMachine stateMachine,
+ long timeoutInMs) {
this.instanceId = instanceId;
this.context = context;
this.drivers = drivers;
this.sink = sink;
this.stateMachine = stateMachine;
+ this.timeoutInMs = timeoutInMs;
}
public void recordHeartbeat() {
@@ -103,6 +107,10 @@ public class FragmentInstanceExecution {
return context.getStartTime();
}
+ public long getTimeoutInMs() {
+ return timeoutInMs;
+ }
+
public FragmentInstanceStateMachine getStateMachine() {
return stateMachine;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index d563f9cf24d..0b092ca0b8c 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -70,9 +70,6 @@ public class FragmentInstanceManager {
// record failed instances count
private final CounterStat failedInstances = new CounterStat();
- private static final long QUERY_TIMEOUT_MS =
- IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
-
private final ExecutorService intoOperationExecutor;
private static final QueryMetricsManager QUERY_METRICS =
QueryMetricsManager.getInstance();
@@ -304,8 +301,12 @@ public class FragmentInstanceManager {
instanceExecution.forEach(
(key, execution) -> {
if (execution.getStateMachine().getState() ==
FragmentInstanceState.FLUSHING
- && (now - execution.getStartTime()) > QUERY_TIMEOUT_MS) {
- execution.getStateMachine().failed(new TimeoutException());
+ && (now - execution.getStartTime()) >
execution.getTimeoutInMs()) {
+ execution
+ .getStateMachine()
+ .failed(
+ new TimeoutException(
+ "Query has executed more than " +
execution.getTimeoutInMs() + "ms"));
}
});
}