This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch advancePipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit cf097a1bb62c06415316ec943fe58e711f48d7a1
Author: Alima777 <[email protected]>
AuthorDate: Thu Feb 9 21:51:29 2023 +0800

    Fix deadlock caused by submitting task which has dependency lazily
---
 .../db/mpp/execution/schedule/DriverScheduler.java | 22 ++++++++++++++++++----
 1 file changed, 18 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index f959007648..ef191ec5cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -195,8 +195,16 @@ public class DriverScheduler implements IDriverScheduler, IService {
                 tasks.get(driver.getDependencyDriverIndex()).getBlockedDependencyDriver();
         blockedDependencyFuture.addListener(
             () -> {
-              registerTaskToQueryMap(queryId, task);
-              submitTaskToReadyQueue(task);
+              // Only if query is alive, we can submit this task
+              Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks = queryMap.get(queryId);
+              if (queryRelatedTasks != null) {
+                Set<DriverTask> instanceRelatedTasks =
+                    queryRelatedTasks.get(task.getDriverTaskId().getFragmentInstanceId());
+                if (instanceRelatedTasks != null) {
+                  instanceRelatedTasks.add(task);
+                  submitTaskToReadyQueue(task);
+                }
+              }
             },
             MoreExecutors.directExecutor());
       } else {
@@ -473,8 +481,14 @@ public class DriverScheduler implements IDriverScheduler, IService {
         }
         task.updateSchedulePriority(context);
         task.setStatus(DriverTaskStatus.FINISHED);
+      } finally {
+        task.unlock();
+      }
+      // Dependency driver must be submitted before this task is cleared
+      task.submitDependencyDriver();
+      task.lock();
+      try {
         clearDriverTask(task);
-        task.submitDependencyDriver();
       } finally {
         task.unlock();
       }
@@ -499,7 +513,7 @@ public class DriverScheduler implements IDriverScheduler, IService {
         task.unlock();
       }
       QueryId queryId = task.getDriverTaskId().getQueryId();
-      Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks = queryMap.get(queryId);
+      Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks = queryMap.remove(queryId);
       if (queryRelatedTasks != null) {
         for (Set<DriverTask> fragmentRelatedTasks : queryRelatedTasks.values()) {
           if (fragmentRelatedTasks != null) {
