This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7759b8747a7 Add SLEEP_BOUND for the DriverTaskTimeoutSentinelThread
7759b8747a7 is described below

commit 7759b8747a736756b1df4eb2f25fd565bffde6d2
Author: Liao Lanyu <[email protected]>
AuthorDate: Sat Oct 21 16:39:00 2023 +0800

    Add SLEEP_BOUND for the DriverTaskTimeoutSentinelThread
---
 .../execution/schedule/DriverScheduler.java            |  7 ++++++-
 .../schedule/DriverTaskTimeoutSentinelThread.java      | 18 ++++++++++++++----
 .../queryengine/execution/schedule/ITaskScheduler.java |  8 ++++++++
 3 files changed, 28 insertions(+), 5 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
index 9228adbd77f..1ad010eead7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
@@ -263,7 +263,7 @@ public class DriverScheduler implements IDriverScheduler, 
IService {
     for (DriverTask task : submittedTasks) {
       registerTaskToQueryMap(queryId, task);
     }
-    timeoutQueue.push(submittedTasks.get(submittedTasks.size() - 1));
+    scheduler.enforceTimeLimit(submittedTasks.get(submittedTasks.size() - 1));
     for (DriverTask task : submittedTasks) {
       submitTaskToReadyQueue(task);
     }
@@ -553,6 +553,11 @@ public class DriverScheduler implements IDriverScheduler, 
IService {
       clearDriverTask(task);
     }
 
+    @Override
+    public void enforceTimeLimit(DriverTask task) {
+      timeoutQueue.push(task);
+    }
+
     @Override
     public void toAborted(DriverTask task) {
       try (SetThreadName driverTaskName =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java
index 58315930f17..f7bbfab725d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java
@@ -31,6 +31,8 @@ public class DriverTaskTimeoutSentinelThread extends 
AbstractDriverThread {
   private static final Logger LOGGER =
       LoggerFactory.getLogger(DriverTaskTimeoutSentinelThread.class);
 
+  private final long SLEEP_BOUND = 5 * 1000L;
+
   public DriverTaskTimeoutSentinelThread(
       String workerId,
       ThreadGroup tg,
@@ -52,12 +54,15 @@ public class DriverTaskTimeoutSentinelThread extends 
AbstractDriverThread {
     } finally {
       task.unlock();
     }
-    // If this task is not timeout, we can wait it to timeout.
-    long waitTime = task.getDDL() - System.currentTimeMillis();
+
+    // If this task has not reached the time limit, we can wait for some time.
+    // SLEEP_BOUND ensures that DriverTaskTimeoutSentinelThread won't sleep 
for too long when the
+    // waitTime of the task is long.
+    long waitTime = Math.min(task.getDDL() - System.currentTimeMillis(), 
SLEEP_BOUND);
     while (waitTime > 0L) {
-      // After this time, the task must be timeout.
+      long startSleep = System.currentTimeMillis();
       Thread.sleep(waitTime);
-      waitTime = task.getDDL() - System.currentTimeMillis();
+      waitTime -= (System.currentTimeMillis() - startSleep);
     }
 
     task.lock();
@@ -67,6 +72,11 @@ public class DriverTaskTimeoutSentinelThread extends 
AbstractDriverThread {
       if (task.isEndState()) {
         return;
       }
+      // If the task still has not reached its time limit, re-push it into 
the timeoutQueue.
+      if (task.getDDL() - System.currentTimeMillis() > 0L) {
+        scheduler.enforceTimeLimit(task);
+        return;
+      }
     } finally {
       task.unlock();
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/ITaskScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/ITaskScheduler.java
index 4214b1455ea..0240f646a54 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/ITaskScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/ITaskScheduler.java
@@ -70,4 +70,12 @@ public interface ITaskScheduler {
    * @param task the task to be switched.
    */
   void toAborted(DriverTask task);
+
+  /**
+   * Add a time watch for the task. The task will be aborted if it cannot be 
finished in the given
+   * amount of time.
+   *
+   * @param task the target task.
+   */
+  void enforceTimeLimit(DriverTask task);
 }

Reply via email to