This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new cc2224b5444 [fix](streaming-job) keep isCanceled set when cancel runs
on terminal task (#63427)
cc2224b5444 is described below
commit cc2224b544423e1ab814c287fec046c538f2f6b0
Author: wudi <[email protected]>
AuthorDate: Fri May 22 10:08:12 2026 +0800
[fix](streaming-job) keep isCanceled set when cancel runs on terminal task
(#63427)
### What problem does this PR solve?
A streaming insert job (CDC source / JdbcSourceOffsetProvider) can become
permanently stuck in PAUSED when a BE-side offset commit arrives after the
FE side has already timed the task out. Symptoms observed in production:
- Job status PAUSED with empty ErrorMsg / JobRuntimeMsg.
- The latest task stays PENDING and is never scheduled (the scheduler logs
  "do not need to schedule invalid task ... job status: PAUSED").
- The previous task has status SUCCESS, yet its ErrorMsg is
  "task failed cause timeout".
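- `auto resume` never recovers the job; only a manual `RESUME JOB` works.

Root cause: `AbstractStreamingTask.cancel()` returned early for a task that
was already in a terminal state (SUCCESS / FAILED / CANCELED) before setting
`isCanceled`. A late `commitOffset` callback from BE for a task that had
already failed on timeout was therefore still processed, and could flip the
task back to SUCCESS. This change sets `isCanceled` at the start of `cancel()`
regardless of status. `StreamingInsertJob.commitOffset()` now logs and skips
the commit when the running multi-table task is already canceled.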
---
.../insert/streaming/AbstractStreamingTask.java | 8 +-
.../insert/streaming/StreamingInsertJob.java | 6 ++
.../StreamingInsertJobLateCallbackTest.java | 120 +++++++++++++++++++++
3 files changed, 130 insertions(+), 4 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java
index 25da9ee90a6..62adf21daf6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java
@@ -140,15 +140,15 @@ public abstract class AbstractStreamingTask {
}
public void cancel(boolean needWaitCancelComplete) {
+ // Flip isCanceled even on terminal states so late BE callbacks
short-circuit.
+ if (getIsCanceled().getAndSet(true)) {
+ return;
+ }
if (TaskStatus.SUCCESS.equals(status) ||
TaskStatus.FAILED.equals(status)
|| TaskStatus.CANCELED.equals(status)) {
return;
}
status = TaskStatus.CANCELED;
- if (getIsCanceled().get()) {
- return;
- }
- getIsCanceled().getAndSet(true);
this.errMsg = "task cancelled";
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index cf431fd7601..060901125c6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -1437,6 +1437,12 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
try {
if (this.runningStreamTask != null
&& this.runningStreamTask instanceof
StreamingMultiTblTask) {
+ if (this.runningStreamTask.getIsCanceled().get()) {
+ log.info("Streaming multi table job {} skip late commit
offset on canceled task "
+ + "(expected: {}, actual: {})",
+ getJobId(), this.runningStreamTask.getTaskId(),
offsetRequest.getTaskId());
+ return;
+ }
if (this.runningStreamTask.getTaskId() !=
offsetRequest.getTaskId()) {
throw new JobException("Task id mismatch when commit
offset. expected: "
+ this.runningStreamTask.getTaskId() + ", actual:
" + offsetRequest.getTaskId());
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJobLateCallbackTest.java
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJobLateCallbackTest.java
new file mode 100644
index 00000000000..66c0679d1b1
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJobLateCallbackTest.java
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.extensions.insert.streaming;
+
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.job.cdc.request.CommitOffsetRequest;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.common.TaskStatus;
+import org.apache.doris.job.offset.jdbc.JdbcSourceOffsetProvider;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class StreamingInsertJobLateCallbackTest {
+
+ private static StreamingMultiTblTask newTask(long taskId, TaskStatus
initialStatus) {
+ StreamingJobProperties jobProps = new StreamingJobProperties(new
HashMap<>());
+ StreamingMultiTblTask task = new StreamingMultiTblTask(
+ 0L, taskId, null, null, null, null, null, jobProps, null,
null);
+ Deencapsulation.setField(task, "status", initialStatus);
+ return task;
+ }
+
+ @Test
+ public void testCancelMarksIsCanceledOnFailedTask() {
+ StreamingMultiTblTask task = newTask(1001L, TaskStatus.FAILED);
+
+ task.cancel(true);
+
+ Assert.assertTrue("isCanceled must flip even when task already FAILED",
+ task.getIsCanceled().get());
+ Assert.assertEquals("status preserved when already terminal",
+ TaskStatus.FAILED, task.getStatus());
+ }
+
+ @Test
+ public void testCancelMarksIsCanceledOnSuccessTask() {
+ StreamingMultiTblTask task = newTask(1002L, TaskStatus.SUCCESS);
+
+ task.cancel(true);
+
+ Assert.assertTrue(task.getIsCanceled().get());
+ Assert.assertEquals(TaskStatus.SUCCESS, task.getStatus());
+ }
+
+ @Test
+ public void testCancelTransitionsRunningToCanceled() {
+ StreamingMultiTblTask task = newTask(1003L, TaskStatus.RUNNING);
+
+ task.cancel(true);
+
+ Assert.assertTrue(task.getIsCanceled().get());
+ Assert.assertEquals(TaskStatus.CANCELED, task.getStatus());
+ }
+
+ @Test
+ public void testCancelIdempotent() {
+ StreamingMultiTblTask task = newTask(1004L, TaskStatus.RUNNING);
+
+ task.cancel(true);
+ Assert.assertEquals(TaskStatus.CANCELED, task.getStatus());
+ Assert.assertTrue(task.getIsCanceled().get());
+
+ Deencapsulation.setField(task, "errMsg", "first cancel");
+ task.cancel(true);
+ Assert.assertEquals("second cancel must early-return and leave state
untouched",
+ "first cancel", Deencapsulation.getField(task, "errMsg"));
+ }
+
+ @Test
+ public void testCommitOffsetSkipsCanceledTask() throws Exception {
+ StreamingInsertJob job =
Deencapsulation.newInstance(StreamingInsertJob.class);
+ Deencapsulation.setField(job, "lock", new
ReentrantReadWriteLock(true));
+ Deencapsulation.setField(job, "jobId", 9001L);
+ Deencapsulation.setField(job, "jobName", "test_job");
+ Deencapsulation.setField(job, "jobStatus", JobStatus.PAUSED);
+
+ JdbcSourceOffsetProvider provider =
Deencapsulation.newInstance(JdbcSourceOffsetProvider.class);
+ Deencapsulation.setField(job, "offsetProvider", provider);
+
+ StreamingMultiTblTask task = newTask(7777L, TaskStatus.FAILED);
+ // simulate the bug timeline: task already FAILED via onFail, then
cancel marks isCanceled.
+ task.cancel(true);
+ Assert.assertTrue(task.getIsCanceled().get());
+
+ Deencapsulation.setField(job, "runningStreamTask", task);
+
+ CommitOffsetRequest req = new CommitOffsetRequest();
+ req.setJobId(9001L);
+ req.setTaskId(7777L);
+ req.setOffset("[{\"splitId\":\"binlog-split\"}]");
+ req.setScannedRows(123L);
+ req.setLoadBytes(456L);
+
+ // Should silently skip — no JobException, no Task.status flip back to
SUCCESS,
+ // no successCallback side-effects.
+ job.commitOffset(req);
+
+ Assert.assertEquals("task status must stay terminal — late callback
ignored",
+ TaskStatus.FAILED, task.getStatus());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]