This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new e14538fcf55 branch-3.1: [fix](mtmv)fix mtmv may have repeat tasks after canceled #48830 (#51972)
e14538fcf55 is described below

commit e14538fcf55e72e28ebb4ed035b49eb05ddc4177
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jun 20 14:55:08 2025 +0800

    branch-3.1: [fix](mtmv)fix mtmv may have repeat tasks after canceled #48830 (#51972)
    
    Cherry-picked from #48830
    
    Co-authored-by: zhangdong <[email protected]>
---
 .../doris/job/extensions/insert/InsertTask.java    | 10 +++---
 .../apache/doris/job/extensions/mtmv/MTMVTask.java | 41 ++++++++++++++++++----
 .../org/apache/doris/job/task/AbstractTask.java    | 31 +++++++++++-----
 .../main/java/org/apache/doris/job/task/Task.java  |  7 ++--
 .../suites/mtmv_p0/test_task_mtmv.groovy           |  3 ++
 5 files changed, 70 insertions(+), 22 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index ff18f611c1e..eececebe419 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -212,18 +212,18 @@ public class InsertTask extends AbstractTask {
     }
 
     @Override
-    public void onFail() throws JobException {
+    public boolean onFail() throws JobException {
         if (isCanceled.get()) {
-            return;
+            return false;
         }
         isFinished.set(true);
-        super.onFail();
+        return super.onFail();
     }
 
     @Override
-    public void onSuccess() throws JobException {
+    public boolean onSuccess() throws JobException {
         isFinished.set(true);
-        super.onSuccess();
+        return super.onSuccess();
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 198ba218918..bce0e0ba73a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -362,24 +362,53 @@ public class MTMVTask extends AbstractTask {
     }
 
     @Override
-    public synchronized void onFail() throws JobException {
+    public synchronized boolean onFail() throws JobException {
         LOG.info("mtmv task onFail, taskId: {}", super.getTaskId());
-        super.onFail();
+        boolean res = super.onFail();
+        if (!res) {
+            return false;
+        }
         after();
+        return true;
     }
 
     @Override
-    public synchronized void onSuccess() throws JobException {
+    public synchronized boolean onSuccess() throws JobException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("mtmv task onSuccess, taskId: {}", super.getTaskId());
         }
-        super.onSuccess();
+        boolean res = super.onSuccess();
+        if (!res) {
+            return false;
+        }
         after();
+        return true;
     }
 
+    /**
+     * The reason for overriding the parent class is to add synchronized protection
+     */
     @Override
-    protected synchronized void executeCancelLogic(boolean needWaitCancelComplete) {
-        LOG.info("mtmv task cancel, taskId: {}", super.getTaskId());
+    public synchronized boolean cancel(boolean needWaitCancelComplete) throws JobException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("mtmv task cancel, taskId: {}", super.getTaskId());
+        }
+        return super.cancel(needWaitCancelComplete);
+    }
+
+    @Override
+    protected void executeCancelLogic(boolean needWaitCancelComplete) {
+        try {
+            // Mtmv is initialized in the before method.
+            // If the task has not yet run, the before method will not be used, so mtmv will be empty,
+            // which prevents the canceled task from being added to the history list
+            if (mtmv == null) {
+                mtmv = MTMVUtil.getMTMV(dbId, mtmvId);
+            }
+        } catch (UserException e) {
+            LOG.warn("executeCancelLogic failed:", e);
+            return;
+        }
         if (executor != null) {
             executor.cancel(new Status(TStatusCode.CANCELLED, "mtmv task cancelled"), needWaitCancelComplete);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
index b356bc58d32..a40696a8664 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
@@ -63,12 +63,16 @@ public abstract class AbstractTask implements Task {
     }
 
     @Override
-    public void onFail() throws JobException {
+    public boolean onFail() throws JobException {
+        if (TaskStatus.CANCELED.equals(status)) {
+            return false;
+        }
         status = TaskStatus.FAILED;
         if (!isCallable()) {
-            return;
+            return false;
         }
         Env.getCurrentEnv().getJobManager().getJob(jobId).onTaskFail(this);
+        return true;
     }
 
     @Override
@@ -109,21 +113,22 @@ public abstract class AbstractTask implements Task {
     protected abstract void closeOrReleaseResources();
 
     @Override
-    public void onSuccess() throws JobException {
+    public boolean onSuccess() throws JobException {
         if (TaskStatus.CANCELED.equals(status)) {
-            return;
+            return false;
         }
         status = TaskStatus.SUCCESS;
         setFinishTimeMs(System.currentTimeMillis());
         if (!isCallable()) {
-            return;
+            return false;
         }
         Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId());
         if (null == job) {
             log.info("job is null, job id is {}", jobId);
-            return;
+            return false;
         }
         job.onTaskSuccess(this);
+        return true;
     }
 
     /**
@@ -135,10 +140,15 @@ public abstract class AbstractTask implements Task {
      *                      the original exception.
      */
     @Override
-    public void cancel(boolean needWaitCancelComplete) throws JobException {
+    public boolean cancel(boolean needWaitCancelComplete) throws JobException {
+        if (TaskStatus.SUCCESS.equals(status) || TaskStatus.FAILED.equals(status) || TaskStatus.CANCELED.equals(
+                status)) {
+            return false;
+        }
         try {
             status = TaskStatus.CANCELED;
             executeCancelLogic(needWaitCancelComplete);
+            return true;
         } catch (Exception e) {
             log.warn("cancel task failed, job id is {}, task id is {}", jobId, taskId, e);
             throw new JobException(e);
@@ -174,7 +184,12 @@ public abstract class AbstractTask implements Task {
             onFail();
             log.warn("execute task error, job id is {}, task id is {}", jobId, taskId, e);
         } finally {
-            closeOrReleaseResources();
+            // The cancel logic will call the closeOrReleased Resources method by itself.
+            // If it is also called here,
+            // it may result in the inability to obtain relevant information when canceling the task
+            if (!TaskStatus.CANCELED.equals(status)) {
+                closeOrReleaseResources();
+            }
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
index d184f647075..487c62b8934 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
@@ -44,7 +44,7 @@ public interface Task {
      * This method is called when the task fails to execute successfully.
      * Implementations can use this method to handle any failure scenarios.
      */
-    void onFail() throws JobException;
+    boolean onFail() throws JobException;
 
     /**
      * This method is called when the task fails to execute successfully, with an additional error message.
@@ -58,15 +58,16 @@ public interface Task {
      * This method is called when the task executes successfully.
      * Implementations can use this method to handle successful execution scenarios.
      */
-    void onSuccess() throws JobException;
+    boolean onSuccess() throws JobException;
 
     /**
      * This method is called to cancel the execution of the task.
      * Implementations should define the necessary steps to cancel the task.
      *
      * @param needWaitCancelComplete Do we need to wait for the cancellation to be completed.
+     * @return if cancel success
      */
-    void cancel(boolean needWaitCancelComplete) throws JobException;
+    boolean cancel(boolean needWaitCancelComplete) throws JobException;
 
     /**
      * get info for tvf `tasks`
diff --git a/regression-test/suites/mtmv_p0/test_task_mtmv.groovy b/regression-test/suites/mtmv_p0/test_task_mtmv.groovy
index 72356b45a79..41859ddeefe 100644
--- a/regression-test/suites/mtmv_p0/test_task_mtmv.groovy
+++ b/regression-test/suites/mtmv_p0/test_task_mtmv.groovy
@@ -57,6 +57,9 @@ suite("test_task_mtmv") {
            log.info("cancel error msg: " + e.getMessage())
            assertTrue(e.getMessage().contains("no running task"));
         }
+    def tasksAfterCancel = sql """ select TaskId from tasks('type'='mv') where MvName = '${mvName}';"""
+   // should only has one task after cancel
+    assertEquals(1, tasksAfterCancel.size());
     sql """drop materialized view if exists ${mvName};"""
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to