This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch timeoutQueue in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8f11ad53645a7984006a7c80224b6899c7122f70
Author: Alima777 <[email protected]>
AuthorDate: Wed Feb 1 20:26:59 2023 +0800

    add timeout queue
---
 .../db/mpp/execution/schedule/DriverScheduler.java | 29 ++++++----------------
 1 file changed, 8 insertions(+), 21 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index 4c0aeca51e..9c77ff7004 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -184,6 +184,13 @@ public class DriverScheduler implements IDriverScheduler, IService {
                         DriverTaskStatus.READY,
                         driverTaskHandle))
             .collect(Collectors.toList());
+    // If query has not been registered by other fragment instances,
+    // add the first task as timeout checking task to timeoutQueue.
+    // Concurrency problem may exist, but it's ok even if we add two or more tasks.
+    if (!queryMap.containsKey(queryId)) {
+      timeoutQueue.push(tasks.get(0));
+    }
+
     for (DriverTask driverTask : tasks) {
       queryMap
           .computeIfAbsent(queryId, v -> new ConcurrentHashMap<>())
@@ -208,7 +215,6 @@ public class DriverScheduler implements IDriverScheduler, IService {
   private void addTaskToReadyQueue(DriverTask task) {
     SettableFuture<?> isBlocked = readyQueue.push(task);
     if (isBlocked.isDone()) {
-      timeoutQueue.push(task);
       task.setLastEnterReadyQueueTime(System.nanoTime());
     } else {
       isBlocked.addListener(
@@ -426,31 +432,12 @@ public class DriverScheduler implements IDriverScheduler, IService {
         }
         task.updateSchedulePriority(context);
         task.setStatus(DriverTaskStatus.READY);
-        convertRunningTaskToReadyQueue(task);
+        addTaskToReadyQueue(task);
       } finally {
         task.unlock();
       }
     }
 
-    private void convertRunningTaskToReadyQueue(DriverTask task) {
-      SettableFuture<?> isBlocked = readyQueue.push(task);
-      if (isBlocked.isDone()) {
-        task.setLastEnterReadyQueueTime(System.nanoTime());
-      } else
-        isBlocked.addListener(
-            () -> {
-              task.lock();
-              try {
-                if (task.getStatus() == DriverTaskStatus.READY) {
-                  convertRunningTaskToReadyQueue(task);
-                }
-              } finally {
-                task.unlock();
-              }
-            },
-            MoreExecutors.directExecutor());
-    }
-
     @Override
     public void runningToBlocked(DriverTask task, ExecutionContext context) {
       task.lock();
