This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cc2224b5444 [fix](streaming-job) keep isCanceled set when cancel runs on terminal task (#63427)
cc2224b5444 is described below

commit cc2224b544423e1ab814c287fec046c538f2f6b0
Author: wudi <[email protected]>
AuthorDate: Fri May 22 10:08:12 2026 +0800

    [fix](streaming-job) keep isCanceled set when cancel runs on terminal task (#63427)
    
    ### What problem does this PR solve?
    
    Streaming insert job (CDC source / JdbcSourceOffsetProvider) can become
    permanently stuck in PAUSED when a BE-side commit arrives after FE-side
    task timeout. Symptoms observed in production:
    
    - Job status PAUSED with empty ErrorMsg / JobRuntimeMsg.
    - Latest task PENDING and never scheduled (scheduler logs "do not need
    to schedule invalid task ... job status: PAUSED").
    - Previous task status SUCCESS but its ErrorMsg = "task failed cause
    timeout".
    - `auto resume` never recovers the job; only manual `RESUME JOB` works.
---
 .../insert/streaming/AbstractStreamingTask.java    |   8 +-
 .../insert/streaming/StreamingInsertJob.java       |   6 ++
 .../StreamingInsertJobLateCallbackTest.java        | 120 +++++++++++++++++++++
 3 files changed, 130 insertions(+), 4 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java
index 25da9ee90a6..62adf21daf6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java
@@ -140,15 +140,15 @@ public abstract class AbstractStreamingTask {
     }
 
     public void cancel(boolean needWaitCancelComplete) {
+        // Flip isCanceled even on terminal states so late BE callbacks short-circuit.
+        if (getIsCanceled().getAndSet(true)) {
+            return;
+        }
         if (TaskStatus.SUCCESS.equals(status) || TaskStatus.FAILED.equals(status)
                 || TaskStatus.CANCELED.equals(status)) {
             return;
         }
         status = TaskStatus.CANCELED;
-        if (getIsCanceled().get()) {
-            return;
-        }
-        getIsCanceled().getAndSet(true);
         this.errMsg = "task cancelled";
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index cf431fd7601..060901125c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -1437,6 +1437,12 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
         try {
             if (this.runningStreamTask != null
                     && this.runningStreamTask instanceof StreamingMultiTblTask) {
+                if (this.runningStreamTask.getIsCanceled().get()) {
+                    log.info("Streaming multi table job {} skip late commit offset on canceled task "
+                                    + "(expected: {}, actual: {})",
+                            getJobId(), this.runningStreamTask.getTaskId(), offsetRequest.getTaskId());
+                    return;
+                }
                 if (this.runningStreamTask.getTaskId() != offsetRequest.getTaskId()) {
                     throw new JobException("Task id mismatch when commit offset. expected: "
                             + this.runningStreamTask.getTaskId() + ", actual: " + offsetRequest.getTaskId());
diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJobLateCallbackTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJobLateCallbackTest.java
new file mode 100644
index 00000000000..66c0679d1b1
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJobLateCallbackTest.java
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.extensions.insert.streaming;
+
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.job.cdc.request.CommitOffsetRequest;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.common.TaskStatus;
+import org.apache.doris.job.offset.jdbc.JdbcSourceOffsetProvider;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class StreamingInsertJobLateCallbackTest {
+
+    private static StreamingMultiTblTask newTask(long taskId, TaskStatus initialStatus) {
+        StreamingJobProperties jobProps = new StreamingJobProperties(new HashMap<>());
+        StreamingMultiTblTask task = new StreamingMultiTblTask(
+                0L, taskId, null, null, null, null, null, jobProps, null, null);
+        Deencapsulation.setField(task, "status", initialStatus);
+        return task;
+    }
+
+    @Test
+    public void testCancelMarksIsCanceledOnFailedTask() {
+        StreamingMultiTblTask task = newTask(1001L, TaskStatus.FAILED);
+
+        task.cancel(true);
+
+        Assert.assertTrue("isCanceled must flip even when task already FAILED",
+                task.getIsCanceled().get());
+        Assert.assertEquals("status preserved when already terminal",
+                TaskStatus.FAILED, task.getStatus());
+    }
+
+    @Test
+    public void testCancelMarksIsCanceledOnSuccessTask() {
+        StreamingMultiTblTask task = newTask(1002L, TaskStatus.SUCCESS);
+
+        task.cancel(true);
+
+        Assert.assertTrue(task.getIsCanceled().get());
+        Assert.assertEquals(TaskStatus.SUCCESS, task.getStatus());
+    }
+
+    @Test
+    public void testCancelTransitionsRunningToCanceled() {
+        StreamingMultiTblTask task = newTask(1003L, TaskStatus.RUNNING);
+
+        task.cancel(true);
+
+        Assert.assertTrue(task.getIsCanceled().get());
+        Assert.assertEquals(TaskStatus.CANCELED, task.getStatus());
+    }
+
+    @Test
+    public void testCancelIdempotent() {
+        StreamingMultiTblTask task = newTask(1004L, TaskStatus.RUNNING);
+
+        task.cancel(true);
+        Assert.assertEquals(TaskStatus.CANCELED, task.getStatus());
+        Assert.assertTrue(task.getIsCanceled().get());
+
+        Deencapsulation.setField(task, "errMsg", "first cancel");
+        task.cancel(true);
+        Assert.assertEquals("second cancel must early-return and leave state untouched",
+                "first cancel", Deencapsulation.getField(task, "errMsg"));
+    }
+
+    @Test
+    public void testCommitOffsetSkipsCanceledTask() throws Exception {
+        StreamingInsertJob job = Deencapsulation.newInstance(StreamingInsertJob.class);
+        Deencapsulation.setField(job, "lock", new ReentrantReadWriteLock(true));
+        Deencapsulation.setField(job, "jobId", 9001L);
+        Deencapsulation.setField(job, "jobName", "test_job");
+        Deencapsulation.setField(job, "jobStatus", JobStatus.PAUSED);
+
+        JdbcSourceOffsetProvider provider = Deencapsulation.newInstance(JdbcSourceOffsetProvider.class);
+        Deencapsulation.setField(job, "offsetProvider", provider);
+
+        StreamingMultiTblTask task = newTask(7777L, TaskStatus.FAILED);
+        // simulate the bug timeline: task already FAILED via onFail, then cancel marks isCanceled.
+        task.cancel(true);
+        Assert.assertTrue(task.getIsCanceled().get());
+
+        Deencapsulation.setField(job, "runningStreamTask", task);
+
+        CommitOffsetRequest req = new CommitOffsetRequest();
+        req.setJobId(9001L);
+        req.setTaskId(7777L);
+        req.setOffset("[{\"splitId\":\"binlog-split\"}]");
+        req.setScannedRows(123L);
+        req.setLoadBytes(456L);
+
+        // Should silently skip — no JobException, no Task.status flip back to SUCCESS,
+        // no successCallback side-effects.
+        job.commitOffset(req);
+
+        Assert.assertEquals("task status must stay terminal — late callback ignored",
+                TaskStatus.FAILED, task.getStatus());
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to