This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 253e59790d9 branch-4.0: [fix](streaming job) fix streaming job status change improper #56762 (#56811)
253e59790d9 is described below
commit 253e59790d9864f0b2d6a74d9ae837cd3715ff8d
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sun Oct 12 14:20:56 2025 +0800
branch-4.0: [fix](streaming job) fix streaming job status change improper #56762 (#56811)
Cherry-picked from #56762
Co-authored-by: hui lai <[email protected]>
---
.../org/apache/doris/job/base/AbstractJob.java | 9 +-
.../doris/job/base/AbstractJobStatusTest.java | 220 +++++++++++++++++++++
2 files changed, 228 insertions(+), 1 deletion(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index b24c70cedbc..0f5ac748cde 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -297,6 +297,10 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
checkJobParamsInternal();
}
+ /**
+ * STOPPED status should not change to any status,
+ * any status can change to STOPPED.
+ */
public void updateJobStatus(JobStatus newJobStatus) throws JobException {
if (null == newJobStatus) {
throw new IllegalArgumentException("jobStatus cannot be null");
@@ -306,11 +310,14 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
}
String errorMsg = String.format("Can't update job %s status to the %s status",
jobStatus.name(), newJobStatus.name());
+ if (newJobStatus.equals(JobStatus.PENDING) && jobStatus.equals(JobStatus.STOPPED)) {
+ throw new IllegalArgumentException(errorMsg);
+ }
if (newJobStatus.equals(JobStatus.RUNNING)
&& (!jobStatus.equals(JobStatus.PAUSED) && !jobStatus.equals(JobStatus.PENDING))) {
throw new IllegalArgumentException(errorMsg);
}
- if (newJobStatus.equals(JobStatus.STOPPED) && !jobStatus.equals(JobStatus.RUNNING)) {
+ if (newJobStatus.equals(JobStatus.PAUSED) && jobStatus.equals(JobStatus.STOPPED)) {
throw new IllegalArgumentException(errorMsg);
}
if (newJobStatus.equals(JobStatus.FINISHED)) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/base/AbstractJobStatusTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/base/AbstractJobStatusTest.java
new file mode 100644
index 00000000000..faf39164fc3
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/job/base/AbstractJobStatusTest.java
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.base;
+
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.common.TaskStatus;
+import org.apache.doris.job.common.TaskType;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.task.AbstractTask;
+import org.apache.doris.thrift.TRow;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+class AbstractJobStatusTest {
+
+ private static class DummyTask extends AbstractTask {
+ private final AtomicBoolean cancelledLogicCalled = new AtomicBoolean(false);
+
+ DummyTask(long taskId) {
+ setTaskId(taskId);
+ setStatus(TaskStatus.PENDING);
+ }
+
+ @Override
+ protected void closeOrReleaseResources() { }
+
+ @Override
+ protected void executeCancelLogic(boolean needWaitCancelComplete) {
+ cancelledLogicCalled.set(true);
+ }
+
+ @Override
+ public TRow getTvfInfo(String jobName) {
+ return null;
+ }
+
+ @Override
+ public void run() throws JobException {
+ // dummy implementation
+ }
+
+ boolean cancelLogicCalled() {
+ return cancelledLogicCalled.get();
+ }
+ }
+
+ private static class DummyJob extends AbstractJob<DummyTask, Void> {
+ private final List<DummyTask> history = new ArrayList<>();
+
+ DummyJob(JobStatus initial) {
+ super(1L, "job", initial, "db", "comment",
+ UserIdentity.ROOT, createJobConfig());
+ }
+
+ private static JobExecutionConfiguration createJobConfig() {
+ JobExecutionConfiguration config = new JobExecutionConfiguration();
+ config.setExecuteType(JobExecuteType.ONE_TIME);
+ return config;
+ }
+
+ @Override
+ protected void checkJobParamsInternal() { }
+
+ @Override
+ public List<DummyTask> createTasks(TaskType taskType, Void ctx) {
+ DummyTask t = new DummyTask(100L);
+ history.add(t);
+ return Collections.singletonList(t);
+ }
+
+ @Override
+ public boolean isReadyForScheduling(Void ctx) {
+ return true;
+ }
+
+ @Override
+ public org.apache.doris.qe.ShowResultSetMetaData getTaskMetaData() {
+ return null;
+ }
+
+ @Override
+ public org.apache.doris.job.common.JobType getJobType() {
+ return null;
+ }
+
+ @Override
+ public List<DummyTask> queryTasks() {
+ return history;
+ }
+
+ @Override
+ public String formatMsgWhenExecuteQueueFull(Long taskId) {
+ return "";
+ }
+
+ @Override
+ public void write(java.io.DataOutput out) throws java.io.IOException {
+ // dummy implementation for Writable interface
+ }
+ }
+
+ /**
+ * Test pending status
+ */
+ @Test
+ void testPendingFromPaused() throws Exception {
+ DummyJob job = new DummyJob(JobStatus.PAUSED);
+ job.updateJobStatus(JobStatus.PENDING);
+ Assertions.assertEquals(JobStatus.PENDING, job.getJobStatus());
+ }
+
+ @Test
+ void testPendingFromRunning() throws Exception {
+ DummyJob job = new DummyJob(JobStatus.RUNNING);
+ job.updateJobStatus(JobStatus.PENDING);
+ Assertions.assertEquals(JobStatus.PENDING, job.getJobStatus());
+ }
+
+ @Test
+ void testPendingFromStoppedIsInvalid() {
+ DummyJob job = new DummyJob(JobStatus.STOPPED);
+ Assertions.assertThrows(IllegalArgumentException.class, () -> job.updateJobStatus(JobStatus.PENDING));
+ }
+
+ /**
+ * Test running status
+ */
+ @Test
+ void testRunningFromPending() throws Exception {
+ DummyJob job = new DummyJob(JobStatus.PENDING);
+ job.updateJobStatus(JobStatus.RUNNING);
+ Assertions.assertEquals(JobStatus.RUNNING, job.getJobStatus());
+ }
+
+ @Test
+ void testRunningFromPaused() throws Exception {
+ DummyJob job = new DummyJob(JobStatus.PAUSED);
+ job.updateJobStatus(JobStatus.RUNNING);
+ Assertions.assertEquals(JobStatus.RUNNING, job.getJobStatus());
+ }
+
+ /**
+ * Test pause status
+ */
+ @Test
+ void testPauseFromRunning() throws Exception {
+ DummyJob job = new DummyJob(JobStatus.PENDING);
+ job.updateJobStatus(JobStatus.RUNNING);
+ job.updateJobStatus(JobStatus.PAUSED);
+ Assertions.assertEquals(JobStatus.PAUSED, job.getJobStatus());
+ }
+
+ @Test
+ void testPauseFromPending() throws Exception {
+ DummyJob job = new DummyJob(JobStatus.PENDING);
+ job.updateJobStatus(JobStatus.PAUSED);
+ Assertions.assertEquals(JobStatus.PAUSED, job.getJobStatus());
+ }
+
+ @Test
+ void testPauseFromStoppedIsInvalid() {
+ DummyJob job = new DummyJob(JobStatus.STOPPED);
+ Assertions.assertThrows(IllegalArgumentException.class, () -> job.updateJobStatus(JobStatus.PAUSED));
+ }
+
+ /**
+ * Test stop status
+ */
+ @Test
+ void testStopFromPending() throws Exception {
+ DummyJob job = new DummyJob(JobStatus.PENDING);
+ job.updateJobStatus(JobStatus.STOPPED);
+ Assertions.assertEquals(JobStatus.STOPPED, job.getJobStatus());
+ }
+
+ @Test
+ void testStopFromPaused() throws Exception {
+ DummyJob job = new DummyJob(JobStatus.PAUSED);
+ job.updateJobStatus(JobStatus.STOPPED);
+ Assertions.assertEquals(JobStatus.STOPPED, job.getJobStatus());
+ }
+
+ @Test
+ void testStopFromRunning() throws Exception {
+ DummyJob job = new DummyJob(JobStatus.RUNNING);
+ job.updateJobStatus(JobStatus.STOPPED);
+ Assertions.assertEquals(JobStatus.STOPPED, job.getJobStatus());
+ }
+
+ @Test
+ void testFinishSetsFinishTime() throws Exception {
+ DummyJob job = new DummyJob(JobStatus.RUNNING);
+ long before = System.currentTimeMillis();
+ job.updateJobStatus(JobStatus.FINISHED);
+ Assertions.assertEquals(JobStatus.FINISHED, job.getJobStatus());
+ Assertions.assertTrue(job.getFinishTimeMs() >= before);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]