This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 19c20247f466eecb12a18a70040e3653d6326953
Author: Calvin Kirs <[email protected]>
AuthorDate: Wed May 8 18:41:58 2024 +0800

    [Fix](job)Reclaim resources held by finished tasks (#34506)
    
    Add closeOrReleaseResources(), which is responsible for safely closing or 
releasing the stmtExecutor, command, and ctx objects. The method follows a null 
check pattern to avoid NullPointerExceptions and ensures that resources are 
properly cleaned up when they are no longer needed. AbstractTask invokes it 
from a finally block after task execution and after cancellation, so the 
references are dropped whether the task succeeds, fails, or is canceled. This 
improves code readability and maintains a consistent approach to resource 
management.
---
 .../doris/job/extensions/insert/InsertTask.java    | 15 ++++++--
 .../apache/doris/job/extensions/mtmv/MTMVTask.java | 26 ++++++++++----
 .../org/apache/doris/job/task/AbstractTask.java    | 40 +++++++++++++++++++++-
 3 files changed, 71 insertions(+), 10 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index 6e6f59758b4..0fe2a8364aa 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -166,7 +166,19 @@ public class InsertTask extends AbstractTask {
         }
 
         super.before();
+    }
 
+    @Override
+    protected void closeOrReleaseResources() {
+        if (null != stmtExecutor) {
+            stmtExecutor = null;
+        }
+        if (null != command) {
+            command = null;
+        }
+        if (null != ctx) {
+            ctx = null;
+        }
     }
 
     protected TUniqueId generateQueryId(String taskIdString) {
@@ -202,7 +214,7 @@ public class InsertTask extends AbstractTask {
     }
 
     @Override
-    public void cancel() throws JobException {
+    protected void executeCancelLogic() {
         if (isFinished.get() || isCanceled.get()) {
             return;
         }
@@ -210,7 +222,6 @@ public class InsertTask extends AbstractTask {
         if (null != stmtExecutor) {
             stmtExecutor.cancel();
         }
-        super.cancel();
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 8924b229b21..efcdc4f564e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -219,7 +219,7 @@ public class MTMVTask extends AbstractTask {
     }
 
     private void exec(ConnectContext ctx, Set<Long> refreshPartitionIds,
-            Map<TableIf, String> tableWithPartKey)
+                      Map<TableIf, String> tableWithPartKey)
             throws Exception {
         TUniqueId queryId = generateQueryId();
         lastQueryId = DebugUtil.printId(queryId);
@@ -252,9 +252,8 @@ public class MTMVTask extends AbstractTask {
     }
 
     @Override
-    public synchronized void cancel() throws JobException {
+    protected synchronized void  executeCancelLogic() {
         LOG.info("mtmv task cancel, taskId: {}", super.getTaskId());
-        super.cancel();
         if (executor != null) {
             executor.cancel();
         }
@@ -380,10 +379,23 @@ public class MTMVTask extends AbstractTask {
                     .addMTMVTaskResult(new 
TableNameInfo(mtmv.getQualifiedDbName(), mtmv.getName()), this, relation,
                             partitionSnapshots);
         }
-        mtmv = null;
-        relation = null;
-        executor = null;
-        partitionSnapshots = null;
+
+    }
+
+    @Override
+    protected void closeOrReleaseResources() {
+        if (null != mtmv) {
+            mtmv = null;
+        }
+        if (null != executor) {
+            executor = null;
+        }
+        if (null != relation) {
+            relation = null;
+        }
+        if (null != partitionSnapshots) {
+            partitionSnapshots = null;
+        }
     }
 
     private Map<TableIf, String> getIncrementalTableMap() throws 
AnalysisException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
index 7bd2e58f87e..25803085e95 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
@@ -96,6 +96,18 @@ public abstract class AbstractTask implements Task {
         return false;
     }
 
+    /**
+     * Closes or releases all allocated resources such as database 
connections, file streams, or any other
+     * external system handles that were utilized during the task execution. 
This method is invoked
+     * unconditionally, ensuring that resources are properly managed whether 
the task completes
+     * successfully, fails, or is canceled. It is crucial for preventing 
resource leaks and maintaining
+     * the overall health and efficiency of the application.
+     * <p>
+     * Note: Implementations of this method should handle potential exceptions 
internally and log them
+     * appropriately to avoid interrupting the normal flow of cleanup 
operations.
+     */
+    protected abstract void closeOrReleaseResources();
+
     @Override
     public void onSuccess() throws JobException {
         if (TaskStatus.CANCELED.equals(status)) {
@@ -114,11 +126,35 @@ public abstract class AbstractTask implements Task {
         job.onTaskSuccess(this);
     }
 
+    /**
+     * Cancels the ongoing task, updating its status to {@link 
TaskStatus#CANCELED} and releasing associated resources.
+     * This method encapsulates the core cancellation logic, calling the 
abstract method
+     * {@link #executeCancelLogic()} for task-specific actions.
+     *
+     * @throws JobException If an error occurs during the cancellation 
process, a new JobException is thrown wrapping
+     *                      the original exception.
+     */
     @Override
     public void cancel() throws JobException {
-        status = TaskStatus.CANCELED;
+        try {
+            executeCancelLogic();
+            status = TaskStatus.CANCELED;
+        } catch (Exception e) {
+            log.warn("cancel task failed, job id is {}, task id is {}", jobId, 
taskId, e);
+            throw new JobException(e);
+        } finally {
+            closeOrReleaseResources();
+        }
     }
 
+    /**
+     * Abstract method for implementing the task-specific cancellation logic.
+     * Subclasses must override this method to provide their own 
implementation of how a task should be canceled.
+     *
+     * @throws Exception Any exception that might occur during the 
cancellation process in the subclass.
+     */
+    protected abstract void executeCancelLogic() throws Exception;
+
     @Override
     public void before() throws JobException {
         status = TaskStatus.RUNNING;
@@ -134,6 +170,8 @@ public abstract class AbstractTask implements Task {
             this.errMsg = e.getMessage();
             onFail();
             log.warn("execute task error, job id is {}, task id is {}", jobId, 
taskId, e);
+        } finally {
+            closeOrReleaseResources();
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to