This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new eaa7d327c9a branch-4.0: [improve](job) Adding FailureReason after
manually changing the status of a streaming job #57551 (#57612)
eaa7d327c9a is described below
commit eaa7d327c9a4307fb74be165a2dda384746e80ea
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Nov 7 09:50:46 2025 +0800
branch-4.0: [improve](job) Adding FailureReason after manually changing the
status of a streaming job #57551 (#57612)
Cherry-picked from #57551
Co-authored-by: wudi <[email protected]>
---
.../job/extensions/insert/streaming/StreamingInsertJob.java | 10 ++++++++++
.../extensions/insert/streaming/StreamingJobSchedulerTask.java | 1 +
.../src/main/java/org/apache/doris/job/manager/JobManager.java | 7 ++++++-
.../doris/nereids/trees/plans/commands/PauseJobCommand.java | 5 ++++-
.../doris/nereids/trees/plans/commands/ResumeJobCommand.java | 4 ++--
5 files changed, 23 insertions(+), 4 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index a21a3c854fa..57eab5cb816 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -26,6 +26,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
@@ -262,6 +263,12 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
}
}
+ public void resetFailureInfo(FailureReason reason) {
+ this.setFailureReason(reason);
+ // Currently, only delayMsg is present here, which needs to be cleared
when the status changes.
+ this.setJobRuntimeMsg("");
+ }
+
@Override
public void cancelAllTasks(boolean needWaitCancelComplete) throws
JobException {
lock.writeLock().lock();
@@ -355,6 +362,8 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
offsetProvider.fetchRemoteMeta(originTvfProps);
} catch (Exception ex) {
log.warn("fetch remote meta failed, job id: {}", getJobId(), ex);
+ failureReason = new
FailureReason(InternalErrorCode.GET_REMOTE_DATA_ERROR,
+ "Failed to fetch meta, " + ex.getMessage());
}
}
@@ -709,6 +718,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
} catch (AnalysisException e) {
log.warn("failed to get db id for streaming insert job {}, db
name: {}, msg: {}",
getJobId(), getCurrentDbName(), e.getMessage());
+ failureReason = new FailureReason(InternalErrorCode.DB_ERR,
"Failed to get db id, " + e.getMessage());
}
}
return dbId;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index 7f483a8f587..c3655e6697e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -88,6 +88,7 @@ public class StreamingJobSchedulerTask extends AbstractTask {
if (autoResumeCount < Long.MAX_VALUE) {
streamingInsertJob.setAutoResumeCount(autoResumeCount + 1);
}
+ streamingInsertJob.resetFailureInfo(null);
streamingInsertJob.updateJobStatus(JobStatus.PENDING);
return;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 95c38e0fa06..c62e94f8d64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -36,6 +36,7 @@ import org.apache.doris.common.util.LogKey;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecuteType;
+import org.apache.doris.job.common.FailureReason;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskType;
@@ -290,12 +291,16 @@ public class JobManager<T extends AbstractJob<?, C>, C>
implements Writable {
log.info("update job success, jobId: {}", job.getJobId());
}
- public void alterJobStatus(String jobName, JobStatus jobStatus) throws
JobException {
+
+ public void alterJobStatus(String jobName, JobStatus jobStatus,
FailureReason reason) throws JobException {
for (T a : jobMap.values()) {
if (a.getJobName().equals(jobName)) {
try {
checkSameStatus(a, jobStatus);
alterJobStatus(a.getJobId(), jobStatus);
+ if (a instanceof StreamingInsertJob) {
+ ((StreamingInsertJob) a).resetFailureInfo(reason);
+ }
} catch (JobException e) {
throw new JobException("Alter job status error, jobName is
%s, errorMsg is %s",
jobName, e.getMessage());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java
index 2b22a527293..7d71302fcf8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java
@@ -18,6 +18,8 @@
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.analysis.StmtType;
+import org.apache.doris.common.InternalErrorCode;
+import org.apache.doris.job.common.FailureReason;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@@ -39,7 +41,8 @@ public class PauseJobCommand extends AlterJobStatusCommand
implements ForwardWit
@Override
public void doRun(ConnectContext ctx, StmtExecutor executor) throws
Exception {
- ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(),
JobStatus.PAUSED);
+ ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(),
JobStatus.PAUSED,
+ new FailureReason(InternalErrorCode.MANUAL_PAUSE_ERR, "Job
paused by user " + ctx.getQualifiedUser()));
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java
index 906f1c9d159..40e25dbdc12 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java
@@ -43,9 +43,9 @@ public class ResumeJobCommand extends AlterJobStatusCommand
implements ForwardWi
public void doRun(ConnectContext ctx, StmtExecutor executor) throws
Exception {
AbstractJob job =
ctx.getEnv().getJobManager().getJobByName(super.getJobName());
if (job instanceof StreamingInsertJob) {
- ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(),
JobStatus.PENDING);
+ ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(),
JobStatus.PENDING, null);
} else {
- ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(),
JobStatus.RUNNING);
+ ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(),
JobStatus.RUNNING, null);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org