This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch timeoutQueue
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8f11ad53645a7984006a7c80224b6899c7122f70
Author: Alima777 <[email protected]>
AuthorDate: Wed Feb 1 20:26:59 2023 +0800

    add timeout queue
---
 .../db/mpp/execution/schedule/DriverScheduler.java | 29 ++++++----------------
 1 file changed, 8 insertions(+), 21 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index 4c0aeca51e..9c77ff7004 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -184,6 +184,13 @@ public class DriverScheduler implements IDriverScheduler, 
IService {
                         DriverTaskStatus.READY,
                         driverTaskHandle))
             .collect(Collectors.toList());
+    // If query has not been registered by other fragment instances,
+    // add the first task as timeout checking task to timeoutQueue.
+    // Concurrency problem may exist, but it's ok even if we add two or more 
tasks.
+    if (!queryMap.containsKey(queryId)) {
+      timeoutQueue.push(tasks.get(0));
+    }
+
     for (DriverTask driverTask : tasks) {
       queryMap
           .computeIfAbsent(queryId, v -> new ConcurrentHashMap<>())
@@ -208,7 +215,6 @@ public class DriverScheduler implements IDriverScheduler, 
IService {
   private void addTaskToReadyQueue(DriverTask task) {
     SettableFuture<?> isBlocked = readyQueue.push(task);
     if (isBlocked.isDone()) {
-      timeoutQueue.push(task);
       task.setLastEnterReadyQueueTime(System.nanoTime());
     } else {
       isBlocked.addListener(
@@ -426,31 +432,12 @@ public class DriverScheduler implements IDriverScheduler, 
IService {
         }
         task.updateSchedulePriority(context);
         task.setStatus(DriverTaskStatus.READY);
-        convertRunningTaskToReadyQueue(task);
+        addTaskToReadyQueue(task);
       } finally {
         task.unlock();
       }
     }
 
-    private void convertRunningTaskToReadyQueue(DriverTask task) {
-      SettableFuture<?> isBlocked = readyQueue.push(task);
-      if (isBlocked.isDone()) {
-        task.setLastEnterReadyQueueTime(System.nanoTime());
-      } else
-        isBlocked.addListener(
-            () -> {
-              task.lock();
-              try {
-                if (task.getStatus() == DriverTaskStatus.READY) {
-                  convertRunningTaskToReadyQueue(task);
-                }
-              } finally {
-                task.unlock();
-              }
-            },
-            MoreExecutors.directExecutor());
-    }
-
     @Override
     public void runningToBlocked(DriverTask task, ExecutionContext context) {
       task.lock();

Reply via email to