This is an automated email from the ASF dual-hosted git repository.

lancelly pushed a commit to branch timeout
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 210a8865997166fafc7244423567a679ae9d9c37
Author: lancelly <[email protected]>
AuthorDate: Fri Oct 20 16:11:39 2023 +0800

    fix
---
 .../db/queryengine/execution/schedule/DriverScheduler.java    |  7 ++++++-
 .../execution/schedule/DriverTaskTimeoutSentinelThread.java   | 11 +++++++++++
 .../db/queryengine/execution/schedule/ITaskScheduler.java     |  8 ++++++++
 3 files changed, 25 insertions(+), 1 deletion(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
index 9228adbd77f..1ad010eead7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
@@ -263,7 +263,7 @@ public class DriverScheduler implements IDriverScheduler, 
IService {
     for (DriverTask task : submittedTasks) {
       registerTaskToQueryMap(queryId, task);
     }
-    timeoutQueue.push(submittedTasks.get(submittedTasks.size() - 1));
+    scheduler.enforceTimeLimit(submittedTasks.get(submittedTasks.size() - 1));
     for (DriverTask task : submittedTasks) {
       submitTaskToReadyQueue(task);
     }
@@ -553,6 +553,11 @@ public class DriverScheduler implements IDriverScheduler, 
IService {
       clearDriverTask(task);
     }
 
+    @Override
+    public void enforceTimeLimit(DriverTask task) {
+      timeoutQueue.push(task);
+    }
+
     @Override
     public void toAborted(DriverTask task) {
       try (SetThreadName driverTaskName =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java
index 58315930f17..424cd77f65f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java
@@ -31,6 +31,8 @@ public class DriverTaskTimeoutSentinelThread extends 
AbstractDriverThread {
   private static final Logger LOGGER =
       LoggerFactory.getLogger(DriverTaskTimeoutSentinelThread.class);
 
+  private final long SLEEP_BOUND = 5 * 1000L;
+
   public DriverTaskTimeoutSentinelThread(
       String workerId,
       ThreadGroup tg,
@@ -49,9 +51,18 @@ public class DriverTaskTimeoutSentinelThread extends 
AbstractDriverThread {
       if (task.isEndState()) {
         return;
       }
+      long waitTime = task.getDDL() - System.currentTimeMillis();
+      // if the waitTime is more than SLEEP_BOUND, re-push the task in the 
TimeoutQueue. SLEEP_BOUND
+      // ensures that DriverTaskTimeoutSentinelThread won't sleep for too long 
when the waitTime
+      // of the current task is long.
+      if (waitTime > SLEEP_BOUND) {
+        scheduler.enforceTimeLimit(task);
+        return;
+      }
     } finally {
       task.unlock();
     }
+
     // If this task is not timeout, we can wait it to timeout.
     long waitTime = task.getDDL() - System.currentTimeMillis();
     while (waitTime > 0L) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/ITaskScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/ITaskScheduler.java
index 4214b1455ea..0240f646a54 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/ITaskScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/ITaskScheduler.java
@@ -70,4 +70,12 @@ public interface ITaskScheduler {
    * @param task the task to be switched.
    */
   void toAborted(DriverTask task);
+
+  /**
+   * Add a time watch for the task. The task will be aborted if it can not be 
finished in the given
+   * amount of time.
+   *
+   * @param task the target task.
+   */
+  void enforceTimeLimit(DriverTask task);
 }

Reply via email to