This is an automated email from the ASF dual-hosted git repository.

nathanma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c5024c76 [AMORO-2623] Avoid deadlock among TaskRuntime objects (#2790)
1c5024c76 is described below

commit 1c5024c7679f488c7eabec0637c16c2685962d9c
Author: majin1102 <[email protected]>
AuthorDate: Thu May 9 10:53:08 2024 +0800

    [AMORO-2623] Avoid deadlock among TaskRuntime objects (#2790)
    
    * fix AMORO-2623
    
    * spotless apply
    
    ---------
    
    Co-authored-by: Xavier Bai <[email protected]>
---
 .../org/apache/amoro/server/optimizing/OptimizingQueue.java | 13 +++++++++----
 .../org/apache/amoro/server/optimizing/TaskRuntime.java     |  3 +++
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
index bbb23fe3a..069e3b3ab 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
@@ -196,7 +196,7 @@ public class OptimizingQueue extends PersistentBase {
   }
 
   private TaskRuntime fetchTask() {
-    return Optional.ofNullable(retryTaskQueue.poll()).orElseGet(() -> 
fetchScheduledTask());
+    return 
Optional.ofNullable(retryTaskQueue.poll()).orElseGet(this::fetchScheduledTask);
   }
 
   private TaskRuntime fetchScheduledTask() {
@@ -424,7 +424,7 @@ public class OptimizingQueue extends PersistentBase {
       } finally {
         lock.unlock();
       }
-      cancelTasks();
+      releaseResourcesIfNecessary();
     }
 
     @Override
@@ -471,8 +471,13 @@ public class OptimizingQueue extends PersistentBase {
       } finally {
         lock.unlock();
       }
-      // the cleanup of task should be done after unlock to avoid deadlock
-      if (this.status == OptimizingProcess.Status.FAILED) {
+    }
+
+    // the cleanup of task should be done after unlock to avoid deadlock
+    @Override
+    public void releaseResourcesIfNecessary() {
+      if (this.status == OptimizingProcess.Status.FAILED
+          || this.status == OptimizingProcess.Status.CLOSED) {
         cancelTasks();
       }
     }
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java
index 114d5d87e..5ebd5d9a5 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java
@@ -105,6 +105,7 @@ public class TaskRuntime extends StatedPersistentBase {
           token = null;
           threadId = -1;
         });
+    owner.releaseResourcesIfNecessary();
   }
 
   void reset() {
@@ -432,6 +433,8 @@ public class TaskRuntime extends StatedPersistentBase {
   public interface TaskOwner {
     void acceptResult(TaskRuntime taskRuntime);
 
+    void releaseResourcesIfNecessary();
+
     boolean isClosed();
   }
 }

Reply via email to