This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 2a12251b833 [branch-2.1][Feat](Job)After a job is paused, it can be
manually triggered to execute. (#40180)
2a12251b833 is described below
commit 2a12251b8331a2ce6e189b5bbd6e385b87d71668
Author: Calvin Kirs <[email protected]>
AuthorDate: Sat Aug 31 19:24:53 2024 +0800
[branch-2.1][Feat](Job)After a job is paused, it can be manually triggered
to execute. (#40180)
…
pick (#39565)
---
.../org/apache/doris/job/base/AbstractJob.java | 26 +++++++++++++++++-----
.../doris/job/extensions/insert/InsertTask.java | 8 ++++++-
.../apache/doris/job/scheduler/JobScheduler.java | 5 ++---
3 files changed, 29 insertions(+), 10 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index 3f595d6daf5..94a0b0146cd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -212,8 +212,9 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
}
public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
- if (!getJobStatus().equals(JobStatus.RUNNING)) {
- log.warn("job is not running, job id is {}", jobId);
+ if (!canCreateTask(taskType)) {
+ log.info("job is not ready for scheduling, job id is {},job status
is {}, taskType is {}", jobId,
+ jobStatus, taskType);
return new ArrayList<>();
}
if (!isReadyForScheduling(taskContext)) {
@@ -235,6 +236,19 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
}
}
+ private boolean canCreateTask(TaskType taskType) {
+ JobStatus currentJobStatus = getJobStatus();
+
+ switch (taskType) {
+ case SCHEDULED:
+ return currentJobStatus.equals(JobStatus.RUNNING);
+ case MANUAL:
+ return currentJobStatus.equals(JobStatus.RUNNING) ||
currentJobStatus.equals(JobStatus.PAUSED);
+ default:
+ throw new IllegalArgumentException("Unsupported TaskType: " +
taskType);
+ }
+ }
+
public void initTasks(Collection<? extends T> tasks, TaskType taskType) {
tasks.forEach(task -> {
task.setTaskType(taskType);
@@ -307,7 +321,7 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
@Override
public void onTaskFail(T task) throws JobException {
failedTaskCount.incrementAndGet();
- updateJobStatusIfEnd(false);
+ updateJobStatusIfEnd(false, task.getTaskType());
runningTasks.remove(task);
logUpdateOperation();
}
@@ -315,16 +329,16 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
@Override
public void onTaskSuccess(T task) throws JobException {
succeedTaskCount.incrementAndGet();
- updateJobStatusIfEnd(true);
+ updateJobStatusIfEnd(true, task.getTaskType());
runningTasks.remove(task);
logUpdateOperation();
}
- private void updateJobStatusIfEnd(boolean taskSuccess) throws JobException
{
+ private void updateJobStatusIfEnd(boolean taskSuccess, TaskType taskType)
throws JobException {
JobExecuteType executeType = getJobConfig().getExecuteType();
- if (executeType.equals(JobExecuteType.MANUAL)) {
+ if (executeType.equals(JobExecuteType.MANUAL) ||
taskType.equals(TaskType.MANUAL)) {
return;
}
switch (executeType) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index 0fe2a8364aa..ee5abed8392 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -58,6 +58,7 @@ public class InsertTask extends AbstractTask {
new Column("Status", ScalarType.createStringType()),
new Column("ErrorMsg", ScalarType.createStringType()),
new Column("CreateTime", ScalarType.createStringType()),
+ new Column("StartTime", ScalarType.createStringType()),
new Column("FinishTime", ScalarType.createStringType()),
new Column("TrackingUrl", ScalarType.createStringType()),
new Column("LoadStatistic", ScalarType.createStringType()),
@@ -247,6 +248,8 @@ public class InsertTask extends AbstractTask {
trow.addToColumnValue(new TCell().setStringVal(errorMsg));
// create time
trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
+ trow.addToColumnValue(new TCell().setStringVal(null ==
getStartTimeMs() ? ""
+ : TimeUtils.longToTimeString(getStartTimeMs())));
// load end time
trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(getFinishTimeMs())));
// tracking url
@@ -274,7 +277,10 @@ public class InsertTask extends AbstractTask {
trow.addToColumnValue(new TCell().setStringVal(getStatus().name()));
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
- trow.addToColumnValue(new TCell().setStringVal(""));
+ trow.addToColumnValue(new TCell().setStringVal(null ==
getStartTimeMs() ? ""
+ : TimeUtils.longToTimeString(getStartTimeMs())));
+ trow.addToColumnValue(new TCell().setStringVal(null ==
getFinishTimeMs() ? ""
+ : TimeUtils.longToTimeString(getFinishTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new
TCell().setStringVal(userIdentity.getQualifiedUser()));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
index 2100511d22b..5ba88c6e3ce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
@@ -188,10 +188,9 @@ public class JobScheduler<T extends AbstractJob<?, C>, C>
implements Closeable {
clearEndJob(job);
continue;
}
- if (!job.getJobStatus().equals(JobStatus.RUNNING) &&
!job.getJobConfig().checkIsTimerJob()) {
- continue;
+ if (job.getJobStatus().equals(JobStatus.RUNNING) &&
job.getJobConfig().checkIsTimerJob()) {
+ cycleTimerJobScheduler(job, lastTimeWindowMs);
}
- cycleTimerJobScheduler(job, lastTimeWindowMs);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]