This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new eaa7d327c9a branch-4.0: [improve](job) Adding FailureReason after manually changing the status of a streaming job #57551 (#57612)
eaa7d327c9a is described below

commit eaa7d327c9a4307fb74be165a2dda384746e80ea
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Nov 7 09:50:46 2025 +0800

    branch-4.0: [improve](job) Adding FailureReason after manually changing the status of a streaming job #57551 (#57612)
    
    Cherry-picked from #57551
    
    Co-authored-by: wudi <[email protected]>
---
 .../job/extensions/insert/streaming/StreamingInsertJob.java    | 10 ++++++++++
 .../extensions/insert/streaming/StreamingJobSchedulerTask.java |  1 +
 .../src/main/java/org/apache/doris/job/manager/JobManager.java |  7 ++++++-
 .../doris/nereids/trees/plans/commands/PauseJobCommand.java    |  5 ++++-
 .../doris/nereids/trees/plans/commands/ResumeJobCommand.java   |  4 ++--
 5 files changed, 23 insertions(+), 4 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index a21a3c854fa..57eab5cb816 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -26,6 +26,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
@@ -262,6 +263,12 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         }
     }
 
+    public void resetFailureInfo(FailureReason reason) {
+        this.setFailureReason(reason);
+        // Currently, only delayMsg is present here, which needs to be cleared when the status changes.
+        this.setJobRuntimeMsg("");
+    }
+
     @Override
     public void cancelAllTasks(boolean needWaitCancelComplete) throws 
JobException {
         lock.writeLock().lock();
@@ -355,6 +362,8 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             offsetProvider.fetchRemoteMeta(originTvfProps);
         } catch (Exception ex) {
             log.warn("fetch remote meta failed, job id: {}", getJobId(), ex);
+            failureReason = new 
FailureReason(InternalErrorCode.GET_REMOTE_DATA_ERROR,
+                    "Failed to fetch meta, " + ex.getMessage());
         }
     }
 
@@ -709,6 +718,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             } catch (AnalysisException e) {
                 log.warn("failed to get db id for streaming insert job {}, db 
name: {}, msg: {}",
                         getJobId(), getCurrentDbName(), e.getMessage());
+                failureReason = new FailureReason(InternalErrorCode.DB_ERR, 
"Failed to get db id, " + e.getMessage());
             }
         }
         return dbId;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index 7f483a8f587..c3655e6697e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -88,6 +88,7 @@ public class StreamingJobSchedulerTask extends AbstractTask {
                 if (autoResumeCount < Long.MAX_VALUE) {
                     streamingInsertJob.setAutoResumeCount(autoResumeCount + 1);
                 }
+                streamingInsertJob.resetFailureInfo(null);
                 streamingInsertJob.updateJobStatus(JobStatus.PENDING);
                 return;
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 95c38e0fa06..c62e94f8d64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -36,6 +36,7 @@ import org.apache.doris.common.util.LogKey;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.job.base.AbstractJob;
 import org.apache.doris.job.base.JobExecuteType;
+import org.apache.doris.job.common.FailureReason;
 import org.apache.doris.job.common.JobStatus;
 import org.apache.doris.job.common.JobType;
 import org.apache.doris.job.common.TaskType;
@@ -290,12 +291,16 @@ public class JobManager<T extends AbstractJob<?, C>, C> 
implements Writable {
         log.info("update job success, jobId: {}", job.getJobId());
     }
 
-    public void alterJobStatus(String jobName, JobStatus jobStatus) throws 
JobException {
+
+    public void alterJobStatus(String jobName, JobStatus jobStatus, 
FailureReason reason) throws JobException {
         for (T a : jobMap.values()) {
             if (a.getJobName().equals(jobName)) {
                 try {
                     checkSameStatus(a, jobStatus);
                     alterJobStatus(a.getJobId(), jobStatus);
+                    if (a instanceof StreamingInsertJob) {
+                        ((StreamingInsertJob) a).resetFailureInfo(reason);
+                    }
                 } catch (JobException e) {
                     throw new JobException("Alter job status error, jobName is 
%s, errorMsg is %s",
                             jobName, e.getMessage());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java
index 2b22a527293..7d71302fcf8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java
@@ -18,6 +18,8 @@
 package org.apache.doris.nereids.trees.plans.commands;
 
 import org.apache.doris.analysis.StmtType;
+import org.apache.doris.common.InternalErrorCode;
+import org.apache.doris.job.common.FailureReason;
 import org.apache.doris.job.common.JobStatus;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@@ -39,7 +41,8 @@ public class PauseJobCommand extends AlterJobStatusCommand 
implements ForwardWit
 
     @Override
     public void doRun(ConnectContext ctx, StmtExecutor executor) throws 
Exception {
-        ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), 
JobStatus.PAUSED);
+        ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), 
JobStatus.PAUSED,
+                new FailureReason(InternalErrorCode.MANUAL_PAUSE_ERR, "Job 
paused by user " + ctx.getQualifiedUser()));
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java
index 906f1c9d159..40e25dbdc12 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java
@@ -43,9 +43,9 @@ public class ResumeJobCommand extends AlterJobStatusCommand 
implements ForwardWi
     public void doRun(ConnectContext ctx, StmtExecutor executor) throws 
Exception {
         AbstractJob job = 
ctx.getEnv().getJobManager().getJobByName(super.getJobName());
         if (job instanceof StreamingInsertJob) {
-            ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), 
JobStatus.PENDING);
+            ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), 
JobStatus.PENDING, null);
         } else {
-            ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), 
JobStatus.RUNNING);
+            ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), 
JobStatus.RUNNING, null);
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to