This is an automated email from the ASF dual-hosted git repository.
liaoxin01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a789c9941e8 [fix](job-manager) cancelTaskById should not be blocked by
unrelated streaming jobs (#62940)
a789c9941e8 is described below
commit a789c9941e847e7945e9849930b3dd4cf58badc6
Author: wudi <[email protected]>
AuthorDate: Thu May 14 23:11:27 2026 +0800
[fix](job-manager) cancelTaskById should not be blocked by unrelated
streaming jobs (#62940)
### What problem does this PR solve?
Problem Summary:
`JobManager.cancelTaskById` rejects all `CANCEL TASK` calls when any
streaming job exists in `jobMap`. The streaming-type check is placed
before the `jobName` match, so it fires on every iteration:
```java
for (T job : jobMap.values()) {
    if (job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING)) {
        throw new JobException("streaming job not support cancel task by id");
    }
    if (job.getJobName().equals(jobName)) { ... }
}
```
If a single streaming job exists in `jobMap` and is visited before the
target job during iteration, `CANCEL TASK FOR <any-non-streaming-job>`
throws "streaming job not support cancel task by id" before ever
matching the actual target.
### Fix
Move the streaming-type check inside the `jobName` match, so it only
fires when the matched job is actually a streaming job.
### Release note
Fix `CANCEL TASK` on non-streaming jobs incorrectly rejected when an
unrelated streaming job exists in the job map.
---
.../org/apache/doris/job/manager/JobManager.java | 6 ++--
.../apache/doris/job/manager/JobManagerTest.java | 39 ++++++++++++++++++++++
2 files changed, 42 insertions(+), 3 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 9cc2857a754..23f51890da5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -424,10 +424,10 @@ public class JobManager<T extends AbstractJob<?, C>, C>
implements Writable {
*/
public void cancelTaskById(String jobName, Long taskId) throws
JobException {
for (T job : jobMap.values()) {
- if
(job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING)) {
- throw new JobException("streaming job not support cancel task
by id");
- }
if (job.getJobName().equals(jobName)) {
+ if
(job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING)) {
+ throw new JobException("streaming job not support cancel
task by id");
+ }
job.cancelTaskById(taskId);
job.logUpdateOperation();
return;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/job/manager/JobManagerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/job/manager/JobManagerTest.java
index 8a0f8f20305..0e2c07fd072 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/job/manager/JobManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/job/manager/JobManagerTest.java
@@ -19,6 +19,11 @@ package org.apache.doris.job.manager;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.base.JobExecuteType;
+import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.exception.JobException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.utframe.TestWithFeService;
@@ -30,6 +35,7 @@ import org.mockito.Mockito;
import java.io.IOException;
import java.util.HashSet;
+import java.util.Map;
public class JobManagerTest {
@Test
@@ -60,4 +66,37 @@ public class JobManagerTest {
}
}
}
+
+ private static AbstractJob mockJob(long id, String name, JobExecuteType
type) {
+ AbstractJob job = Mockito.mock(AbstractJob.class);
+ Mockito.when(job.getJobId()).thenReturn(id);
+ Mockito.when(job.getJobName()).thenReturn(name);
+ JobExecutionConfiguration cfg = new JobExecutionConfiguration();
+ cfg.setExecuteType(type);
+ Mockito.when(job.getJobConfig()).thenReturn(cfg);
+ return job;
+ }
+
+ @Test
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public void testCancelTaskByIdNotBlockedByOtherStreamingJob() throws
JobException {
+ JobManager manager = new JobManager();
+ AbstractJob streamingJob = mockJob(1L, "streaming_job",
JobExecuteType.STREAMING);
+ AbstractJob batchJob = mockJob(2L, "batch_job",
JobExecuteType.RECURRING);
+ Map<Long, AbstractJob> jobMap = (Map<Long, AbstractJob>)
Deencapsulation.getField(manager, "jobMap");
+ jobMap.put(1L, streamingJob);
+ jobMap.put(2L, batchJob);
+
+ // Cancelling the batch job must not be blocked by the unrelated
streaming job in jobMap.
+ manager.cancelTaskById("batch_job", 100L);
+ Mockito.verify(batchJob).cancelTaskById(100L);
+
+ // Cancelling the streaming job itself still rejected.
+ try {
+ manager.cancelTaskById("streaming_job", 100L);
+ Assert.fail("expected JobException for streaming job");
+ } catch (JobException e) {
+ Assert.assertTrue(e.getMessage().contains("streaming job not
support"));
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]