This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7759b8747a7 Add SLEEP_BOUND for the DriverTaskTimeoutSentinelThread
7759b8747a7 is described below
commit 7759b8747a736756b1df4eb2f25fd565bffde6d2
Author: Liao Lanyu <[email protected]>
AuthorDate: Sat Oct 21 16:39:00 2023 +0800
Add SLEEP_BOUND for the DriverTaskTimeoutSentinelThread
---
.../execution/schedule/DriverScheduler.java | 7 ++++++-
.../schedule/DriverTaskTimeoutSentinelThread.java | 18 ++++++++++++++----
.../queryengine/execution/schedule/ITaskScheduler.java | 8 ++++++++
3 files changed, 28 insertions(+), 5 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
index 9228adbd77f..1ad010eead7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
@@ -263,7 +263,7 @@ public class DriverScheduler implements IDriverScheduler,
IService {
for (DriverTask task : submittedTasks) {
registerTaskToQueryMap(queryId, task);
}
- timeoutQueue.push(submittedTasks.get(submittedTasks.size() - 1));
+ scheduler.enforceTimeLimit(submittedTasks.get(submittedTasks.size() - 1));
for (DriverTask task : submittedTasks) {
submitTaskToReadyQueue(task);
}
@@ -553,6 +553,11 @@ public class DriverScheduler implements IDriverScheduler,
IService {
clearDriverTask(task);
}
+ @Override
+ public void enforceTimeLimit(DriverTask task) {
+ timeoutQueue.push(task);
+ }
+
@Override
public void toAborted(DriverTask task) {
try (SetThreadName driverTaskName =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java
index 58315930f17..f7bbfab725d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java
@@ -31,6 +31,8 @@ public class DriverTaskTimeoutSentinelThread extends
AbstractDriverThread {
private static final Logger LOGGER =
LoggerFactory.getLogger(DriverTaskTimeoutSentinelThread.class);
+ private final long SLEEP_BOUND = 5 * 1000L;
+
public DriverTaskTimeoutSentinelThread(
String workerId,
ThreadGroup tg,
@@ -52,12 +54,15 @@ public class DriverTaskTimeoutSentinelThread extends
AbstractDriverThread {
} finally {
task.unlock();
}
- // If this task is not timeout, we can wait it to timeout.
- long waitTime = task.getDDL() - System.currentTimeMillis();
+
+ // If this task has not reached the time limit, we can wait for some time.
+ // SLEEP_BOUND ensures that DriverTaskTimeoutSentinelThread won't sleep for too long when the
+ // waitTime of the task is long.
+ long waitTime = Math.min(task.getDDL() - System.currentTimeMillis(),
SLEEP_BOUND);
while (waitTime > 0L) {
- // After this time, the task must be timeout.
+ long startSleep = System.currentTimeMillis();
Thread.sleep(waitTime);
- waitTime = task.getDDL() - System.currentTimeMillis();
+ waitTime -= (System.currentTimeMillis() - startSleep);
}
task.lock();
@@ -67,6 +72,11 @@ public class DriverTaskTimeoutSentinelThread extends
AbstractDriverThread {
if (task.isEndState()) {
return;
}
+ // If the task still has not reached the time limit, re-push the task into the TimeoutQueue.
+ if (task.getDDL() - System.currentTimeMillis() > 0L) {
+ scheduler.enforceTimeLimit(task);
+ return;
+ }
} finally {
task.unlock();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/ITaskScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/ITaskScheduler.java
index 4214b1455ea..0240f646a54 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/ITaskScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/ITaskScheduler.java
@@ -70,4 +70,12 @@ public interface ITaskScheduler {
* @param task the task to be switched.
*/
void toAborted(DriverTask task);
+
+ /**
+ * Add a time watch for the task. The task will be aborted if it cannot be finished in the given
+ * amount of time.
+ *
+ * @param task the target task.
+ */
+ void enforceTimeLimit(DriverTask task);
}