This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 08c78a11359 [Feature](Job)Support manual and refactor some execution
logic (#26082)
08c78a11359 is described below
commit 08c78a11359e3d0c9e6bc89cf32d4d9456c6c4d5
Author: Calvin Kirs <[email protected]>
AuthorDate: Tue Oct 31 20:35:55 2023 +0800
[Feature](Job)Support manual and refactor some execution logic (#26082)
Supports manually triggered JOBs and Tasks
Optimize JOB&TASK display logic
Refactor the executor to support context passing
---
fe/fe-core/src/main/cup/sql_parser.cup | 6 +-
.../org/apache/doris/analysis/CreateJobStmt.java | 36 +++++-----
.../org/apache/doris/analysis/ShowJobStmt.java | 11 +--
.../org/apache/doris/analysis/ShowJobTaskStmt.java | 4 +-
.../main/java/org/apache/doris/catalog/Env.java | 5 +-
.../java/org/apache/doris/qe/ShowExecutor.java | 7 +-
.../apache/doris/scheduler/constants/JobType.java | 7 +-
.../apache/doris/scheduler/constants/TaskType.java | 16 ++++-
.../doris/scheduler/disruptor/TaskDisruptor.java | 34 +++++++--
.../doris/scheduler/disruptor/TaskHandler.java | 26 ++++---
.../scheduler/executor/AbstractJobExecutor.java | 54 ++++++++++++++
.../doris/scheduler/executor/JobExecutor.java | 9 +--
.../doris/scheduler/executor/SqlJobExecutor.java | 28 +++-----
.../java/org/apache/doris/scheduler/job/Job.java | 82 +++++++++++++---------
.../org/apache/doris/scheduler/job/JobTask.java | 37 +++++++++-
.../doris/scheduler/manager/JobTaskManager.java | 17 +++--
.../doris/scheduler/manager/TimerJobManager.java | 75 +++++++++++++++-----
.../scheduler/registry/PersistentJobRegister.java | 13 ++++
.../doris/scheduler/registry/TimerJobRegister.java | 5 ++
.../apache/doris/scheduler/disruptor/JobTest.java | 3 +-
.../scheduler/disruptor/TaskDisruptorTest.java | 13 +++-
.../scheduler/disruptor/TimerJobManagerTest.java | 14 ++--
22 files changed, 352 insertions(+), 150 deletions(-)
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup
b/fe/fe-core/src/main/cup/sql_parser.cup
index 80a71b7490d..5933fcffbcb 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -2551,17 +2551,17 @@ resource_desc ::=
create_job_stmt ::=
KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULER KW_EVERY
INTEGER_LITERAL:time_interval ident:time_unit opt_job_starts:startsTime
opt_job_ends:endsTime opt_comment:comment KW_DO stmt:executeSql
{:
- CreateJobStmt stmt = new
CreateJobStmt(jobLabel,null,false,time_interval,time_unit, startsTime,
endsTime,comment,executeSql);
+ CreateJobStmt stmt = new
CreateJobStmt(jobLabel,"RECURRING",null,time_interval,time_unit, startsTime,
endsTime,comment,executeSql);
RESULT = stmt;
:}
| KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULER KW_STREAMING
KW_AT STRING_LITERAL:atTime opt_comment:comment KW_DO stmt:executeSql
{:
- CreateJobStmt stmt = new
CreateJobStmt(jobLabel,atTime,true,null,null,null,null,comment,executeSql);
+ CreateJobStmt stmt = new
CreateJobStmt(jobLabel,"STREAMING",atTime,null,null,null,null,comment,executeSql);
RESULT = stmt;
:}
| KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULER KW_AT
STRING_LITERAL:atTime opt_comment:comment KW_DO stmt:executeSql
{:
- CreateJobStmt stmt = new
CreateJobStmt(jobLabel,atTime,false,null,null,null,null,comment,executeSql);
+ CreateJobStmt stmt = new
CreateJobStmt(jobLabel,"ONE_TIME",atTime,null,null,null,null,comment,executeSql);
RESULT = stmt;
:}
;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
index 999d630153d..57f976712c9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
@@ -28,6 +28,7 @@ import org.apache.doris.qe.ConnectContext;
import org.apache.doris.scheduler.common.IntervalUnit;
import org.apache.doris.scheduler.constants.JobCategory;
import org.apache.doris.scheduler.constants.JobStatus;
+import org.apache.doris.scheduler.constants.JobType;
import org.apache.doris.scheduler.executor.SqlJobExecutor;
import org.apache.doris.scheduler.job.Job;
@@ -41,22 +42,21 @@ import java.util.HashSet;
/**
* syntax:
* CREATE
- * [DEFINER = user]
- * JOB
- * event_name
- * ON SCHEDULE schedule
- * [COMMENT 'string']
- * DO event_body;
+ * [DEFINER = user]
+ * JOB
+ * event_name
+ * ON SCHEDULE schedule
+ * [COMMENT 'string']
+ * DO event_body;
* schedule: {
- * [STREAMING] AT timestamp
- * | EVERY interval
- * [STARTS timestamp ]
- * [ENDS timestamp ]
+ * [STREAMING] AT timestamp
+ * | EVERY interval
+ * [STARTS timestamp ]
+ * [ENDS timestamp ]
* }
* interval:
- * quantity { DAY | HOUR | MINUTE |
- * WEEK | SECOND }
- *
+ * quantity { DAY | HOUR | MINUTE |
+ * WEEK | SECOND }
*/
@Slf4j
public class CreateJobStmt extends DdlStmt {
@@ -90,7 +90,7 @@ public class CreateJobStmt extends DdlStmt {
private static HashSet<String> supportStmtClassNamesCache = new
HashSet<>(16);
- public CreateJobStmt(LabelName labelName, String onceJobStartTimestamp,
Boolean isStreamingJob,
+ public CreateJobStmt(LabelName labelName, String jobTypeName, String
onceJobStartTimestamp,
Long interval, String intervalTimeUnit,
String startsTimeStamp, String endsTimeStamp, String
comment, StatementBase doStmt) {
this.labelName = labelName;
@@ -102,7 +102,8 @@ public class CreateJobStmt extends DdlStmt {
this.comment = comment;
this.stmt = doStmt;
this.job = new Job();
- job.setStreamingJob(isStreamingJob);
+ JobType jobType = JobType.valueOf(jobTypeName.toUpperCase());
+ job.setJobType(jobType);
}
private String parseExecuteSql(String sql) throws AnalysisException {
@@ -136,7 +137,7 @@ public class CreateJobStmt extends DdlStmt {
job.setTimezone(timezone);
job.setComment(comment);
//todo support user define
- job.setUser("root");
+ job.setUser(ConnectContext.get().getQualifiedUser());
job.setJobStatus(JobStatus.RUNNING);
job.setJobCategory(JobCategory.SQL);
analyzerSqlStmt();
@@ -172,7 +173,6 @@ public class CreateJobStmt extends DdlStmt {
private void analyzerCycleJob() throws UserException {
- job.setCycleJob(true);
if (null == interval) {
throw new AnalysisException("interval is null");
}
@@ -214,8 +214,6 @@ public class CreateJobStmt extends DdlStmt {
private void analyzerOnceTimeJob() throws UserException {
- job.setCycleJob(false);
-
job.setIntervalMs(0L);
long executeAtTimeMillis =
TimeUtils.timeStringToLong(onceJobStartTimestamp);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java
index 7ad3ce343c5..42fb1c508fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java
@@ -47,19 +47,12 @@ public class ShowJobStmt extends ShowStmt {
private static final ImmutableList<String> TITLE_NAMES =
new ImmutableList.Builder<String>()
.add("Id")
- .add("Db")
.add("Name")
.add("Definer")
- .add("TimeZone")
.add("ExecuteType")
- .add("ExecuteAt")
- .add("ExecuteInterval")
- .add("ExecuteIntervalUnit")
- .add("Starts")
- .add("Ends")
+ .add("RecurringStrategy")
.add("Status")
- .add("LastExecuteFinishTime")
- .add("ErrorMsg")
+ .add("lastExecuteTaskStatus")
.add("CreateTime")
.add("Comment")
.build();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java
index 118530341ff..db3fb2ef3cd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java
@@ -41,8 +41,9 @@ public class ShowJobTaskStmt extends ShowStmt {
private static final ImmutableList<String> TITLE_NAMES =
new ImmutableList.Builder<String>()
- .add("JobId")
.add("TaskId")
+ .add("JobId")
+ .add("JobName")
.add("CreateTime")
.add("StartTime")
.add("EndTime")
@@ -50,6 +51,7 @@ public class ShowJobTaskStmt extends ShowStmt {
.add("ExecuteSql")
.add("Result")
.add("ErrorMsg")
+ .add("TaskType")
.build();
@Getter
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 9383c8b9266..f1cdf47b33f 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -355,6 +355,8 @@ public class Env {
private TimerJobManager timerJobManager;
private TransientTaskManager transientTaskManager;
private JobTaskManager jobTaskManager;
+
+ private TaskDisruptor taskDisruptor;
private MasterDaemon labelCleaner; // To clean old LabelInfo,
ExportJobInfos
private MasterDaemon txnCleaner; // To clean aborted or timeout txns
private Daemon feDiskUpdater; // Update fe disk info
@@ -629,7 +631,7 @@ public class Env {
this.jobTaskManager = new JobTaskManager();
this.timerJobManager = new TimerJobManager();
this.transientTaskManager = new TransientTaskManager();
- TaskDisruptor taskDisruptor = new TaskDisruptor(this.timerJobManager,
this.transientTaskManager);
+ this.taskDisruptor = new TaskDisruptor(this.timerJobManager,
this.transientTaskManager);
this.timerJobManager.setDisruptor(taskDisruptor);
this.transientTaskManager.setDisruptor(taskDisruptor);
this.persistentJobRegister = new TimerJobRegister(timerJobManager);
@@ -1532,6 +1534,7 @@ public class Env {
publishVersionDaemon.start();
// Start txn cleaner
txnCleaner.start();
+ taskDisruptor.start();
timerJobManager.start();
// Alter
getAlterInstance().start();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 8c72d260876..2aadab78852 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -1428,14 +1428,15 @@ public class ShowExecutor {
resultSet = new ShowResultSet(showJobTaskStmt.getMetaData(), rows);
return;
}
- long jobId = jobs.get(0).getJobId();
+ Job job = jobs.get(0);
+ long jobId = job.getJobId();
List<JobTask> jobTasks =
Env.getCurrentEnv().getJobTaskManager().getJobTasks(jobId);
if (CollectionUtils.isEmpty(jobTasks)) {
resultSet = new ShowResultSet(showJobTaskStmt.getMetaData(), rows);
return;
}
- for (JobTask job : jobTasks) {
- rows.add(job.getShowInfo());
+ for (JobTask jobTask : jobTasks) {
+ rows.add(jobTask.getShowInfo(job.getJobName()));
}
resultSet = new ShowResultSet(showJobTaskStmt.getMetaData(), rows);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobType.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobType.java
index 4f4467c989a..58f681c4061 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobType.java
@@ -29,5 +29,10 @@ public enum JobType {
/**
* JOB_TYPE_STREAMING is used to identify the streaming job.
*/
- STREAMING
+ STREAMING,
+
+ /**
+ * The job will be executed manually and need to be triggered by the user.
+ */
+ MANUAL
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/TaskType.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/TaskType.java
index 525dbe8adaa..996d72584ff 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/TaskType.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/TaskType.java
@@ -18,7 +18,19 @@
package org.apache.doris.scheduler.constants;
public enum TaskType {
- TimerJobTask,
+ /**
+ * Usually don't require persistence and are used in various asynchronous
tasks, such as export tasks.
+ * the life cycle of this kind of task is not managed by JOB-scheduler.
+ */
+ TRANSIENT_TASK,
- TransientTask
+ /**
+ * Tasks generated by scheduled jobs.
+ */
+ SCHEDULER_JOB_TASK,
+
+ /**
+ * Tasks generated by manual jobs.
+ */
+ MANUAL_JOB_TASK
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
index a8d98831f21..0e3f8e618d7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
@@ -18,7 +18,6 @@
package org.apache.doris.scheduler.disruptor;
import org.apache.doris.common.Config;
-import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.scheduler.constants.TaskType;
import org.apache.doris.scheduler.manager.TimerJobManager;
import org.apache.doris.scheduler.manager.TransientTaskManager;
@@ -29,6 +28,7 @@ import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
+import com.lmax.disruptor.util.DaemonThreadFactory;
import lombok.extern.slf4j.Slf4j;
import java.io.Closeable;
@@ -47,10 +47,13 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public class TaskDisruptor implements Closeable {
- private final Disruptor<TaskEvent> disruptor;
+ private Disruptor<TaskEvent> disruptor;
+
+ private TimerJobManager timerJobManager;
+ private TransientTaskManager transientTaskManager;
private static final int DEFAULT_RING_BUFFER_SIZE =
Config.async_task_queen_size;
- private static int consumerThreadCount =
Config.async_task_consumer_thread_num;
+ private static final int consumerThreadCount =
Config.async_task_consumer_thread_num;
/**
* The default timeout for {@link #close()} in seconds.
@@ -75,7 +78,12 @@ public class TaskDisruptor implements Closeable {
};
public TaskDisruptor(TimerJobManager timerJobManager, TransientTaskManager
transientTaskManager) {
- ThreadFactory producerThreadFactory = new
CustomThreadFactory("task-disruptor-producer");
+ this.timerJobManager = timerJobManager;
+ this.transientTaskManager = transientTaskManager;
+ }
+
+ public void start() {
+ ThreadFactory producerThreadFactory = DaemonThreadFactory.INSTANCE;
disruptor = new Disruptor<>(TaskEvent.FACTORY,
DEFAULT_RING_BUFFER_SIZE, producerThreadFactory,
ProducerType.SINGLE, new BlockingWaitStrategy());
WorkHandler<TaskEvent>[] workers = new
TaskHandler[consumerThreadCount];
@@ -88,16 +96,29 @@ public class TaskDisruptor implements Closeable {
/**
* Publishes a job to the disruptor.
+ * Default task type is {@link TaskType#SCHEDULER_JOB_TASK}
*
* @param jobId job id
*/
public void tryPublish(Long jobId, Long taskId) {
+ this.tryPublish(jobId, taskId, TaskType.SCHEDULER_JOB_TASK);
+ }
+
+
+ /**
+ * Publishes a job task to the disruptor.
+ *
+ * @param jobId job id, describe which job this task belongs to
+ * @param taskId task id, it's linked to job id, we can get job detail
by task id
+ * @param taskType {@link TaskType}
+ */
+ public void tryPublish(Long jobId, Long taskId, TaskType taskType) {
if (isClosed) {
log.info("tryPublish failed, disruptor is closed, jobId: {}",
jobId);
return;
}
try {
- disruptor.publishEvent(TRANSLATOR, jobId, taskId,
TaskType.TimerJobTask);
+ disruptor.publishEvent(TRANSLATOR, jobId, taskId, taskType);
} catch (Exception e) {
log.error("tryPublish failed, jobId: {}", jobId, e);
}
@@ -105,6 +126,7 @@ public class TaskDisruptor implements Closeable {
/**
* Publishes a task to the disruptor.
+ * Default task type is {@link TaskType#TRANSIENT_TASK}
*
* @param taskId task id
*/
@@ -114,7 +136,7 @@ public class TaskDisruptor implements Closeable {
return;
}
try {
- disruptor.publishEvent(TRANSLATOR, taskId, 0L,
TaskType.TransientTask);
+ disruptor.publishEvent(TRANSLATOR, taskId, 0L,
TaskType.TRANSIENT_TASK);
} catch (Exception e) {
log.error("tryPublish failed, taskId: {}", taskId, e);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
index 9daf3b7ee45..6005fa1bd40 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
@@ -19,6 +19,7 @@ package org.apache.doris.scheduler.disruptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.scheduler.constants.JobType;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.executor.TransientTaskExecutor;
import org.apache.doris.scheduler.job.ExecutorResult;
@@ -70,13 +71,15 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
@Override
public void onEvent(TaskEvent event) {
switch (event.getTaskType()) {
- case TimerJobTask:
+ case SCHEDULER_JOB_TASK:
+ case MANUAL_JOB_TASK:
onTimerJobTaskHandle(event);
break;
- case TransientTask:
+ case TRANSIENT_TASK:
onTransientTaskHandle(event);
break;
default:
+ log.warn("unknown task type: {}", event.getTaskType());
break;
}
}
@@ -90,7 +93,11 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
public void onTimerJobTaskHandle(TaskEvent taskEvent) {
long jobId = taskEvent.getId();
long taskId = taskEvent.getTaskId();
- long createTimeMs = jobTaskManager.pollPrepareTaskByTaskId(jobId,
taskId);
+ JobTask jobTask = jobTaskManager.pollPrepareTaskByTaskId(jobId,
taskId);
+ if (jobTask == null) {
+ log.warn("jobTask is null, maybe it's cancel, jobId: {}, taskId:
{}", jobId, taskId);
+ return;
+ }
Job job = timerJobManager.getJob(jobId);
if (job == null) {
log.info("job is null, jobId: {}", jobId);
@@ -102,12 +109,12 @@ public class TaskHandler implements
WorkHandler<TaskEvent> {
}
log.debug("job is running, eventJobId: {}", jobId);
- JobTask jobTask = new JobTask(jobId, taskId, createTimeMs);
+
try {
jobTask.setStartTimeMs(System.currentTimeMillis());
- ExecutorResult result = job.getExecutor().execute(job);
+ ExecutorResult result = job.getExecutor().execute(job,
jobTask.getContextData());
job.setLatestCompleteExecuteTimeMs(System.currentTimeMillis());
- if (job.isCycleJob()) {
+ if (job.getJobType().equals(JobType.RECURRING)) {
updateJobStatusIfPastEndTime(job);
} else {
// one time job should be finished after execute
@@ -117,7 +124,7 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
log.warn("Job execute failed, jobId: {}, result is null",
jobId);
jobTask.setErrorMsg("Job execute failed, result is null");
jobTask.setIsSuccessful(false);
- timerJobManager.pauseJob(jobId);
+ timerJobManager.setJobLatestStatus(jobId, false);
return;
}
String resultStr = GsonUtils.GSON.toJson(result.getResult());
@@ -126,14 +133,12 @@ public class TaskHandler implements
WorkHandler<TaskEvent> {
if (!result.isSuccess()) {
log.warn("Job execute failed, jobId: {}, msg : {}", jobId,
result.getExecutorSql());
jobTask.setErrorMsg(result.getErrorMsg());
- timerJobManager.pauseJob(jobId);
}
jobTask.setExecuteSql(result.getExecutorSql());
} catch (Exception e) {
log.warn("Job execute failed, jobId: {}, msg : {}", jobId,
e.getMessage());
jobTask.setErrorMsg(e.getMessage());
jobTask.setIsSuccessful(false);
- timerJobManager.pauseJob(jobId);
}
jobTask.setEndTimeMs(System.currentTimeMillis());
if (null == jobTaskManager) {
@@ -141,6 +146,7 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
}
boolean isPersistent = job.getJobCategory().isPersistent();
jobTaskManager.addJobTask(jobTask, isPersistent);
+ timerJobManager.setJobLatestStatus(jobId, jobTask.getIsSuccessful());
}
public void onTransientTaskHandle(TaskEvent taskEvent) {
@@ -165,7 +171,7 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
}
private void updateOnceTimeJobStatus(Job job) {
- if (job.isStreamingJob()) {
+ if (job.getJobType().equals(JobType.STREAMING)) {
timerJobManager.putOneJobToQueen(job.getJobId());
return;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/AbstractJobExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/AbstractJobExecutor.java
new file mode 100644
index 00000000000..7a70c963bd1
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/AbstractJobExecutor.java
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.scheduler.executor;
+
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.scheduler.job.Job;
+import org.apache.doris.thrift.TUniqueId;
+
+import lombok.Getter;
+
+import java.util.UUID;
+
+@Getter
+public abstract class AbstractJobExecutor<T, C> implements JobExecutor<T, C> {
+
+ protected ConnectContext createContext(Job job) {
+ ConnectContext ctx = new ConnectContext();
+ ctx.setEnv(Env.getCurrentEnv());
+
ctx.setCluster(ClusterNamespace.getClusterNameFromFullName(job.getDbName()));
+ ctx.setDatabase(job.getDbName());
+ ctx.setQualifiedUser(job.getUser());
+
ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(job.getUser(),
"%"));
+ ctx.getState().reset();
+ ctx.setThreadLocalInfo();
+ return ctx;
+ }
+
+ protected String generateTaskId() {
+ return UUID.randomUUID().toString();
+ }
+
+ protected TUniqueId generateQueryId(String taskIdString) {
+ UUID taskId = UUID.fromString(taskIdString);
+ return new TUniqueId(taskId.getMostSignificantBits(),
taskId.getLeastSignificantBits());
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java
index a6f2e10306f..40aebc8f6ad 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java
@@ -28,18 +28,19 @@ import org.apache.doris.scheduler.job.Job;
* We use Gson to serialize and deserialize JobExecutor. so the implementation
of JobExecutor needs to be serializable.
* You can see @org.apache.doris.persist.gson.GsonUtils.java for details.When
you implement JobExecutor,pls make sure
* you can serialize and deserialize it.
- *
- * @param <T> The result type of the event job execution.
*/
@FunctionalInterface
-public interface JobExecutor<T> {
+public interface JobExecutor<T, C> {
/**
* Executes the event job and returns the result.
* Exceptions will be caught internally, so there is no need to define or
throw them separately.
*
+ * @param job The event job to execute.
+ * @param dataContext The data context of the event job. if you need to
pass parameters to the event job,
+ * you can use it.
* @return The result of the event job execution.
*/
- ExecutorResult execute(Job job) throws JobException;
+ ExecutorResult<T> execute(Job job, C dataContext) throws JobException;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java
index 3df2a6fd9a2..546eac9a768 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java
@@ -17,9 +17,6 @@
package org.apache.doris.scheduler.executor;
-import org.apache.doris.analysis.UserIdentity;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.scheduler.exception.JobException;
@@ -32,15 +29,15 @@ import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
-import java.util.UUID;
+import java.util.Map;
/**
* we use this executor to execute sql job
*/
+@Getter
@Slf4j
-public class SqlJobExecutor implements JobExecutor {
+public class SqlJobExecutor extends AbstractJobExecutor<String, Map<String,
Object>> {
- @Getter
@Setter
@SerializedName(value = "sql")
private String sql;
@@ -50,24 +47,14 @@ public class SqlJobExecutor implements JobExecutor {
}
@Override
- public ExecutorResult<String> execute(Job job) throws JobException {
- ConnectContext ctx = new ConnectContext();
- ctx.setEnv(Env.getCurrentEnv());
-
ctx.setCluster(ClusterNamespace.getClusterNameFromFullName(job.getDbName()));
- ctx.setDatabase(job.getDbName());
- ctx.setQualifiedUser(job.getUser());
-
ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(job.getUser(),
"%"));
- ctx.getState().reset();
- ctx.setThreadLocalInfo();
- String taskIdString = UUID.randomUUID().toString();
- UUID taskId = UUID.fromString(taskIdString);
- TUniqueId queryId = new TUniqueId(taskId.getMostSignificantBits(),
taskId.getLeastSignificantBits());
- ctx.setQueryId(queryId);
+ public ExecutorResult<String> execute(Job job, Map<String, Object>
dataContext) throws JobException {
+ ConnectContext ctx = createContext(job);
+ String taskIdString = generateTaskId();
+ TUniqueId queryId = generateQueryId(taskIdString);
try {
StmtExecutor executor = new StmtExecutor(ctx, sql);
executor.execute(queryId);
String result = convertExecuteResult(ctx, taskIdString);
-
return new ExecutorResult<>(result, true, null, sql);
} catch (Exception e) {
log.warn("execute sql job failed, job id :{}, sql: {}, error: {}",
job.getJobId(), sql, e);
@@ -88,4 +75,5 @@ public class SqlJobExecutor implements JobExecutor {
return "queryId:" + queryId + ",affectedRows : " +
ctx.getState().getAffectedRows() + ", warningRows: "
+ ctx.getState().getWarningRows() + ",infoMsg" +
ctx.getState().getInfoMessage();
}
+
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java
index 6d39f1cd8d9..7f3fd0884b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java
@@ -80,6 +80,9 @@ public class Job implements Writable {
@SerializedName("jobStatus")
private JobStatus jobStatus;
+ @SerializedName("jobType")
+ private JobType jobType = JobType.RECURRING;
+
/**
* The executor of the job.
*
@@ -93,12 +96,6 @@ public class Job implements Writable {
@SerializedName("user")
private String user;
- @SerializedName("isCycleJob")
- private boolean isCycleJob = false;
-
- @SerializedName("isStreamingJob")
- private boolean isStreamingJob = false;
-
@SerializedName("intervalMs")
private Long intervalMs = 0L;
@SerializedName("startTimeMs")
@@ -129,6 +126,8 @@ public class Job implements Writable {
@SerializedName("createTimeMs")
private Long createTimeMs = System.currentTimeMillis();
+ private Boolean lastExecuteTaskStatus;
+
@SerializedName("comment")
private String comment;
@@ -206,6 +205,18 @@ public class Job implements Writable {
}
public void checkJobParam() throws DdlException {
+ if (null == jobCategory) {
+ throw new DdlException("jobCategory must be set");
+ }
+ if (null == executor) {
+ throw new DdlException("Job executor must be set");
+ }
+ if (null == jobType) {
+ throw new DdlException("Job type must be set");
+ }
+ if (jobType.equals(JobType.MANUAL)) {
+ return;
+ }
if (startTimeMs != 0L && startTimeMs < System.currentTimeMillis()) {
throw new DdlException("startTimeMs must be greater than current
time");
}
@@ -221,15 +232,10 @@ public class Job implements Writable {
if (null != intervalUnit && null != originInterval) {
this.intervalMs = intervalUnit.getParameterValue(originInterval);
}
- if (isCycleJob && (intervalMs == null || intervalMs <= 0L)) {
+ if (jobType.equals(JobType.RECURRING) && (intervalMs == null ||
intervalMs <= 0L)) {
throw new DdlException("cycle job must set intervalMs");
}
- if (null == jobCategory) {
- throw new DdlException("jobCategory must be set");
- }
- if (null == executor) {
- throw new DdlException("Job executor must be set");
- }
+
}
@@ -246,33 +252,41 @@ public class Job implements Writable {
public List<String> getShowInfo() {
List<String> row = Lists.newArrayList();
row.add(String.valueOf(jobId));
- row.add(dbName);
- if (jobCategory.equals(JobCategory.MTMV)) {
- row.add(baseName);
- }
row.add(jobName);
row.add(user);
- row.add(timezone);
- if (isCycleJob) {
- row.add(JobType.RECURRING.name());
- } else {
- if (isStreamingJob) {
- row.add(JobType.STREAMING.name());
- } else {
- row.add(JobType.ONE_TIME.name());
- }
- }
- row.add(isCycleJob ? "null" : TimeUtils.longToTimeString(startTimeMs));
- row.add(isCycleJob ? originInterval.toString() : "null");
- row.add(isCycleJob ? intervalUnit.name() : "null");
- row.add(isCycleJob && startTimeMs > 0 ?
TimeUtils.longToTimeString(startTimeMs) : "null");
- row.add(isCycleJob && endTimeMs > 0 ?
TimeUtils.longToTimeString(endTimeMs) : "null");
+ row.add(jobType.name());
+
+ row.add(convertRecurringStrategyToString());
row.add(jobStatus.name());
- row.add(latestCompleteExecuteTimeMs <= 0L ? "null" :
TimeUtils.longToTimeString(latestCompleteExecuteTimeMs));
- row.add(errMsg == null ? "null" : errMsg);
+ row.add(null == lastExecuteTaskStatus ? "null" :
lastExecuteTaskStatus.toString());
row.add(createTimeMs <= 0L ? "null" :
TimeUtils.longToTimeString(createTimeMs));
row.add(comment == null ? "null" : comment);
return row;
}
+ private String convertRecurringStrategyToString() {
+ if (jobType.equals(JobType.MANUAL)) {
+ return "MANUAL TRIGGER";
+ }
+ switch (jobType) {
+ case ONE_TIME:
+ return "AT " + TimeUtils.longToTimeString(startTimeMs);
+ case RECURRING:
+ String result = "EVERY " + originInterval + " " +
intervalUnit.name();
+ if (startTimeMs > 0) {
+ result += " STARTS " +
TimeUtils.longToTimeString(startTimeMs);
+ }
+ if (endTimeMs > 0) {
+ result += " ENDS " + TimeUtils.longToTimeString(endTimeMs);
+ }
+ return result;
+ case STREAMING:
+ return "STREAMING" + (startTimeMs > 0 ? " AT " +
TimeUtils.longToTimeString(startTimeMs) : "");
+ case MANUAL:
+ return "MANUAL TRIGGER";
+ default:
+ return "UNKNOWN";
+ }
+ }
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java
index 04fb552e74b..1f8aac58285 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java
@@ -21,10 +21,12 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.scheduler.constants.TaskType;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
import java.io.DataInput;
import java.io.DataOutput;
@@ -32,7 +34,8 @@ import java.io.IOException;
import java.util.List;
@Data
-public class JobTask implements Writable {
+@Slf4j
+public class JobTask<T> implements Writable {
@SerializedName("jobId")
private Long jobId;
@@ -54,6 +57,21 @@ public class JobTask implements Writable {
@SerializedName("errorMsg")
private String errorMsg;
+ @SerializedName("contextDataStr")
+ private String contextDataStr;
+
+ @SerializedName("taskType")
+ private TaskType taskType = TaskType.SCHEDULER_JOB_TASK;
+
+ /**
+ * Some parameters specific to the current task that need to be used to
execute the task
+ * eg: sql task, sql it's: select * from table where id = 1 order by id
desc limit ${limit} offset ${offset}
+ * contextData is a map, key1 is limit, value is 10,key2 is offset, value
is 1
+ * when execute the task, we will replace the ${limit} to 10, ${offset} to
1
+ * so to execute sql is: select * from table where id = 1 order by id desc
limit 10 offset 1.
+ */
+ private T contextData;
+
public JobTask(Long jobId, Long taskId, Long createTimeMs) {
//it's enough to use nanoTime to identify a task
this.taskId = taskId;
@@ -61,10 +79,22 @@ public class JobTask implements Writable {
this.createTimeMs = createTimeMs;
}
- public List<String> getShowInfo() {
+ public JobTask(Long jobId, Long taskId, Long createTimeMs, T contextData) {
+ this(jobId, taskId, createTimeMs);
+ this.contextData = contextData;
+ try {
+ this.contextDataStr = GsonUtils.GSON.toJson(contextData);
+ } catch (Exception e) {
+ this.contextDataStr = null;
+ log.error("contextData serialize failed, jobId: {}, taskId: {}",
jobId, taskId, e);
+ }
+ }
+
+ public List<String> getShowInfo(String jobName) {
List<String> row = Lists.newArrayList();
- row.add(String.valueOf(jobId));
row.add(String.valueOf(taskId));
+ row.add(String.valueOf(jobId));
+ row.add(jobName);
if (null != createTimeMs) {
row.add(TimeUtils.longToTimeString(createTimeMs));
}
@@ -90,6 +120,7 @@ public class JobTask implements Writable {
} else {
row.add(errorMsg);
}
+ row.add(taskType.name());
return row;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java
index 7c739ba4603..6f7cd839074 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java
@@ -51,17 +51,20 @@ public class JobTaskManager implements Writable {
* used to record the start time of the task to be executed
* will clear when the task is executed
*/
- private static ConcurrentHashMap<Long, Map<Long, Long>>
prepareTaskCreateMsMap = new ConcurrentHashMap<>(16);
+ private static ConcurrentHashMap<Long, Map<Long, JobTask>>
prepareTaskCreateMsMap = new ConcurrentHashMap<>(16);
- public static void addPrepareTaskStartTime(Long jobId, Long taskId, Long
startTime) {
+ public static void addPrepareTask(JobTask jobTask) {
+ long jobId = jobTask.getJobId();
+ long taskId = jobTask.getTaskId();
prepareTaskCreateMsMap.computeIfAbsent(jobId, k -> new HashMap<>());
- prepareTaskCreateMsMap.get(jobId).put(taskId, startTime);
+ prepareTaskCreateMsMap.get(jobId).put(taskId, jobTask);
}
- public static Long pollPrepareTaskByTaskId(Long jobId, Long taskId) {
- if (!prepareTaskCreateMsMap.containsKey(jobId)) {
- // if the job is not in the map, return current time
- return System.currentTimeMillis();
+ public static JobTask pollPrepareTaskByTaskId(Long jobId, Long taskId) {
+ if (!prepareTaskCreateMsMap.containsKey(jobId) ||
!prepareTaskCreateMsMap.get(jobId).containsKey(taskId)) {
+ // if the job is not in the map, return new JobTask
+ // return new JobTask(jobId, taskId, System.currentTimeMillis());
fixme
+ return null;
}
return prepareTaskCreateMsMap.get(jobId).remove(taskId);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
index c7a728cf049..33ac7d5f940 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
@@ -19,7 +19,6 @@ package org.apache.doris.scheduler.manager;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
-import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.io.Writable;
@@ -28,8 +27,11 @@ import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.scheduler.constants.JobCategory;
import org.apache.doris.scheduler.constants.JobStatus;
+import org.apache.doris.scheduler.constants.JobType;
+import org.apache.doris.scheduler.constants.TaskType;
import org.apache.doris.scheduler.disruptor.TaskDisruptor;
import org.apache.doris.scheduler.job.Job;
+import org.apache.doris.scheduler.job.JobTask;
import org.apache.doris.scheduler.job.TimerJobTask;
import io.netty.util.HashedWheelTimer;
@@ -88,13 +90,12 @@ public class TimerJobManager implements Closeable, Writable
{
}
public void start() {
- dorisTimer = new HashedWheelTimer(new
CustomThreadFactory("hashed-wheel-timer"),
- 1, TimeUnit.SECONDS, 660);
+ dorisTimer = new HashedWheelTimer(1, TimeUnit.SECONDS, 660);
dorisTimer.start();
Long currentTimeMs = System.currentTimeMillis();
jobMap.forEach((jobId, job) -> {
Long nextExecuteTimeMs = findFistExecuteTime(currentTimeMs,
job.getStartTimeMs(),
- job.getIntervalMs(), job.isCycleJob());
+ job.getIntervalMs(), job.getJobType());
job.setNextExecuteTimeMs(nextExecuteTimeMs);
});
batchSchedulerTasks();
@@ -152,18 +153,18 @@ public class TimerJobManager implements Closeable,
Writable {
}
private void initAndSchedulerJob(Job job) {
- if (!job.getJobStatus().equals(JobStatus.RUNNING)) {
+ if (!job.getJobStatus().equals(JobStatus.RUNNING) ||
job.getJobType().equals(JobType.MANUAL)) {
return;
}
Long currentTimeMs = System.currentTimeMillis();
Long nextExecuteTimeMs = findFistExecuteTime(currentTimeMs,
job.getStartTimeMs(),
- job.getIntervalMs(), job.isCycleJob());
+ job.getIntervalMs(), job.getJobType());
job.setNextExecuteTimeMs(nextExecuteTimeMs);
if (job.getNextExecuteTimeMs() < lastBatchSchedulerTimestamp) {
List<Long> executeTimestamp = findTasksBetweenTime(job,
lastBatchSchedulerTimestamp,
- job.getNextExecuteTimeMs());
+ job.getNextExecuteTimeMs(), job.getJobType());
if (!executeTimestamp.isEmpty()) {
for (Long timestamp : executeTimestamp) {
putOneTask(job.getJobId(), timestamp);
@@ -172,7 +173,7 @@ public class TimerJobManager implements Closeable, Writable
{
}
}
- private Long findFistExecuteTime(Long currentTimeMs, Long startTimeMs,
Long intervalMs, boolean isCycleJob) {
+ private Long findFistExecuteTime(Long currentTimeMs, Long startTimeMs,
Long intervalMs, JobType jobType) {
// if job not delay, first execute time is start time
if (startTimeMs != 0L && startTimeMs > currentTimeMs) {
return startTimeMs;
@@ -182,13 +183,30 @@ public class TimerJobManager implements Closeable,
Writable {
return currentTimeMs;
}
// if it's cycle job and not set start tine, first execute time is
current time + interval
- if (isCycleJob && startTimeMs == 0L) {
+ if (jobType.equals(JobType.RECURRING) && startTimeMs == 0L) {
return currentTimeMs + intervalMs;
}
// if it's not cycle job and already delay, first execute time is
current time
return currentTimeMs;
}
+ public <T> boolean immediateExecuteTask(Long jobId, T taskContextData)
throws DdlException {
+ Job job = jobMap.get(jobId);
+ if (job == null) {
+ log.warn("immediateExecuteTask failed, jobId: {} not exist",
jobId);
+ return false;
+ }
+ if (!job.getJobStatus().equals(JobStatus.RUNNING)) {
+ log.warn("immediateExecuteTask failed, jobId: {} is not running",
jobId);
+ return false;
+ }
+ JobTask jobTask = createInitialTask(jobId, taskContextData);
+ jobTask.setTaskType(TaskType.MANUAL_JOB_TASK);
+ JobTaskManager.addPrepareTask(jobTask);
+ disruptor.tryPublish(jobId, jobTask.getTaskId(),
TaskType.MANUAL_JOB_TASK);
+ return true;
+ }
+
public void unregisterJob(Long jobId) {
jobMap.remove(jobId);
}
@@ -204,6 +222,14 @@ public class TimerJobManager implements Closeable,
Writable {
pauseJob(job);
}
+ public void setJobLatestStatus(long jobId, boolean status) {
+ Job job = jobMap.get(jobId);
+ if (jobMap.get(jobId) == null) {
+ log.warn("pauseJob failed, jobId: {} not exist", jobId);
+ }
+ job.setLastExecuteTaskStatus(status);
+ }
+
public void stopJob(String dbName, String jobName, JobCategory
jobCategory) throws DdlException {
Optional<Job> optionalJob = findJob(dbName, jobName, jobCategory);
@@ -326,14 +352,14 @@ public class TimerJobManager implements Closeable,
Writable {
executeJobIdsWithinLastTenMinutesWindow();
}
- private List<Long> findTasksBetweenTime(Job job, Long endTimeEndWindow,
Long nextExecuteTime) {
+ private List<Long> findTasksBetweenTime(Job job, Long endTimeEndWindow,
Long nextExecuteTime, JobType jobType) {
List<Long> jobExecuteTimes = new ArrayList<>();
- if (!job.isCycleJob() && (nextExecuteTime < endTimeEndWindow)) {
+ if (!jobType.equals(JobType.RECURRING) && (nextExecuteTime <
endTimeEndWindow)) {
jobExecuteTimes.add(nextExecuteTime);
return jobExecuteTimes;
}
- if (job.isCycleJob() && (nextExecuteTime > endTimeEndWindow)) {
+ if (jobType.equals(JobType.RECURRING) && (nextExecuteTime >
endTimeEndWindow)) {
return new ArrayList<>();
}
while (endTimeEndWindow >= nextExecuteTime) {
@@ -360,11 +386,11 @@ public class TimerJobManager implements Closeable,
Writable {
return;
}
jobMap.forEach((k, v) -> {
- if (v.isRunning() && (v.getNextExecuteTimeMs()
+ if (!v.getJobType().equals(JobType.MANUAL) && v.isRunning() &&
(v.getNextExecuteTimeMs()
+ v.getIntervalMs() < lastBatchSchedulerTimestamp)) {
List<Long> executeTimes = findTasksBetweenTime(
v, lastBatchSchedulerTimestamp,
- v.getNextExecuteTimeMs());
+ v.getNextExecuteTimeMs(), v.getJobType());
if (!executeTimes.isEmpty()) {
for (Long executeTime : executeTimes) {
putOneTask(v.getJobId(), executeTime);
@@ -402,7 +428,8 @@ public class TimerJobManager implements Closeable, Writable
{
log.info("putOneTask failed, scheduler is closed, jobId: {}",
jobId);
return;
}
- long taskId = System.nanoTime();
+ JobTask jobTask = createAsyncInitialTask(jobId, startExecuteTime);
+ long taskId = jobTask.getTaskId();
TimerJobTask task = new TimerJobTask(jobId, taskId, startExecuteTime,
disruptor);
long delay = getDelaySecond(task.getStartTimestamp());
Timeout timeout = dorisTimer.newTimeout(task, delay, TimeUnit.SECONDS);
@@ -412,13 +439,13 @@ public class TimerJobManager implements Closeable,
Writable {
}
if (jobTimeoutMap.containsKey(task.getJobId())) {
jobTimeoutMap.get(task.getJobId()).put(task.getTaskId(), timeout);
- JobTaskManager.addPrepareTaskStartTime(jobId, taskId,
startExecuteTime);
+ JobTaskManager.addPrepareTask(jobTask);
return;
}
Map<Long, Timeout> timeoutMap = new ConcurrentHashMap<>();
timeoutMap.put(task.getTaskId(), timeout);
jobTimeoutMap.put(task.getJobId(), timeoutMap);
- JobTaskManager.addPrepareTaskStartTime(jobId, taskId,
startExecuteTime);
+ JobTaskManager.addPrepareTask(jobTask);
}
// cancel all task for one job
@@ -488,9 +515,19 @@ public class TimerJobManager implements Closeable,
Writable {
}
public void putOneJobToQueen(Long jobId) {
+ JobTask jobTask = createInitialTask(jobId, null);
+ JobTaskManager.addPrepareTask(jobTask);
+ disruptor.tryPublish(jobId, jobTask.getTaskId());
+ }
+
+ private JobTask createAsyncInitialTask(long jobId, long createTimeMs) {
+ long taskId = System.nanoTime();
+ return new JobTask(jobId, taskId, createTimeMs);
+ }
+
+ private <T> JobTask createInitialTask(long jobId, T context) {
long taskId = System.nanoTime();
- JobTaskManager.addPrepareTaskStartTime(jobId, taskId,
System.currentTimeMillis());
- disruptor.tryPublish(jobId, taskId);
+ return new JobTask(jobId, taskId, System.currentTimeMillis(), context);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/PersistentJobRegister.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/PersistentJobRegister.java
index 4ee17bb8df0..f1a901299f7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/PersistentJobRegister.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/PersistentJobRegister.java
@@ -110,6 +110,19 @@ public interface PersistentJobRegister {
Long registerJob(Job job) throws DdlException;
+
+ /**
+ * execute job task immediately,this method will not change job status and
don't affect scheduler job
+ * this task type should set to {@link
org.apache.doris.scheduler.constants.TaskType#MANUAL_JOB_TASK}
+ *
+ * @param jobId job id
+ * @param contextData if you need to pass parameters to the task,
+ * @param <T> context data type
+ * @return true if execute success, false if execute failed,
+ * if job is not exist or job is not running, or job not support manual
execute, return false
+ */
+ <T> boolean immediateExecuteTask(Long jobId, T contextData) throws
DdlException;
+
List<Job> getJobs(String dbFullName, String jobName, JobCategory
jobCategory, PatternMatcher matcher);
/**
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/TimerJobRegister.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/TimerJobRegister.java
index 5ce802eff72..f8ab59e5d54 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/TimerJobRegister.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/TimerJobRegister.java
@@ -68,6 +68,11 @@ public class TimerJobRegister implements
PersistentJobRegister {
return timerJobManager.registerJob(job);
}
+ @Override
+ public <T> boolean immediateExecuteTask(Long jobId, T data) throws
DdlException {
+ return timerJobManager.immediateExecuteTask(jobId, data);
+ }
+
@Override
public void pauseJob(Long jobId) {
timerJobManager.pauseJob(jobId);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/JobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/JobTest.java
index ad5f740907c..57fbfda6cf9 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/JobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/JobTest.java
@@ -19,6 +19,7 @@ package org.apache.doris.scheduler.disruptor;
import org.apache.doris.scheduler.common.IntervalUnit;
import org.apache.doris.scheduler.constants.JobCategory;
+import org.apache.doris.scheduler.constants.JobType;
import org.apache.doris.scheduler.executor.SqlJobExecutor;
import org.apache.doris.scheduler.job.Job;
@@ -42,7 +43,7 @@ public class JobTest {
public static void init() {
SqlJobExecutor sqlJobExecutor = new SqlJobExecutor("insert into test
values(1);");
job = new Job("insertTest", 1000L, System.currentTimeMillis(),
System.currentTimeMillis() + 100000, sqlJobExecutor);
- job.setCycleJob(true);
+ job.setJobType(JobType.RECURRING);
job.setComment("test");
job.setOriginInterval(10L);
job.setIntervalUnit(IntervalUnit.SECOND);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TaskDisruptorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TaskDisruptorTest.java
index 1036285c47b..b8482b11fff 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TaskDisruptorTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TaskDisruptorTest.java
@@ -19,9 +19,12 @@ package org.apache.doris.scheduler.disruptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.scheduler.constants.JobCategory;
+import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.executor.JobExecutor;
import org.apache.doris.scheduler.job.ExecutorResult;
import org.apache.doris.scheduler.job.Job;
+import org.apache.doris.scheduler.job.JobTask;
+import org.apache.doris.scheduler.manager.JobTaskManager;
import org.apache.doris.scheduler.manager.TimerJobManager;
import org.apache.doris.scheduler.manager.TransientTaskManager;
@@ -56,26 +59,30 @@ public class TaskDisruptorTest {
@BeforeEach
public void init() {
taskDisruptor = new TaskDisruptor(timerJobManager,
transientTaskManager);
+ taskDisruptor.start();
}
@Test
void testPublishEventAndConsumer() {
Job job = new Job("test", 6000L, null,
null, new TestExecutor());
+ JobTask jobTask = new JobTask(job.getJobId(), 1L,
System.currentTimeMillis());
+ JobTaskManager.addPrepareTask(jobTask);
job.setJobCategory(JobCategory.COMMON);
new Expectations() {{
timerJobManager.getJob(anyLong);
result = job;
}};
taskDisruptor.tryPublish(job.getJobId(), 1L);
- Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() ->
testEventExecuteFlag);
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).until(() ->
testEventExecuteFlag);
Assertions.assertTrue(testEventExecuteFlag);
}
- class TestExecutor implements JobExecutor<Boolean> {
+ class TestExecutor implements JobExecutor<Boolean, String> {
+
@Override
- public ExecutorResult execute(Job job) {
+ public ExecutorResult<Boolean> execute(Job job, String dataContext)
throws JobException {
testEventExecuteFlag = true;
return new ExecutorResult(true, true, null, "null");
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java
index 3e912b8fd85..5fcf242fd13 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java
@@ -21,9 +21,12 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.persist.EditLog;
import org.apache.doris.scheduler.constants.JobCategory;
+import org.apache.doris.scheduler.constants.JobType;
+import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.executor.JobExecutor;
import org.apache.doris.scheduler.job.ExecutorResult;
import org.apache.doris.scheduler.job.Job;
+import org.apache.doris.scheduler.job.JobTask;
import org.apache.doris.scheduler.manager.TimerJobManager;
import org.apache.doris.scheduler.manager.TransientTaskManager;
@@ -51,16 +54,18 @@ public class TimerJobManagerTest {
private static AtomicInteger testExecuteCount = new AtomicInteger(0);
Job job = new Job("test", 6000L, null,
null, new TestExecutor());
+ JobTask jobTask = new JobTask(job.getJobId(), 1L,
System.currentTimeMillis());
@BeforeEach
public void init() {
- job.setCycleJob(true);
+ job.setJobType(JobType.RECURRING);
job.setJobCategory(JobCategory.COMMON);
testExecuteCount.set(0);
timerJobManager = new TimerJobManager();
TransientTaskManager transientTaskManager = new TransientTaskManager();
TaskDisruptor taskDisruptor = new TaskDisruptor(this.timerJobManager,
transientTaskManager);
this.timerJobManager.setDisruptor(taskDisruptor);
+ taskDisruptor.start();
timerJobManager.start();
}
@@ -153,7 +158,7 @@ public class TimerJobManagerTest {
long startTimestamp = System.currentTimeMillis() + 3000L;
job.setIntervalMs(0L);
job.setStartTimeMs(startTimestamp);
- job.setCycleJob(false);
+ job.setJobType(JobType.ONE_TIME);
timerJobManager.registerJob(job);
//consider the time of the first execution and give some buffer time
Awaitility.await().atMost(14, TimeUnit.SECONDS).until(() ->
System.currentTimeMillis()
@@ -166,9 +171,10 @@ public class TimerJobManagerTest {
timerJobManager.close();
}
- class TestExecutor implements JobExecutor<Boolean> {
+ class TestExecutor implements JobExecutor<Boolean, String> {
+
@Override
- public ExecutorResult execute(Job job) {
+ public ExecutorResult<Boolean> execute(Job job, String dataContext)
throws JobException {
log.info("test execute count:{}",
testExecuteCount.incrementAndGet());
return new ExecutorResult<>(true, true, null, "");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]