This is an automated email from the ASF dual-hosted git repository.
nathanma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 1c5024c76 [AMORO-2623] Avoid deadlock among TaskRuntime objects (#2790)
1c5024c76 is described below
commit 1c5024c7679f488c7eabec0637c16c2685962d9c
Author: majin1102 <[email protected]>
AuthorDate: Thu May 9 10:53:08 2024 +0800
[AMORO-2623] Avoid deadlock among TaskRuntime objects (#2790)
* fix AMORO-2623
* spotless apply
---------
Co-authored-by: Xavier Bai <[email protected]>
---
.../org/apache/amoro/server/optimizing/OptimizingQueue.java | 13 +++++++++----
.../org/apache/amoro/server/optimizing/TaskRuntime.java | 3 +++
2 files changed, 12 insertions(+), 4 deletions(-)
diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
index bbb23fe3a..069e3b3ab 100644
--- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
+++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
@@ -196,7 +196,7 @@ public class OptimizingQueue extends PersistentBase {
}
private TaskRuntime fetchTask() {
- return Optional.ofNullable(retryTaskQueue.poll()).orElseGet(() -> fetchScheduledTask());
+ return Optional.ofNullable(retryTaskQueue.poll()).orElseGet(this::fetchScheduledTask);
}
private TaskRuntime fetchScheduledTask() {
@@ -424,7 +424,7 @@ public class OptimizingQueue extends PersistentBase {
} finally {
lock.unlock();
}
- cancelTasks();
+ releaseResourcesIfNecessary();
}
@Override
@@ -471,8 +471,13 @@ public class OptimizingQueue extends PersistentBase {
} finally {
lock.unlock();
}
- // the cleanup of task should be done after unlock to avoid deadlock
- if (this.status == OptimizingProcess.Status.FAILED) {
+ }
+
+ // the cleanup of task should be done after unlock to avoid deadlock
+ @Override
+ public void releaseResourcesIfNecessary() {
+ if (this.status == OptimizingProcess.Status.FAILED
+ || this.status == OptimizingProcess.Status.CLOSED) {
cancelTasks();
}
}
diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java
index 114d5d87e..5ebd5d9a5 100644
--- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java
+++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java
@@ -105,6 +105,7 @@ public class TaskRuntime extends StatedPersistentBase {
token = null;
threadId = -1;
});
+ owner.releaseResourcesIfNecessary();
}
void reset() {
@@ -432,6 +433,8 @@ public class TaskRuntime extends StatedPersistentBase {
public interface TaskOwner {
void acceptResult(TaskRuntime taskRuntime);
+ void releaseResourcesIfNecessary();
+
boolean isClosed();
}
}