This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 253e59790d9 branch-4.0: [fix](streaming job) fix streaming job status 
change improper #56762 (#56811)
253e59790d9 is described below

commit 253e59790d9864f0b2d6a74d9ae837cd3715ff8d
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sun Oct 12 14:20:56 2025 +0800

    branch-4.0: [fix](streaming job) fix streaming job status change improper 
#56762 (#56811)
    
    Cherry-picked from #56762
    
    Co-authored-by: hui lai <[email protected]>
---
 .../org/apache/doris/job/base/AbstractJob.java     |   9 +-
 .../doris/job/base/AbstractJobStatusTest.java      | 220 +++++++++++++++++++++
 2 files changed, 228 insertions(+), 1 deletion(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index b24c70cedbc..0f5ac748cde 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -297,6 +297,10 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
         checkJobParamsInternal();
     }
 
+    /**
+     * STOPPED status should not change to any status,
+     * any status can change to STOPPED.
+     */
     public void updateJobStatus(JobStatus newJobStatus) throws JobException {
         if (null == newJobStatus) {
             throw new IllegalArgumentException("jobStatus cannot be null");
@@ -306,11 +310,14 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
         }
         String errorMsg = String.format("Can't update job %s status to the %s 
status",
                 jobStatus.name(), newJobStatus.name());
+        if (newJobStatus.equals(JobStatus.PENDING) && 
jobStatus.equals(JobStatus.STOPPED)) {
+            throw new IllegalArgumentException(errorMsg);
+        }
         if (newJobStatus.equals(JobStatus.RUNNING)
                 && (!jobStatus.equals(JobStatus.PAUSED) && 
!jobStatus.equals(JobStatus.PENDING))) {
             throw new IllegalArgumentException(errorMsg);
         }
-        if (newJobStatus.equals(JobStatus.STOPPED) && 
!jobStatus.equals(JobStatus.RUNNING)) {
+        if (newJobStatus.equals(JobStatus.PAUSED) && 
jobStatus.equals(JobStatus.STOPPED)) {
             throw new IllegalArgumentException(errorMsg);
         }
         if (newJobStatus.equals(JobStatus.FINISHED)) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/job/base/AbstractJobStatusTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/job/base/AbstractJobStatusTest.java
new file mode 100644
index 00000000000..faf39164fc3
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/job/base/AbstractJobStatusTest.java
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.base;
+
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.common.TaskStatus;
+import org.apache.doris.job.common.TaskType;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.task.AbstractTask;
+import org.apache.doris.thrift.TRow;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+class AbstractJobStatusTest {
+
+    private static class DummyTask extends AbstractTask {
+        private final AtomicBoolean cancelledLogicCalled = new 
AtomicBoolean(false);
+
+        DummyTask(long taskId) {
+            setTaskId(taskId);
+            setStatus(TaskStatus.PENDING);
+        }
+
+        @Override
+        protected void closeOrReleaseResources() { }
+
+        @Override
+        protected void executeCancelLogic(boolean needWaitCancelComplete) {
+            cancelledLogicCalled.set(true);
+        }
+
+        @Override
+        public TRow getTvfInfo(String jobName) {
+            return null;
+        }
+
+        @Override
+        public void run() throws JobException {
+            // dummy implementation
+        }
+
+        boolean cancelLogicCalled() {
+            return cancelledLogicCalled.get();
+        }
+    }
+
+    private static class DummyJob extends AbstractJob<DummyTask, Void> {
+        private final List<DummyTask> history = new ArrayList<>();
+
+        DummyJob(JobStatus initial) {
+            super(1L, "job", initial, "db", "comment",
+                    UserIdentity.ROOT, createJobConfig());
+        }
+
+        private static JobExecutionConfiguration createJobConfig() {
+            JobExecutionConfiguration config = new JobExecutionConfiguration();
+            config.setExecuteType(JobExecuteType.ONE_TIME);
+            return config;
+        }
+
+        @Override
+        protected void checkJobParamsInternal() { }
+
+        @Override
+        public List<DummyTask> createTasks(TaskType taskType, Void ctx) {
+            DummyTask t = new DummyTask(100L);
+            history.add(t);
+            return Collections.singletonList(t);
+        }
+
+        @Override
+        public boolean isReadyForScheduling(Void ctx) {
+            return true;
+        }
+
+        @Override
+        public org.apache.doris.qe.ShowResultSetMetaData getTaskMetaData() {
+            return null;
+        }
+
+        @Override
+        public org.apache.doris.job.common.JobType getJobType() {
+            return null;
+        }
+
+        @Override
+        public List<DummyTask> queryTasks() {
+            return history;
+        }
+
+        @Override
+        public String formatMsgWhenExecuteQueueFull(Long taskId) {
+            return "";
+        }
+
+        @Override
+        public void write(java.io.DataOutput out) throws java.io.IOException {
+            // dummy implementation for Writable interface
+        }
+    }
+
+    /**
+     * Test pending status
+     */
+    @Test
+    void testPendingFromPaused() throws Exception {
+        DummyJob job = new DummyJob(JobStatus.PAUSED);
+        job.updateJobStatus(JobStatus.PENDING);
+        Assertions.assertEquals(JobStatus.PENDING, job.getJobStatus());
+    }
+
+    @Test
+    void testPendingFromRunning() throws Exception {
+        DummyJob job = new DummyJob(JobStatus.RUNNING);
+        job.updateJobStatus(JobStatus.PENDING);
+        Assertions.assertEquals(JobStatus.PENDING, job.getJobStatus());
+    }
+
+    @Test
+    void testPendingFromStoppedIsInvalid() {
+        DummyJob job = new DummyJob(JobStatus.STOPPED);
+        Assertions.assertThrows(IllegalArgumentException.class, () -> 
job.updateJobStatus(JobStatus.PENDING));
+    }
+
+    /**
+     * Test running status
+     */
+    @Test
+    void testRunningFromPending() throws Exception {
+        DummyJob job = new DummyJob(JobStatus.PENDING);
+        job.updateJobStatus(JobStatus.RUNNING);
+        Assertions.assertEquals(JobStatus.RUNNING, job.getJobStatus());
+    }
+
+    @Test
+    void testRunningFromPaused() throws Exception {
+        DummyJob job = new DummyJob(JobStatus.PAUSED);
+        job.updateJobStatus(JobStatus.RUNNING);
+        Assertions.assertEquals(JobStatus.RUNNING, job.getJobStatus());
+    }
+
+    /**
+     * Test pause status
+     */
+    @Test
+    void testPauseFromRunning() throws Exception {
+        DummyJob job = new DummyJob(JobStatus.PENDING);
+        job.updateJobStatus(JobStatus.RUNNING);
+        job.updateJobStatus(JobStatus.PAUSED);
+        Assertions.assertEquals(JobStatus.PAUSED, job.getJobStatus());
+    }
+
+    @Test
+    void testPauseFromPending() throws Exception {
+        DummyJob job = new DummyJob(JobStatus.PENDING);
+        job.updateJobStatus(JobStatus.PAUSED);
+        Assertions.assertEquals(JobStatus.PAUSED, job.getJobStatus());
+    }
+
+    @Test
+    void testPauseFromStoppedIsInvalid() {
+        DummyJob job = new DummyJob(JobStatus.STOPPED);
+        Assertions.assertThrows(IllegalArgumentException.class, () -> 
job.updateJobStatus(JobStatus.PAUSED));
+    }
+
+    /**
+     * Test stop status
+     */
+    @Test
+    void testStopFromPending() throws Exception {
+        DummyJob job = new DummyJob(JobStatus.PENDING);
+        job.updateJobStatus(JobStatus.STOPPED);
+        Assertions.assertEquals(JobStatus.STOPPED, job.getJobStatus());
+    }
+
+    @Test
+    void testStopFromPaused() throws Exception {
+        DummyJob job = new DummyJob(JobStatus.PAUSED);
+        job.updateJobStatus(JobStatus.STOPPED);
+        Assertions.assertEquals(JobStatus.STOPPED, job.getJobStatus());
+    }
+
+    @Test
+    void testStopFromRunning() throws Exception {
+        DummyJob job = new DummyJob(JobStatus.RUNNING);
+        job.updateJobStatus(JobStatus.STOPPED);
+        Assertions.assertEquals(JobStatus.STOPPED, job.getJobStatus());
+    }
+
+    @Test
+    void testFinishSetsFinishTime() throws Exception {
+        DummyJob job = new DummyJob(JobStatus.RUNNING);
+        long before = System.currentTimeMillis();
+        job.updateJobStatus(JobStatus.FINISHED);
+        Assertions.assertEquals(JobStatus.FINISHED, job.getJobStatus());
+        Assertions.assertTrue(job.getFinishTimeMs() >= before);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to