This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new 037a9bdcb2 [To rel/1.1][IOTDB-5786] Fix potential deadlock in DriverScheduler
037a9bdcb2 is described below

commit 037a9bdcb20b88a9e5e2ff7ff9311a4471eb8795
Author: Liao Lanyu <[email protected]>
AuthorDate: Sat Apr 22 21:31:51 2023 +0800

    [To rel/1.1][IOTDB-5786] Fix potential deadlock in DriverScheduler
---
 .../db/mpp/execution/schedule/DriverScheduler.java | 116 ++++++++++-----------
 1 file changed, 56 insertions(+), 60 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index 50862859e6..c581f67709 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -254,13 +254,8 @@ public class DriverScheduler implements IDriverScheduler, IService {
       for (Set<DriverTask> fragmentRelatedTasks : queryRelatedTasks.values()) {
         if (fragmentRelatedTasks != null) {
           for (DriverTask task : fragmentRelatedTasks) {
-            task.lock();
-            try {
-              task.setAbortCause(DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED);
-              clearDriverTask(task);
-            } finally {
-              task.unlock();
-            }
+            task.setAbortCause(DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED);
+            clearDriverTask(task);
           }
         }
       }
@@ -279,13 +274,8 @@ public class DriverScheduler implements IDriverScheduler, IService {
             if (task == null) {
               return;
             }
-            task.lock();
-            try {
-              task.setAbortCause(DriverTaskAbortedException.BY_FRAGMENT_ABORT_CALLED);
-              clearDriverTask(task);
-            } finally {
-              task.unlock();
-            }
+            task.setAbortCause(DriverTaskAbortedException.BY_FRAGMENT_ABORT_CALLED);
+            clearDriverTask(task);
           }
         }
       }
@@ -295,26 +285,31 @@ public class DriverScheduler implements IDriverScheduler, IService {
   private void clearDriverTask(DriverTask task) {
     try (SetThreadName driverTaskName =
         new SetThreadName(task.getDriver().getDriverTaskId().getFullId())) {
-      DriverTaskStatus status = task.getStatus();
-      switch (status) {
-          // If it has been aborted, return directly
-        case ABORTED:
-          return;
-        case READY:
-          task.setStatus(DriverTaskStatus.ABORTED);
-          readyQueue.remove(task.getDriverTaskId());
-          break;
-        case BLOCKED:
-          task.setStatus(DriverTaskStatus.ABORTED);
-          blockedTasks.remove(task);
-          readyQueue.decreaseReservedSize();
-          break;
-        case RUNNING:
-          task.setStatus(DriverTaskStatus.ABORTED);
-          readyQueue.decreaseReservedSize();
-          break;
-        case FINISHED:
-          break;
+      try {
+        task.lock();
+        DriverTaskStatus status = task.getStatus();
+        switch (status) {
+            // If it has been aborted, return directly
+          case ABORTED:
+            return;
+          case READY:
+            task.setStatus(DriverTaskStatus.ABORTED);
+            readyQueue.remove(task.getDriverTaskId());
+            break;
+          case BLOCKED:
+            task.setStatus(DriverTaskStatus.ABORTED);
+            blockedTasks.remove(task);
+            readyQueue.decreaseReservedSize();
+            break;
+          case RUNNING:
+            task.setStatus(DriverTaskStatus.ABORTED);
+            readyQueue.decreaseReservedSize();
+            break;
+          case FINISHED:
+            break;
+        }
+      } finally {
+        task.unlock();
       }
 
       timeoutQueue.remove(task.getDriverTaskId());
@@ -333,26 +328,31 @@ public class DriverScheduler implements IDriverScheduler, IService {
           queryMap.remove(task.getDriverTaskId().getQueryId());
         }
       }
-      if (task.getAbortCause() != null) {
-        try {
-          task.getDriver()
-              .failed(
-                  new DriverTaskAbortedException(
-                      task.getDriver().getDriverTaskId().getFullId(), task.getAbortCause()));
-        } catch (Exception e) {
-          logger.error("Clear DriverTask failed", e);
+      try {
+        task.lock();
+        if (task.getAbortCause() != null) {
+          try {
+            task.getDriver()
+                .failed(
+                    new DriverTaskAbortedException(
+                        task.getDriver().getDriverTaskId().getFullId(), task.getAbortCause()));
+          } catch (Exception e) {
+            logger.error("Clear DriverTask failed", e);
+          }
         }
-      }
-      if (task.getStatus() == DriverTaskStatus.ABORTED) {
-        try {
-          blockManager.forceDeregisterFragmentInstance(
-              new TFragmentInstanceId(
-                  task.getDriverTaskId().getQueryId().getId(),
-                  task.getDriverTaskId().getFragmentId().getId(),
-                  task.getDriverTaskId().getFragmentInstanceId().getInstanceId()));
-        } catch (Exception e) {
-          logger.error("Clear DriverTask failed", e);
+        if (task.getStatus() == DriverTaskStatus.ABORTED) {
+          try {
+            blockManager.forceDeregisterFragmentInstance(
+                new TFragmentInstanceId(
+                    task.getDriverTaskId().getQueryId().getId(),
+                    task.getDriverTaskId().getFragmentId().getId(),
+                    task.getDriverTaskId().getFragmentInstanceId().getInstanceId()));
+          } catch (Exception e) {
+            logger.error("Clear DriverTask failed", e);
+          }
         }
+      } finally {
+        task.unlock();
       }
     }
   }
@@ -488,6 +488,7 @@ public class DriverScheduler implements IDriverScheduler, IService {
       } finally {
         task.unlock();
       }
+      // wrap this clearDriverTask to avoid that status is changed to Aborted during clearDriverTask
       task.lock();
       try {
         clearDriverTask(task);
@@ -510,10 +511,10 @@ public class DriverScheduler implements IDriverScheduler, IService {
           logger.warn(
               "The task {} is aborted. All other tasks in the same query will be cancelled",
               task.getDriverTaskId());
-          clearDriverTask(task);
         } finally {
           task.unlock();
         }
+        clearDriverTask(task);
         QueryId queryId = task.getDriverTaskId().getQueryId();
         Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks = queryMap.remove(queryId);
         if (queryRelatedTasks != null) {
@@ -524,13 +525,8 @@ public class DriverScheduler implements IDriverScheduler, IService {
                   if (task.equals(otherTask)) {
                     continue;
                   }
-                  otherTask.lock();
-                  try {
-                    otherTask.setAbortCause(DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED);
-                    clearDriverTask(otherTask);
-                  } finally {
-                    otherTask.unlock();
-                  }
+                  otherTask.setAbortCause(DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED);
+                  clearDriverTask(otherTask);
                 }
               }
             }

Reply via email to