This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new cbf1f8620a4 [Feature](job)support cancel task and fix log invalid
(#27703)
cbf1f8620a4 is described below
commit cbf1f8620a4be7117d81edaf0a130d5a5ac19c67
Author: Calvin Kirs <[email protected]>
AuthorDate: Wed Dec 6 10:44:09 2023 +0800
[Feature](job)support cancel task and fix log invalid (#27703)
- Running task can be show and fix cancel fail
- When the insert task scheduling cycle is reached, if there are still
tasks running, the scheduling of this task will be canceled at this time.
- refactor job status changes SQL
- Fix timer job window error
- Support cancel task
---
.../Create/CREATE-JOB.md | 48 ++++++++++++--
.../main/java/org/apache/doris/common/Config.java | 13 +++-
fe/fe-core/src/main/cup/sql_parser.cup | 34 ++++++----
...{ResumeJobStmt.java => AlterJobStatusStmt.java} | 52 +++++++--------
.../apache/doris/analysis/CancelJobTaskStmt.java | 72 ++++++++++++++++++++
.../org/apache/doris/analysis/CreateJobStmt.java | 30 ++++++++-
.../org/apache/doris/analysis/PauseJobStmt.java | 69 -------------------
.../org/apache/doris/analysis/StopJobStmt.java | 49 --------------
.../main/java/org/apache/doris/catalog/Env.java | 2 +-
.../org/apache/doris/job/base/AbstractJob.java | 77 ++++++++++++++++++----
.../main/java/org/apache/doris/job/base/Job.java | 21 ++++--
.../org/apache/doris/job/common/IntervalUnit.java | 3 +-
.../org/apache/doris/job/common/TaskStatus.java | 4 +-
.../apache/doris/job/disruptor/TimerJobEvent.java | 4 +-
.../job/executor/DefaultTaskExecutorHandler.java | 4 +-
.../doris/job/executor/DispatchTaskHandler.java | 22 +++++--
.../doris/job/executor/TimerJobSchedulerTask.java | 14 ++--
.../doris/job/extensions/insert/InsertJob.java | 24 +++----
.../doris/job/extensions/insert/InsertTask.java | 30 ++++++++-
.../apache/doris/job/extensions/mtmv/MTMVJob.java | 7 +-
.../org/apache/doris/job/manager/JobManager.java | 54 ++++++++-------
.../job/manager/TaskDisruptorGroupManager.java | 10 +--
.../apache/doris/job/manager/TaskTokenManager.java | 4 +-
.../apache/doris/job/scheduler/JobScheduler.java | 57 ++++++++++------
.../org/apache/doris/job/task/AbstractTask.java | 26 +++++---
.../main/java/org/apache/doris/job/task/Task.java | 2 +-
.../java/org/apache/doris/mtmv/MTMVJobManager.java | 6 +-
.../trees/plans/commands/InsertExecutor.java | 4 ++
.../plans/commands/InsertIntoTableCommand.java | 3 +
.../main/java/org/apache/doris/qe/DdlExecutor.java | 26 +++-----
.../java/org/apache/doris/qe/ShowExecutor.java | 2 +-
.../java/org/apache/doris/qe/StmtExecutor.java | 3 +
.../job/base/JobExecutionConfigurationTest.java | 14 ++--
regression-test/pipeline/p0/conf/fe.conf | 4 ++
.../suites/job_p0/test_base_insert_job.groovy | 60 +++++++++++++----
35 files changed, 529 insertions(+), 325 deletions(-)
diff --git
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md
index c06d48a4d01..1202bbff423 100644
---
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md
+++
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md
@@ -34,12 +34,28 @@ CREATE JOB
### Description
Doris Job 是根据既定计划运行的任务,用于在特定时间或指定时间间隔触发预定义的操作,从而帮助我们自动执行一些任务。从功能上来讲,它类似于操作系统上的
-定时任务(如:Linux 中的 cron、Windows 中的计划任务)。但 Doris 的 Job 调度可以精确到秒级。
+定时任务(如:Linux 中的 cron、Windows 中的计划任务)。
-Job 有两种类型:`ONE_TIME` 和 `BATCH`。其中 `ONE_TIME` 类型的 Job 会在指定的时间点触发,它主要用于一次性任务,而
`BATCH` 类型的 Job 会在指定的时间间隔内循环触发。
-主要用于周期性执行的任务。
+Job 有两种类型:`ONE_TIME` 和 `RECURRING`。其中 `ONE_TIME` 类型的 Job
会在指定的时间点触发,它主要用于一次性任务,而 `RECURRING` 类型的 Job 会在指定的时间间隔内循环触发, 此方式主要用于周期性执行的任务。
+`RECURRING` 类型的 Job 可指定开始时间,结束时间,即 `STARTS\ENDS`, 如果不指定开始时间,则默认首次执行时间为当前时间 +
一次调度周期。如果指定结束时间,则 task 执行完成如果达到结束时间(或超过,或下次执行周期会超过结束时间)则更新为FINISHED状态,此时不会再产生
Task。
+
+JOB
共4种状态(`RUNNING`,`STOPPED`,`PAUSED`,`FINISHED`,),初始状态为RUNNING,RUNNING状态的JOB会根据既定的调度周期去生成
TASK 执行,Job 执行完成达到结束时间则状态变更为 `FINISHED`.
+
+RUNNING 状态的JOB 可以被 pause,即暂停,此时不会再生成 Task。
+
+PAUSE状态的 JOB 可以通过 RESUME 操作来恢复运行,更改为RUNNING状态。
+
+STOP 状态的 JOB 由用户主动触发,此时会 Cancel 正在运行中的作业,然后删除 JOB。
+
+Finished 状态的 JOB 会保留在系统中 24 H,24H 后会被删除。
+
+JOB 只描述作业信息, 执行会生成 TASK, TASK 状态分为 PENDING,RUNNING,SUCCEESS,FAILED,CANCELED
+PENDING 表示到达触发时间了但是等待资源 RUN, 分配到资源后状态变更为RUNNING ,执行成功/失败即变更为 SUCCESS/FAILED.
+CANCELED 即取消状态 ,TASK持久化最终状态,即SUCCESS/FAILED,其他状态运行中可以查到,但是如果重启则不可见。TASK只保留最新的
100 条记录。
+
+- 目前仅支持 ***ADMIN*** 权限执行此操作。
+- 目前仅支持 ***INSERT 内表***
-目前仅支持 ***ADMIN*** 权限执行此操作。
语法:
@@ -59,8 +75,7 @@ schedule: {
}
interval:
- quantity { DAY | HOUR | MINUTE |
- WEEK | SECOND }
+ quantity { WEEK | DAY | HOUR | MINUTE }
```
一条有效的 Job 语句必须包含以下内容
@@ -88,7 +103,7 @@ SCHEDULER 语句用于定义作业的执行时间,频率以及持续时间,
- interval
- 用于指定作业执行频率,它可以是天、小时、分钟、秒、周。例如:` 1 DAY` 表示每天执行一次,` 1 HOUR` 表示每小时执行一次,` 1
MINUTE` 表示每分钟执行一次,` 1 WEEK` 表示每周执行一次,` 1 SECOND` 表示每秒执行一次。
+ 用于指定作业执行频率,它可以是天、小时、分钟、周。例如:` 1 DAY` 表示每天执行一次,` 1 HOUR` 表示每小时执行一次,` 1
MINUTE` 表示每分钟执行一次,` 1 WEEK` 表示每周执行一次。
- STARTS timestamp
@@ -122,6 +137,25 @@ CREATE JOB my_job ON SCHEDULE EVERY 1 DAY STARTS
'2020-01-01 00:00:00' DO INSERT
```sql
CREATE JOB my_job ON SCHEDULER EVERY 1 DAY STARTS '2020-01-01 00:00:00' ENDS
'2020-01-01 00:10:00' DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2
create_time >= days_add(now(),-1);
```
+### INSERT JOB
+目前仅支持 ***INSERT 内表***
+
+### CONFIG
+
+fe.conf
+
+- job_dispatch_timer_job_thread_num, 用于分发定时任务的线程数, 默认值2,如果含有大量周期执行任务,可以调大这个参数。
+
+- job_dispatch_timer_job_queue_size, 任务堆积时用于存放定时任务的队列大小,默认值 1024.
如果有大量任务同一时间触发,可以调大这个参数。否则会导致队列满,提交任务会进入阻塞状态,从而导致后续任务无法提交。
+
+- finished_job_cleanup_threshold_time_hour, 用于清理已完成的任务的时间阈值,单位为小时,默认值为24小时。
+
+- job_insert_task_consumer_thread_num = 10;用于执行 Insert 任务的线程数, 值应该大于0,否则默认为5.
+
+### Best Practice
+
+合理的进行 Job 的管理,避免大量的 Job 同时触发,导致任务堆积,从而影响系统的正常运行。
+任务的执行间隔应该设置在一个合理的范围,至少应该大于任务执行时间。
### Keywords
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index d6be0fb9190..d350a44c9fa 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1570,7 +1570,7 @@ public class Config extends ConfigBase {
*/
@ConfField(description = {"用于分发定时任务的线程数",
"The number of threads used to dispatch timer job."})
- public static int job_dispatch_timer_job_thread_num = 5;
+ public static int job_dispatch_timer_job_thread_num = 2;
/**
* The number of timer jobs that can be queued.
@@ -1582,6 +1582,10 @@ public class Config extends ConfigBase {
@ConfField(description = {"任务堆积时用于存放定时任务的队列大小", "The number of timer jobs
that can be queued."})
public static int job_dispatch_timer_job_queue_size = 1024;
+ @ConfField(description = {"finished 状态的 job 最长保存时间,超过这个时间将会被删除, 单位:小时",
+ "The longest time to save the job in finished status, it will be
deleted after this time. Unit: hour"})
+ public static int finished_job_cleanup_threshold_time_hour = 24;
+
@ConfField(description = {"用于执行 Insert 任务的线程数,值应该大于0,否则默认为5",
"The number of threads used to consume Insert tasks, "
+ "the value should be greater than 0, if it is <=0,
default is 5."})
@@ -1592,6 +1596,13 @@ public class Config extends ConfigBase {
+ "the value should be greater than 0, if it is <=0,
default is 5."})
public static int job_mtmv_task_consumer_thread_num = 10;
+ /* job test config */
+ /**
+ * If set to true, we will allow the interval unit to be set to second,
when creating a recurring job.
+ */
+ @ConfField
+ public static boolean enable_job_schedule_second_for_test = false;
+
/*---------------------- JOB CONFIG END------------------------*/
/**
* The number of async tasks that can be queued. @See TaskDisruptor
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup
b/fe/fe-core/src/main/cup/sql_parser.cup
index 6334f802073..bb6c09e1d42 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -688,7 +688,7 @@ nonterminal StatementBase stmt, show_stmt, show_param,
help_stmt, load_stmt,
create_routine_load_stmt, pause_routine_load_stmt,
resume_routine_load_stmt, stop_routine_load_stmt,
show_routine_load_stmt, show_routine_load_task_stmt,
show_create_routine_load_stmt, show_create_load_stmt,
show_create_reporitory_stmt,
describe_stmt, alter_stmt, unset_var_stmt,
- create_job_stmt,pause_job_stmt,resume_job_stmt,stop_job_stmt,show_job_stmt,
+
create_job_stmt,pause_job_stmt,resume_job_stmt,stop_job_stmt,show_job_stmt,cancel_job_task_stmt,
use_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt,
create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt,
switch_stmt, transaction_stmt, unsupported_stmt, export_stmt, admin_stmt,
truncate_stmt,
import_columns_stmt, import_delete_on_stmt, import_sequence_stmt,
import_where_stmt, install_plugin_stmt, uninstall_plugin_stmt,
@@ -1157,6 +1157,8 @@ stmt ::=
{: RESULT = stmt; :}
| pause_job_stmt : stmt
{: RESULT = stmt; :}
+ | cancel_job_task_stmt : stmt
+ {: RESULT = stmt; :}
| show_job_stmt : stmt
{: RESULT = stmt; :}
| stop_job_stmt : stmt
@@ -2589,13 +2591,7 @@ create_job_stmt ::=
{:
RESULT = endTime;
:}
- ;
-resume_job_stmt ::=
- KW_RESUME KW_JOB KW_FOR job_label:jobLabel
- {:
- RESULT = new ResumeJobStmt(jobLabel);
- :}
- ;
+ ;
show_job_stmt ::=
KW_SHOW KW_JOBS
{:
@@ -2623,18 +2619,30 @@ show_job_stmt ::=
:}
;
pause_job_stmt ::=
- KW_PAUSE KW_JOB KW_FOR job_label:jobLabel
+ KW_PAUSE KW_JOB opt_wild_where
{:
- RESULT = new PauseJobStmt(jobLabel);
+ RESULT = new
AlterJobStatusStmt(parser.where,org.apache.doris.job.common.JobStatus.PAUSED);
:}
;
stop_job_stmt ::=
- KW_STOP KW_JOB KW_FOR job_label:jobLabel
+ KW_DROP KW_JOB opt_wild_where
+ {:
+ RESULT = new
AlterJobStatusStmt(parser.where,org.apache.doris.job.common.JobStatus.STOPPED);
+ :}
+ ;
+resume_job_stmt ::=
+ KW_RESUME KW_JOB opt_wild_where
+ {:
+ RESULT = new
AlterJobStatusStmt(parser.where,org.apache.doris.job.common.JobStatus.RUNNING);
+ :}
+ ;
+cancel_job_task_stmt ::=
+ KW_CANCEL KW_TASK opt_wild_where
{:
- RESULT = new StopJobStmt(jobLabel);
+ RESULT = new CancelJobTaskStmt(parser.where);
:}
- ;
+ ;
// Routine load statement
create_routine_load_stmt ::=
KW_CREATE KW_ROUTINE KW_LOAD job_label:jobLabel optional_on_ident:tableName
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeJobStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterJobStatusStmt.java
similarity index 51%
rename from
fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeJobStmt.java
rename to
fe/fe-core/src/main/java/org/apache/doris/analysis/AlterJobStatusStmt.java
index 725d24f47af..09f135d2fb0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeJobStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterJobStatusStmt.java
@@ -17,47 +17,47 @@
package org.apache.doris.analysis;
-import org.apache.doris.cluster.ClusterNamespace;
-import org.apache.doris.common.ErrorCode;
-import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
+import org.apache.doris.job.common.JobStatus;
import com.google.common.base.Strings;
+import lombok.Getter;
-public class ResumeJobStmt extends DdlStmt {
+public class AlterJobStatusStmt extends DdlStmt {
- private final LabelName labelName;
+ private Expr expr;
- private String db;
+ private static final String columnName = "jobName";
- public ResumeJobStmt(LabelName labelName) {
- this.labelName = labelName;
- }
-
- public boolean isAll() {
- return labelName == null;
- }
+ @Getter
+ private String jobName;
- public String getName() {
- return labelName.getLabelName();
- }
+ @Getter
+ private JobStatus jobStatus;
- public String getDbFullName() {
- return db;
+ public AlterJobStatusStmt(Expr whereClause, JobStatus jobStatus) {
+ this.expr = whereClause;
+ this.jobStatus = jobStatus;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
CreateJobStmt.checkAuth();
- if (labelName != null) {
- labelName.analyze(analyzer);
- db = labelName.getDbName();
- } else {
- if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) {
- ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
- }
- db = ClusterNamespace.getFullName(analyzer.getClusterName(),
analyzer.getDefaultDb());
+ String inputCol = ((SlotRef) expr.getChild(0)).getColumnName();
+ if (!inputCol.equalsIgnoreCase(columnName)) {
+ throw new AnalysisException("Current not support " + inputCol);
}
+ if (!(expr.getChild(1) instanceof StringLiteral)) {
+ throw new AnalysisException("Value must is string");
+ }
+
+ String inputValue = expr.getChild(1).getStringValue();
+ if (Strings.isNullOrEmpty(inputValue)) {
+ throw new AnalysisException("Value can't is null");
+ }
+ this.jobName = inputValue;
+
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelJobTaskStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelJobTaskStmt.java
new file mode 100644
index 00000000000..ea6febe1630
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelJobTaskStmt.java
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+//
https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/StringLiteral.java
+// and modified by Doris
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+
+import lombok.Getter;
+
+public class CancelJobTaskStmt extends DdlStmt {
+
+ @Getter
+ private String jobName;
+
+ @Getter
+ private Long taskId;
+
+ private Expr expr;
+
+ private static final String jobNameKey = "jobName";
+
+ private static final String taskIdKey = "taskId";
+
+ public CancelJobTaskStmt(Expr whereExpr) {
+ this.expr = whereExpr;
+ }
+
+ @Override
+ public void analyze(Analyzer analyzer) throws UserException {
+ super.analyze(analyzer);
+ CreateJobStmt.checkAuth();
+ CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
+ if (!compoundPredicate.getOp().equals(CompoundPredicate.Operator.AND))
{
+ throw new AnalysisException("Only allow compound predicate with
operator AND");
+ }
+ String jobNameInput = ((SlotRef)
compoundPredicate.getChildren().get(0).getChild(0)).getColumnName();
+ if (!jobNameKey.equalsIgnoreCase(jobNameInput)) {
+ throw new AnalysisException("Current not support " + jobNameInput);
+ }
+
+ if (!(compoundPredicate.getChildren().get(0).getChild(1) instanceof
StringLiteral)) {
+ throw new AnalysisException("JobName value must is string");
+ }
+ this.jobName =
compoundPredicate.getChildren().get(0).getChild(1).getStringValue();
+ String taskIdInput = ((SlotRef)
compoundPredicate.getChildren().get(1).getChild(0)).getColumnName();
+ if (!taskIdKey.equalsIgnoreCase(taskIdInput)) {
+ throw new AnalysisException("Current not support " + taskIdInput);
+ }
+ if (!(compoundPredicate.getChildren().get(1).getChild(1) instanceof
IntLiteral)) {
+ throw new AnalysisException("task id value must is large int");
+ }
+ this.taskId = ((IntLiteral)
compoundPredicate.getChildren().get(1).getChild(1)).getLongValue();
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
index a1c008fb95e..1a44ee4f90c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
@@ -19,6 +19,7 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
@@ -39,6 +40,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.util.HashSet;
+import java.util.Set;
/**
* syntax:
@@ -66,7 +68,7 @@ public class CreateJobStmt extends DdlStmt {
private StatementBase doStmt;
@Getter
- private AbstractJob<?> jobInstance;
+ private AbstractJob jobInstance;
private final LabelName labelName;
@@ -83,6 +85,13 @@ public class CreateJobStmt extends DdlStmt {
private final String comment;
private JobExecuteType executeType;
+ // exclude job name prefix, which is used by inner job
+ private final Set<String> excludeJobNamePrefix = new HashSet<>();
+
+ {
+ excludeJobNamePrefix.add("inner_mtmv_");
+ }
+
private static final ImmutableSet<Class<? extends DdlStmt>>
supportStmtSuperClass
= new ImmutableSet.Builder<Class<? extends
DdlStmt>>().add(InsertStmt.class)
.add(UpdateStmt.class).build();
@@ -126,7 +135,15 @@ public class CreateJobStmt extends DdlStmt {
timerDefinition.setInterval(interval);
}
if (null != intervalTimeUnit) {
-
timerDefinition.setIntervalUnit(IntervalUnit.valueOf(intervalTimeUnit.toUpperCase()));
+ IntervalUnit intervalUnit =
IntervalUnit.fromString(intervalTimeUnit.toUpperCase());
+ if (null == intervalUnit) {
+ throw new AnalysisException("interval time unit can not be " +
intervalTimeUnit);
+ }
+ if (intervalUnit.equals(IntervalUnit.SECOND)
+ && !Config.enable_job_schedule_second_for_test) {
+ throw new AnalysisException("interval time unit can not be
second");
+ }
+ timerDefinition.setIntervalUnit(intervalUnit);
}
if (null != startsTimeStamp) {
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(startsTimeStamp));
@@ -134,6 +151,7 @@ public class CreateJobStmt extends DdlStmt {
if (null != endsTimeStamp) {
timerDefinition.setEndTimeMs(TimeUtils.timeStringToLong(endsTimeStamp));
}
+ checkJobName(labelName.getLabelName());
jobExecutionConfiguration.setTimerDefinition(timerDefinition);
job.setJobConfig(jobExecutionConfiguration);
@@ -151,6 +169,14 @@ public class CreateJobStmt extends DdlStmt {
jobInstance = job;
}
+ private void checkJobName(String jobName) throws AnalysisException {
+ for (String prefix : excludeJobNamePrefix) {
+ if (jobName.startsWith(prefix)) {
+ throw new AnalysisException("job name can not start with " +
prefix);
+ }
+ }
+ }
+
protected static void checkAuth() throws AnalysisException {
if
(!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(),
PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
"ADMIN");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseJobStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseJobStmt.java
deleted file mode 100644
index c399fc37cca..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseJobStmt.java
+++ /dev/null
@@ -1,69 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.analysis;
-
-import org.apache.doris.cluster.ClusterNamespace;
-import org.apache.doris.common.ErrorCode;
-import org.apache.doris.common.ErrorReport;
-import org.apache.doris.common.UserException;
-
-import com.google.common.base.Strings;
-
-/**
- * syntax:
- * PAUSE JOB FOR [database.]name
- * we can pause a job by jobName
- * it's only running job can be paused, and it will be paused immediately
- * paused job can be resumed by RESUME EVENT FOR jobName
- */
-public class PauseJobStmt extends DdlStmt {
-
- private final LabelName labelName;
- private String db;
-
- public PauseJobStmt(LabelName labelName) {
- this.labelName = labelName;
- }
-
- public boolean isAll() {
- return labelName == null;
- }
-
- public String getName() {
- return labelName.getLabelName();
- }
-
- public String getDbFullName() {
- return db;
- }
-
- @Override
- public void analyze(Analyzer analyzer) throws UserException {
- super.analyze(analyzer);
- CreateJobStmt.checkAuth();
- if (labelName != null) {
- labelName.analyze(analyzer);
- db = labelName.getDbName();
- } else {
- if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) {
- ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
- }
- db = ClusterNamespace.getFullName(analyzer.getClusterName(),
analyzer.getDefaultDb());
- }
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/StopJobStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/StopJobStmt.java
deleted file mode 100644
index afef3ef01b5..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StopJobStmt.java
+++ /dev/null
@@ -1,49 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.analysis;
-
-import org.apache.doris.common.UserException;
-
-/**
- * syntax:
- * STOP JOB FOR [database.]name
- * only run job can be stopped, and stopped job can't be resumed
- */
-public class StopJobStmt extends DdlStmt {
-
- private final LabelName labelName;
-
- public StopJobStmt(LabelName labelName) {
- this.labelName = labelName;
- }
-
- public String getName() {
- return labelName.getLabelName();
- }
-
- public String getDbFullName() {
- return labelName.getDbName();
- }
-
- @Override
- public void analyze(Analyzer analyzer) throws UserException {
- super.analyze(analyzer);
- CreateJobStmt.checkAuth();
- labelName.analyze(analyzer);
- }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 2e5874fed12..61379c1b28e 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -360,7 +360,7 @@ public class Env {
private MetastoreEventsProcessor metastoreEventsProcessor;
private ExportTaskRegister exportTaskRegister;
- private JobManager<? extends AbstractJob> jobManager;
+ private JobManager<? extends AbstractJob<?, ?>, ?> jobManager;
private TransientTaskManager transientTaskManager;
private MasterDaemon labelCleaner; // To clean old LabelInfo,
ExportJobInfos
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index ca98756f6d3..83f02326d82 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -26,6 +26,7 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.TaskStatus;
+import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.persist.gson.GsonUtils;
@@ -36,6 +37,7 @@ import org.apache.doris.thrift.TRow;
import com.google.common.collect.ImmutableList;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
+import lombok.extern.log4j.Log4j2;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.RandomUtils;
@@ -43,10 +45,14 @@ import java.io.DataInput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Comparator;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
@Data
-public abstract class AbstractJob<T extends AbstractTask> implements Job<T>,
Writable {
+@Log4j2
+public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T,
C>, Writable {
@SerializedName(value = "jid")
private Long jobId;
@@ -75,6 +81,9 @@ public abstract class AbstractJob<T extends AbstractTask>
implements Job<T>, Wri
@SerializedName(value = "sql")
String executeSql;
+ @SerializedName(value = "ftm")
+ private long finishTimeMs;
+
private List<T> runningTasks = new ArrayList<>();
@Override
@@ -106,7 +115,43 @@ public abstract class AbstractJob<T extends AbstractTask>
implements Job<T>, Wri
throw new JobException("no running task");
}
runningTasks.stream().filter(task ->
task.getTaskId().equals(taskId)).findFirst()
- .orElseThrow(() -> new JobException("no task id:" +
taskId)).cancel();
+ .orElseThrow(() -> new JobException("no task id: " +
taskId)).cancel();
+ if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) {
+ updateJobStatus(JobStatus.FINISHED);
+ }
+ }
+
+ public List<T> queryAllTasks() {
+ List<T> tasks = new ArrayList<>();
+ if (CollectionUtils.isEmpty(runningTasks)) {
+ return queryTasks();
+ }
+
+ List<T> historyTasks = queryTasks();
+ if (CollectionUtils.isNotEmpty(historyTasks)) {
+ tasks.addAll(historyTasks);
+ }
+ Set<Long> loadTaskIds =
tasks.stream().map(AbstractTask::getTaskId).collect(Collectors.toSet());
+ runningTasks.forEach(task -> {
+ if (!loadTaskIds.contains(task.getTaskId())) {
+ tasks.add(task);
+ }
+ });
+ Comparator<T> taskComparator =
Comparator.comparingLong(T::getCreateTimeMs).reversed();
+ tasks.sort(taskComparator);
+ return tasks;
+ }
+
+ public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
+ if (!getJobStatus().equals(JobStatus.RUNNING)) {
+ log.warn("job is not running, job id is {}", jobId);
+ return new ArrayList<>();
+ }
+ if (!isReadyForScheduling(taskContext)) {
+ log.info("job is not ready for scheduling, job id is {}", jobId);
+ return new ArrayList<>();
+ }
+ return createTasks(taskType, taskContext);
}
public void initTasks(List<? extends AbstractTask> tasks) {
@@ -133,21 +178,26 @@ public abstract class AbstractJob<T extends AbstractTask>
implements Job<T>, Wri
checkJobParamsInternal();
}
- public void updateJobStatus(JobStatus newJobStatus) {
+ public void updateJobStatus(JobStatus newJobStatus) throws JobException {
if (null == newJobStatus) {
throw new IllegalArgumentException("jobStatus cannot be null");
}
+ String errorMsg = String.format("Can't update job %s status to the %s
status",
+ jobStatus.name(), newJobStatus.name());
if (jobStatus == newJobStatus) {
- throw new IllegalArgumentException(String.format("Can't update job
%s status to the %s status",
- jobStatus.name(), this.jobStatus.name()));
+ throw new IllegalArgumentException(errorMsg);
}
if (newJobStatus.equals(JobStatus.RUNNING) &&
!jobStatus.equals(JobStatus.PAUSED)) {
- throw new IllegalArgumentException(String.format("Can't update job
%s status to the %s status",
- jobStatus.name(), this.jobStatus.name()));
+ throw new IllegalArgumentException(errorMsg);
}
if (newJobStatus.equals(JobStatus.STOPPED) &&
!jobStatus.equals(JobStatus.RUNNING)) {
- throw new IllegalArgumentException(String.format("Can't update job
%s status to the %s status",
- jobStatus.name(), this.jobStatus.name()));
+ throw new IllegalArgumentException(errorMsg);
+ }
+ if (newJobStatus.equals(JobStatus.FINISHED)) {
+ this.finishTimeMs = System.currentTimeMillis();
+ }
+ if (JobStatus.PAUSED.equals(newJobStatus)) {
+ cancelAllTasks();
}
jobStatus = newJobStatus;
}
@@ -157,25 +207,26 @@ public abstract class AbstractJob<T extends AbstractTask>
implements Job<T>, Wri
public static AbstractJob readFields(DataInput in) throws IOException {
String jsonJob = Text.readString(in);
- AbstractJob<?> job = GsonUtils.GSON.fromJson(jsonJob,
AbstractJob.class);
+ AbstractJob job = GsonUtils.GSON.fromJson(jsonJob, AbstractJob.class);
job.setRunningTasks(new ArrayList<>());
return job;
}
@Override
- public void onTaskFail(T task) {
+ public void onTaskFail(T task) throws JobException {
updateJobStatusIfEnd();
runningTasks.remove(task);
}
@Override
- public void onTaskSuccess(T task) {
+ public void onTaskSuccess(T task) throws JobException {
updateJobStatusIfEnd();
runningTasks.remove(task);
}
- private void updateJobStatusIfEnd() {
+
+ private void updateJobStatusIfEnd() throws JobException {
JobExecuteType executeType = getJobConfig().getExecuteType();
if (executeType.equals(JobExecuteType.MANUAL)) {
return;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
index a530ce3b2a0..ee352a0f417 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
@@ -33,23 +33,29 @@ import java.util.List;
* The job status is used to control the execution of the job.
*
* @param <T> The type of task associated with the job, extending AbstractTask.
+ * <C> The type of task context associated with the job
*/
-public interface Job<T extends AbstractTask> {
+public interface Job<T extends AbstractTask, C> {
/**
* Creates a list of tasks of the specified type for this job.
+ * you can set task context for task,
+ * eg: insert task, execute sql is insert into table select * from table1
limit ${limit}
+ * every task context is different, eg: limit 1000, limit 2000,you can set
task context to 1000,2000
+ * it's used by manual task or streaming task
*
- * @param taskType The type of tasks to create.
+ * @param taskType The type of tasks to create. @See TaskType
+ * @param taskContext The context of tasks to create.
* @return A list of tasks.
*/
- List<T> createTasks(TaskType taskType);
+ List<T> createTasks(TaskType taskType, C taskContext);
/**
* Cancels the task with the specified taskId.
*
* @param taskId The ID of the task to cancel.
* @throws JobException If the task is not in the running state, it may
have already
- * finished and cannot be cancelled.
+ * finished and cannot be cancelled.
*/
void cancelTaskById(long taskId) throws JobException;
@@ -60,7 +66,7 @@ public interface Job<T extends AbstractTask> {
*
* @return True if the job is ready for scheduling, false otherwise.
*/
- boolean isReadyForScheduling();
+ boolean isReadyForScheduling(C taskContext);
/**
* Retrieves the metadata for the job, which is used to display job
information.
@@ -103,17 +109,18 @@ public interface Job<T extends AbstractTask> {
*
* @param task The failed task.
*/
- void onTaskFail(T task);
+ void onTaskFail(T task) throws JobException;
/**
* Notifies the job when a task execution is successful.
*
* @param task The successful task.
*/
- void onTaskSuccess(T task);
+ void onTaskSuccess(T task) throws JobException;
/**
* get the job's show info, which is used to sql show the job information
+ *
* @return List<String> job common show info
*/
List<String> getShowInfo();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/common/IntervalUnit.java
b/fe/fe-core/src/main/java/org/apache/doris/job/common/IntervalUnit.java
index 4c576e986fa..dba324a81c4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/common/IntervalUnit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/IntervalUnit.java
@@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function;
public enum IntervalUnit {
-
SECOND("second", 0L, TimeUnit.SECONDS::toMillis),
MINUTE("minute", 0L, TimeUnit.MINUTES::toMillis),
HOUR("hour", 0L, TimeUnit.HOURS::toMillis),
@@ -57,7 +56,7 @@ public enum IntervalUnit {
return Arrays.stream(IntervalUnit.values())
.filter(config -> config.getUnit().equals(name))
.findFirst()
- .orElseThrow(() -> new IllegalArgumentException("Unknown
configuration " + name));
+ .orElseThrow(() -> new IllegalArgumentException("Unknown
configuration interval " + name));
}
public Long getIntervalMs(Long interval) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskStatus.java
b/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskStatus.java
index b4040d31e08..4cb401f28dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskStatus.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskStatus.java
@@ -19,8 +19,8 @@ package org.apache.doris.job.common;
public enum TaskStatus {
PENDING,
- CANCEL,
+ CANCELED,
RUNNING,
SUCCESS,
- FAILD;
+ FAILED;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TimerJobEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TimerJobEvent.java
index 218fefd0419..65654c225fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TimerJobEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TimerJobEvent.java
@@ -23,12 +23,12 @@ import com.lmax.disruptor.EventFactory;
import lombok.Data;
@Data
-public class TimerJobEvent<T extends AbstractJob<?>> {
+public class TimerJobEvent<T extends AbstractJob> {
private T job;
- public static <T extends AbstractJob<?>> EventFactory<TimerJobEvent<T>>
factory() {
+ public static <T extends AbstractJob> EventFactory<TimerJobEvent<T>>
factory() {
return TimerJobEvent::new;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java
index 0e5911a4e85..a07e248af6d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java
@@ -21,7 +21,7 @@ import org.apache.doris.job.disruptor.ExecuteTaskEvent;
import org.apache.doris.job.task.AbstractTask;
import com.lmax.disruptor.WorkHandler;
-import lombok.extern.slf4j.Slf4j;
+import lombok.extern.log4j.Log4j2;
/**
* DefaultTaskExecutor is an implementation of the TaskExecutor interface.
@@ -30,7 +30,7 @@ import lombok.extern.slf4j.Slf4j;
* It executes a given AbstractTask by acquiring a semaphore token from the
TaskTokenManager
* and releasing it after the task execution.
*/
-@Slf4j
+@Log4j2
public class DefaultTaskExecutorHandler<T extends AbstractTask> implements
WorkHandler<ExecuteTaskEvent<T>> {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
index 852ba13415b..cb0a393d917 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
@@ -26,8 +26,8 @@ import org.apache.doris.job.disruptor.TimerJobEvent;
import org.apache.doris.job.task.AbstractTask;
import com.lmax.disruptor.WorkHandler;
-import jline.internal.Log;
-import lombok.extern.slf4j.Slf4j;
+import lombok.extern.log4j.Log4j2;
+import org.apache.commons.collections.CollectionUtils;
import java.util.List;
import java.util.Map;
@@ -37,8 +37,8 @@ import java.util.Map;
* when job is ready for scheduling and job status is running
* we will create task and publish to task disruptor @see
DefaultTaskExecutorHandler
*/
-@Slf4j
-public class DispatchTaskHandler<T extends AbstractJob<?>> implements
WorkHandler<TimerJobEvent<T>> {
+@Log4j2
+public class DispatchTaskHandler<T extends AbstractJob> implements
WorkHandler<TimerJobEvent<T>> {
private final Map<JobType, TaskDisruptor<T>> disruptorMap;
@@ -50,19 +50,27 @@ public class DispatchTaskHandler<T extends AbstractJob<?>>
implements WorkHandle
@Override
public void onEvent(TimerJobEvent<T> event) {
try {
+ log.info("dispatch timer job, job id is {}, job name is {}",
event.getJob().getJobId(),
+ event.getJob().getJobName());
if (null == event.getJob()) {
log.info("job is null,may be job is deleted, ignore");
return;
}
- if (event.getJob().isReadyForScheduling() &&
event.getJob().getJobStatus() == JobStatus.RUNNING) {
- List<? extends AbstractTask> tasks =
event.getJob().createTasks(TaskType.SCHEDULED);
+ if (event.getJob().isReadyForScheduling(null) &&
event.getJob().getJobStatus() == JobStatus.RUNNING) {
+ List<? extends AbstractTask> tasks =
event.getJob().commonCreateTasks(TaskType.SCHEDULED, null);
+ if (CollectionUtils.isEmpty(tasks)) {
+ log.warn("job is ready for scheduling, but create task is
empty, skip scheduler,"
+ + "job id is {}," + " job name is {}",
event.getJob().getJobId(),
+ event.getJob().getJobName());
+ return;
+ }
JobType jobType = event.getJob().getJobType();
for (AbstractTask task : tasks) {
disruptorMap.get(jobType).publishEvent(task,
event.getJob().getJobConfig());
}
}
} catch (Exception e) {
- Log.warn("dispatch timer job error, task id is {}",
event.getJob().getJobId(), e);
+ log.warn("dispatch timer job error, task id is {}",
event.getJob().getJobId(), e);
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
index 9bd4cbfeae2..74efe49beb1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
@@ -18,15 +18,15 @@
package org.apache.doris.job.executor;
import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.disruptor.TaskDisruptor;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
-import jline.internal.Log;
-import lombok.extern.slf4j.Slf4j;
+import lombok.extern.log4j.Log4j2;
-@Slf4j
-public class TimerJobSchedulerTask<T extends AbstractJob<?>> implements
TimerTask {
+@Log4j2
+public class TimerJobSchedulerTask<T extends AbstractJob> implements TimerTask
{
private TaskDisruptor dispatchDisruptor;
@@ -40,9 +40,13 @@ public class TimerJobSchedulerTask<T extends AbstractJob<?>>
implements TimerTas
@Override
public void run(Timeout timeout) {
try {
+ if (!JobStatus.RUNNING.equals(job.getJobStatus())) {
+ log.info("job status is not running, job id is {}, skip
dispatch", this.job.getJobId());
+ return;
+ }
dispatchDisruptor.publishEvent(this.job);
} catch (Exception e) {
- Log.warn("dispatch timer job error, task id is {}",
this.job.getJobId(), e);
+ log.warn("dispatch timer job error, task id is {}",
this.job.getJobId(), e);
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index 619a5c7fded..968f1413528 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -47,11 +47,12 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
@Data
@Slf4j
-public class InsertJob extends AbstractJob<InsertTask> {
+public class InsertJob extends AbstractJob<InsertTask, Map> {
public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
new Column("Id", ScalarType.createStringType()),
@@ -87,11 +88,11 @@ public class InsertJob extends AbstractJob<InsertTask> {
ConcurrentLinkedQueue<Long> taskIdList;
// max save task num, do we need to config it?
- private static final int MAX_SAVE_TASK_NUM = 50;
-
+ private static final int MAX_SAVE_TASK_NUM = 100;
@Override
- public List<InsertTask> createTasks(TaskType taskType) {
+ public List<InsertTask> createTasks(TaskType taskType, Map taskContext) {
+ //nothing need to do in insert job
InsertTask task = new InsertTask(null, getCurrentDbName(),
getExecuteSql(), getCreateUser());
task.setJobId(getJobId());
task.setTaskType(taskType);
@@ -123,15 +124,15 @@ public class InsertJob extends AbstractJob<InsertTask> {
super.cancelTaskById(taskId);
}
-
@Override
- public void cancelAllTasks() throws JobException {
- super.cancelAllTasks();
+ public boolean isReadyForScheduling(Map taskContext) {
+ return CollectionUtils.isEmpty(getRunningTasks());
}
+
@Override
- public boolean isReadyForScheduling() {
- return true;
+ public void cancelAllTasks() throws JobException {
+ super.cancelAllTasks();
}
@@ -162,8 +163,9 @@ public class InsertJob extends AbstractJob<InsertTask> {
InsertTask task;
try {
task = new InsertTask(loadJob.getLabel(),
loadJob.getDb().getFullName(), null, getCreateUser());
+ task.setCreateTimeMs(loadJob.getCreateTimestamp());
} catch (MetaNotFoundException e) {
- log.warn("load job not found,job id is {}", loadJob.getId());
+ log.warn("load job not found, job id is {}", loadJob.getId());
return;
}
task.setJobId(getJobId());
@@ -195,7 +197,7 @@ public class InsertJob extends AbstractJob<InsertTask> {
}
@Override
- public void onTaskSuccess(InsertTask task) {
+ public void onTaskSuccess(InsertTask task) throws JobException {
super.onTaskSuccess(task);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index eb319f4d5e7..69cdfe26b4b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -41,7 +41,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import lombok.Getter;
import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
+import lombok.extern.log4j.Log4j2;
import java.util.ArrayList;
import java.util.List;
@@ -52,7 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
/**
* todo implement this later
*/
-@Slf4j
+@Log4j2
public class InsertTask extends AbstractTask {
public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
@@ -144,8 +144,14 @@ public class InsertTask extends AbstractTask {
@Override
public void run() throws JobException {
try {
+ if (isCanceled.get()) {
+ log.info("task has been canceled, task id is {}", getTaskId());
+ return;
+ }
command.run(ctx, stmtExecutor);
} catch (Exception e) {
+ log.warn("execute insert task error, job id is {}, task id is
{},sql is {}", getJobId(),
+ getTaskId(), sql, e);
throw new JobException(e);
}
}
@@ -177,7 +183,7 @@ public class InsertTask extends AbstractTask {
@Override
public List<String> getShowInfo() {
if (null == loadJob) {
- return new ArrayList<>();
+ return getPendingTaskShowInfo();
}
List<String> jobInfo = Lists.newArrayList();
// jobId
@@ -258,4 +264,22 @@ public class InsertTask extends AbstractTask {
return trow;
}
+ // if task not start, load job is null,return pending task show info
+ private List<String> getPendingTaskShowInfo() {
+ List<String> datas = new ArrayList<>();
+
+ datas.add(String.valueOf(getTaskId()));
+ datas.add(getJobId() + "_" + getTaskId());
+ datas.add(getStatus().name());
+ datas.add(FeConstants.null_string);
+ datas.add(FeConstants.null_string);
+ datas.add(FeConstants.null_string);
+ datas.add(TimeUtils.longToTimeString(getCreateTimeMs()));
+ datas.add(FeConstants.null_string);
+ datas.add(FeConstants.null_string);
+ datas.add(FeConstants.null_string);
+ datas.add(userIdentity.getQualifiedUser());
+ return datas;
+ }
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
index c00f97cd79f..6321679b6b2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
@@ -47,8 +47,9 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
-public class MTMVJob extends AbstractJob<MTMVTask> {
+public class MTMVJob extends AbstractJob<MTMVTask, Map> {
private static final Logger LOG = LogManager.getLogger(MTMVJob.class);
private static final ShowResultSetMetaData JOB_META_DATA =
ShowResultSetMetaData.builder()
@@ -109,7 +110,7 @@ public class MTMVJob extends AbstractJob<MTMVTask> {
}
@Override
- public List<MTMVTask> createTasks(TaskType taskType) {
+ public List<MTMVTask> createTasks(TaskType taskType, Map taskContext) {
MTMVTask task = new MTMVTask(dbId, mtmvId);
task.setTaskType(taskType);
ArrayList<MTMVTask> tasks = new ArrayList<>();
@@ -119,7 +120,7 @@ public class MTMVJob extends AbstractJob<MTMVTask> {
}
@Override
- public boolean isReadyForScheduling() {
+ public boolean isReadyForScheduling(Map taskContext) {
return CollectionUtils.isEmpty(getRunningTasks());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index c7d04cdd282..776af152c69 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -29,7 +29,7 @@ import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.scheduler.JobScheduler;
import org.apache.doris.job.task.AbstractTask;
-import lombok.extern.slf4j.Slf4j;
+import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import java.io.DataInput;
@@ -39,8 +39,8 @@ import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
-@Slf4j
-public class JobManager<T extends AbstractJob<?>> implements Writable {
+@Log4j2
+public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
private final ConcurrentHashMap<Long, T> jobMap = new
ConcurrentHashMap<>(32);
@@ -54,9 +54,9 @@ public class JobManager<T extends AbstractJob<?>> implements
Writable {
public void registerJob(T job) throws JobException {
job.checkJobParams();
- checkJobNameExist(job.getJobName(), job.getJobType());
+ checkJobNameExist(job.getJobName());
if (jobMap.get(job.getJobId()) != null) {
- throw new JobException("job id exist,jobId:" + job.getJobId());
+ throw new JobException("job id exist, jobId:" + job.getJobId());
}
Env.getCurrentEnv().getEditLog().logCreateJob(job);
jobMap.put(job.getJobId(), job);
@@ -65,9 +65,9 @@ public class JobManager<T extends AbstractJob<?>> implements
Writable {
}
- private void checkJobNameExist(String jobName, JobType type) throws
JobException {
- if (jobMap.values().stream().anyMatch(a ->
a.getJobName().equals(jobName) && a.getJobType().equals(type))) {
- throw new JobException("job name exist,jobName:" + jobName);
+ private void checkJobNameExist(String jobName) throws JobException {
+ if (jobMap.values().stream().anyMatch(a ->
a.getJobName().equals(jobName))) {
+ throw new JobException("job name exist, jobName:" + jobName);
}
}
@@ -79,13 +79,13 @@ public class JobManager<T extends AbstractJob<?>>
implements Writable {
jobMap.remove(jobId);
}
- public void unregisterJob(String jobName, JobType jobType) throws
JobException {
+ public void unregisterJob(String jobName) throws JobException {
for (T a : jobMap.values()) {
- if (a.getJobName().equals(jobName) &&
a.getJobType().equals(jobType)) {
+ if (a.getJobName().equals(jobName)) {
try {
unregisterJob(a.getJobId());
} catch (JobException e) {
- throw new JobException("unregister job error,jobName:" +
jobName);
+ throw new JobException("unregister job error, jobName:" +
jobName);
}
}
}
@@ -98,13 +98,17 @@ public class JobManager<T extends AbstractJob<?>>
implements Writable {
Env.getCurrentEnv().getEditLog().logUpdateJob(jobMap.get(jobId));
}
- public void alterJobStatus(String jobName, JobStatus jobStatus, JobType
jobType) throws JobException {
+ public void alterJobStatus(String jobName, JobStatus jobStatus) throws
JobException {
for (T a : jobMap.values()) {
- if (a.getJobName().equals(jobName) &&
jobType.equals(a.getJobType())) {
+ if (a.getJobName().equals(jobName)) {
try {
+ if (jobStatus.equals(JobStatus.STOPPED)) {
+ unregisterJob(a.getJobId());
+ return;
+ }
alterJobStatus(a.getJobId(), jobStatus);
} catch (JobException e) {
- throw new JobException("unregister job error,jobName:" +
jobName);
+ throw new JobException("unregister job error, jobName:" +
jobName);
}
}
}
@@ -112,7 +116,7 @@ public class JobManager<T extends AbstractJob<?>>
implements Writable {
private void checkJobExist(Long jobId) throws JobException {
if (null == jobMap.get(jobId)) {
- throw new JobException("job not exist,jobId:" + jobId);
+ throw new JobException("job not exist, jobId:" + jobId);
}
}
@@ -129,6 +133,7 @@ public class JobManager<T extends AbstractJob<?>>
implements Writable {
/**
* query jobs by job type
+ *
* @param jobTypes @JobType
* @return List<AbstractJob> job list
*/
@@ -156,12 +161,12 @@ public class JobManager<T extends AbstractJob<?>>
implements Writable {
public List<? extends AbstractTask> queryTasks(Long jobId) throws
JobException {
checkJobExist(jobId);
- return jobMap.get(jobId).queryTasks();
+ return jobMap.get(jobId).queryAllTasks();
}
- public void triggerJob(long jobId) throws JobException {
+ public void triggerJob(long jobId, C context) throws JobException {
checkJobExist(jobId);
- jobScheduler.schedulerInstantJob(jobMap.get(jobId), TaskType.MANUAL);
+ jobScheduler.schedulerInstantJob(jobMap.get(jobId), TaskType.MANUAL,
context);
}
public void replayCreateJob(T job) {
@@ -191,11 +196,14 @@ public class JobManager<T extends AbstractJob<?>>
implements Writable {
.add("msg", "replay delete scheduler job").build());
}
- void cancelTask(Long jobId, Long taskId) throws JobException {
- checkJobExist(jobId);
- if (null == jobMap.get(jobId).getRunningTasks()) {
- throw new JobException("task not exist,taskId:" + taskId);
+ public void cancelTaskById(String jobName, Long taskId) throws
JobException {
+ for (T job : jobMap.values()) {
+ if (job.getJobName().equals(jobName)) {
+ job.cancelTaskById(taskId);
+ return;
+ }
}
+ throw new JobException("job not exist, jobName:" + jobName);
}
@Override
@@ -205,7 +213,7 @@ public class JobManager<T extends AbstractJob<?>>
implements Writable {
try {
job.write(out);
} catch (IOException e) {
- log.error("write job error,jobId:" + jobId, e);
+ log.error("write job error, jobId:" + jobId, e);
}
});
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java
index d07a109fc53..4e31e467013 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java
@@ -46,7 +46,7 @@ public class TaskDisruptorGroupManager<T extends
AbstractTask> {
private final Map<JobType, TaskDisruptor<T>> disruptorMap = new
EnumMap<>(JobType.class);
@Getter
- private TaskDisruptor<TimerJobEvent<AbstractJob<?>>> dispatchDisruptor;
+ private TaskDisruptor<TimerJobEvent<AbstractJob>> dispatchDisruptor;
private static final int DEFAULT_RING_BUFFER_SIZE = 1024;
@@ -76,14 +76,14 @@ public class TaskDisruptorGroupManager<T extends
AbstractTask> {
}
private void registerDispatchDisruptor() {
- EventFactory<TimerJobEvent<AbstractJob<T>>> dispatchEventFactory =
TimerJobEvent.factory();
+ EventFactory<TimerJobEvent<AbstractJob>> dispatchEventFactory =
TimerJobEvent.factory();
ThreadFactory dispatchThreadFactory = new
CustomThreadFactory("dispatch-task");
WorkHandler[] dispatchTaskExecutorHandlers = new
WorkHandler[DISPATCH_TIMER_JOB_CONSUMER_THREAD_NUM];
for (int i = 0; i < DISPATCH_TIMER_JOB_CONSUMER_THREAD_NUM; i++) {
dispatchTaskExecutorHandlers[i] = new
DispatchTaskHandler(this.disruptorMap);
}
- EventTranslatorVararg<TimerJobEvent<AbstractJob<T>>> eventTranslator =
- (event, sequence, args) -> event.setJob((AbstractJob<T>)
args[0]);
+ EventTranslatorVararg<TimerJobEvent<AbstractJob>> eventTranslator =
+ (event, sequence, args) -> event.setJob((AbstractJob) args[0]);
this.dispatchDisruptor = new TaskDisruptor<>(dispatchEventFactory,
DISPATCH_TIMER_JOB_QUEUE_SIZE,
dispatchThreadFactory,
new BlockingWaitStrategy(), dispatchTaskExecutorHandlers,
eventTranslator);
@@ -123,7 +123,7 @@ public class TaskDisruptorGroupManager<T extends
AbstractTask> {
disruptorMap.put(JobType.MV, mtmvDisruptor);
}
- public void dispatchTimerJob(AbstractJob<T> job) {
+ public void dispatchTimerJob(AbstractJob job) {
dispatchDisruptor.publishEvent(job);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskTokenManager.java
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskTokenManager.java
index 51b8b6aeb36..877cb306916 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskTokenManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskTokenManager.java
@@ -18,7 +18,7 @@
package org.apache.doris.job.manager;
import lombok.experimental.UtilityClass;
-import lombok.extern.slf4j.Slf4j;
+import lombok.extern.log4j.Log4j2;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -29,7 +29,7 @@ import java.util.concurrent.Semaphore;
* It provides a method to acquire a semaphore token for a specific job ID
with the given maximum concurrency.
* If a semaphore doesn't exist for the job ID, it creates a new one and adds
it to the map.
*/
-@Slf4j
+@Log4j2
@UtilityClass
public class TaskTokenManager {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
index c701c9fd1f8..47e91d97b49 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
@@ -17,6 +17,8 @@
package org.apache.doris.job.scheduler;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.base.AbstractJob;
@@ -30,7 +32,7 @@ import org.apache.doris.job.manager.TaskDisruptorGroupManager;
import org.apache.doris.job.task.AbstractTask;
import io.netty.util.HashedWheelTimer;
-import lombok.extern.slf4j.Slf4j;
+import lombok.extern.log4j.Log4j2;
import org.apache.commons.collections.CollectionUtils;
import java.io.Closeable;
@@ -39,8 +41,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-@Slf4j
-public class JobScheduler<T extends AbstractJob<?>> implements Closeable {
+@Log4j2
+public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable
{
/**
* scheduler tasks, it's used to scheduler job
@@ -68,6 +70,13 @@ public class JobScheduler<T extends AbstractJob<?>>
implements Closeable {
*/
private static final long BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS =
BATCH_SCHEDULER_INTERVAL_SECONDS * 1000L;
+ /**
+ * Finished job will be cleared after 24 hours
+ */
+ private static final long FINISHED_JOB_CLEANUP_THRESHOLD_TIME_MS =
+ (Config.finished_job_cleanup_threshold_time_hour > 0
+ ? Config.finished_job_cleanup_threshold_time_hour : 24) *
3600 * 1000L;
+
public void start() {
timerTaskScheduler = new HashedWheelTimer(new
CustomThreadFactory("timer-task-scheduler"), 1,
TimeUnit.SECONDS, HASHED_WHEEL_TIMER_TICKS_PER_WHEEL);
@@ -105,20 +114,20 @@ public class JobScheduler<T extends AbstractJob<?>>
implements Closeable {
//manual job will not scheduler
if
(JobExecuteType.MANUAL.equals(job.getJobConfig().getExecuteType())) {
if (job.getJobConfig().isImmediate()) {
- schedulerInstantJob(job, TaskType.MANUAL);
+ schedulerInstantJob(job, TaskType.MANUAL, null);
}
return;
}
//todo skip streaming job,improve in the future
if
(JobExecuteType.INSTANT.equals(job.getJobConfig().getExecuteType())) {
- schedulerInstantJob(job, TaskType.SCHEDULED);
+ schedulerInstantJob(job, TaskType.SCHEDULED, null);
}
}
//RECURRING job and immediate is true
if (job.getJobConfig().isImmediate()) {
job.getJobConfig().getTimerDefinition().setLatestSchedulerTimeMs(System.currentTimeMillis());
- schedulerInstantJob(job, TaskType.SCHEDULED);
+ schedulerInstantJob(job, TaskType.SCHEDULED, null);
}
//if it's timer job and trigger last window already start, we will
scheduler it immediately
cycleTimerJobScheduler(job);
@@ -142,16 +151,11 @@ public class JobScheduler<T extends AbstractJob<?>>
implements Closeable {
}
- public void schedulerInstantJob(T job, TaskType taskType) throws
JobException {
- if (!job.getJobStatus().equals(JobStatus.RUNNING)) {
- throw new JobException("job is not running,job id is %d",
job.getJobId());
- }
- if (!job.isReadyForScheduling()) {
- log.info("job is not ready for scheduling,job id is {}",
job.getJobId());
- return;
- }
- List<? extends AbstractTask> tasks = job.createTasks(taskType);
+ public void schedulerInstantJob(T job, TaskType taskType, C context) {
+ List<? extends AbstractTask> tasks = job.commonCreateTasks(taskType,
context);
if (CollectionUtils.isEmpty(tasks)) {
+ log.info("job create task is empty, skip scheduler, job id is
{},job name is {}", job.getJobId(),
+ job.getJobName());
if
(job.getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
job.setJobStatus(JobStatus.FINISHED);
}
@@ -166,19 +170,34 @@ public class JobScheduler<T extends AbstractJob<?>>
implements Closeable {
* We will get the task in the next time window, and then hand it over to
the time wheel for timing trigger
*/
private void executeTimerJobIdsWithinLastTenMinutesWindow() {
- if (jobMap.isEmpty()) {
- return;
- }
if (latestBatchSchedulerTimerTaskTimeMs < System.currentTimeMillis()) {
this.latestBatchSchedulerTimerTaskTimeMs =
System.currentTimeMillis();
}
this.latestBatchSchedulerTimerTaskTimeMs +=
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
+ if (jobMap.isEmpty()) {
+ return;
+ }
for (Map.Entry<Long, T> entry : jobMap.entrySet()) {
T job = entry.getValue();
- if (!job.getJobConfig().checkIsTimerJob()) {
+ if (job.getJobStatus().equals(JobStatus.FINISHED)) {
+ clearFinishedJob(job);
+ continue;
+ }
+ if (!job.getJobStatus().equals(JobStatus.RUNNING) &&
!job.getJobConfig().checkIsTimerJob()) {
continue;
}
cycleTimerJobScheduler(job);
}
}
+
+ private void clearFinishedJob(T job) {
+ if (job.getFinishTimeMs() + FINISHED_JOB_CLEANUP_THRESHOLD_TIME_MS <
System.currentTimeMillis()) {
+ return;
+ }
+ try {
+ Env.getCurrentEnv().getJobManager().unregisterJob(job.getJobId());
+ } catch (JobException e) {
+ log.error("clear finish job error, job id is {}", job.getJobId(),
e);
+ }
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
index efe38b70136..654ee4fbc2f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
@@ -26,10 +26,10 @@ import org.apache.doris.job.exception.JobException;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
+import lombok.extern.log4j.Log4j2;
@Data
-@Slf4j
+@Log4j2
public abstract class AbstractTask implements Task {
@SerializedName(value = "jid")
@@ -49,9 +49,12 @@ public abstract class AbstractTask implements Task {
@SerializedName(value = "tt")
private TaskType taskType;
+ @SerializedName(value = "emg")
+ private String errMsg;
+
@Override
- public void onFail(String msg) {
- status = TaskStatus.FAILD;
+ public void onFail(String msg) throws JobException {
+ status = TaskStatus.FAILED;
if (!isCallable()) {
return;
}
@@ -60,10 +63,10 @@ public abstract class AbstractTask implements Task {
@Override
public void onFail() throws JobException {
- if (TaskStatus.CANCEL.equals(status)) {
+ if (TaskStatus.CANCELED.equals(status)) {
return;
}
- status = TaskStatus.FAILD;
+ status = TaskStatus.FAILED;
setFinishTimeMs(System.currentTimeMillis());
if (!isCallable()) {
return;
@@ -73,7 +76,7 @@ public abstract class AbstractTask implements Task {
}
private boolean isCallable() {
- if (status.equals(TaskStatus.CANCEL)) {
+ if (status.equals(TaskStatus.CANCELED)) {
return false;
}
if (null != Env.getCurrentEnv().getJobManager().getJob(jobId)) {
@@ -84,6 +87,9 @@ public abstract class AbstractTask implements Task {
@Override
public void onSuccess() throws JobException {
+ if (TaskStatus.CANCELED.equals(status)) {
+ return;
+ }
status = TaskStatus.SUCCESS;
setFinishTimeMs(System.currentTimeMillis());
if (!isCallable()) {
@@ -99,7 +105,7 @@ public abstract class AbstractTask implements Task {
@Override
public void cancel() throws JobException {
- status = TaskStatus.CANCEL;
+ status = TaskStatus.CANCELED;
}
@Override
@@ -115,12 +121,12 @@ public abstract class AbstractTask implements Task {
onSuccess();
} catch (Exception e) {
onFail();
- log.warn("execute task error, job id is {},task id is {}", jobId,
taskId, e);
+ log.warn("execute task error, job id is {}, task id is {}", jobId,
taskId, e);
}
}
public boolean isCancelled() {
- return status.equals(TaskStatus.CANCEL);
+ return status.equals(TaskStatus.CANCELED);
}
public String getJobName() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
index b13d22ff665..48ecf9c29e8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
@@ -54,7 +54,7 @@ public interface Task {
*
* @param msg The error message associated with the failure.
*/
- void onFail(String msg);
+ void onFail(String msg) throws JobException;
/**
* This method is called when the task executes successfully.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
index df1e3dbe07b..416f7f764b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
@@ -52,6 +52,7 @@ public class MTMVJobManager implements MTMVHookService {
/**
* create MTMVJob
+ *
* @param mtmv
* @throws DdlException
*/
@@ -112,6 +113,7 @@ public class MTMVJobManager implements MTMVHookService {
/**
* drop MTMVJob
+ *
* @param mtmv
* @throws DdlException
*/
@@ -142,6 +144,7 @@ public class MTMVJobManager implements MTMVHookService {
/**
* drop MTMVJob and then create MTMVJob
+ *
* @param mtmv
* @param alterMTMV
* @throws DdlException
@@ -156,6 +159,7 @@ public class MTMVJobManager implements MTMVHookService {
/**
* trigger MTMVJob
+ *
* @param info
* @throws DdlException
* @throws MetaNotFoundException
@@ -170,7 +174,7 @@ public class MTMVJobManager implements MTMVHookService {
throw new DdlException("jobs not normal,should have one job,but
job num is: " + jobs.size());
}
try {
-
Env.getCurrentEnv().getJobManager().triggerJob(jobs.get(0).getJobId());
+
Env.getCurrentEnv().getJobManager().triggerJob(jobs.get(0).getJobId(), null);
} catch (JobException e) {
e.printStackTrace();
throw new DdlException(e.getMessage());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
index c4f286ba5cd..9b16497d3c5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
@@ -588,4 +588,8 @@ public class InsertExecutor {
throw new AnalysisException(e.getMessage(), e);
}
}
+
+ public Coordinator getCoordinator() {
+ return coordinator;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
index 62a8f4d95c6..c17f215e757 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
@@ -162,6 +162,9 @@ public class InsertIntoTableCommand extends Command
implements ForwardWithSync,
insertExecutor.finalizeSink(sink,
physicalOlapTableSink.isPartialUpdate(),
physicalOlapTableSink.isFromNativeInsertStmt());
executor.setProfileType(ProfileType.LOAD);
+ // We exposed @StmtExecutor#cancel as a unified entry point for
statement interruption
+ // so we need to set this here
+ executor.setCoord(insertExecutor.getCoordinator());
insertExecutor.executeSingleInsertTransaction(executor, jobId);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 5a2bae1ee05..7b41715a425 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -37,6 +37,7 @@ import org.apache.doris.analysis.AlterColumnStatsStmt;
import org.apache.doris.analysis.AlterDatabasePropertyStmt;
import org.apache.doris.analysis.AlterDatabaseQuotaStmt;
import org.apache.doris.analysis.AlterDatabaseRename;
+import org.apache.doris.analysis.AlterJobStatusStmt;
import org.apache.doris.analysis.AlterPolicyStmt;
import org.apache.doris.analysis.AlterResourceStmt;
import org.apache.doris.analysis.AlterRoutineLoadStmt;
@@ -51,6 +52,7 @@ import org.apache.doris.analysis.CancelAlterSystemStmt;
import org.apache.doris.analysis.CancelAlterTableStmt;
import org.apache.doris.analysis.CancelBackupStmt;
import org.apache.doris.analysis.CancelExportStmt;
+import org.apache.doris.analysis.CancelJobTaskStmt;
import org.apache.doris.analysis.CancelLoadStmt;
import org.apache.doris.analysis.CleanLabelStmt;
import org.apache.doris.analysis.CleanProfileStmt;
@@ -95,7 +97,6 @@ import org.apache.doris.analysis.DropWorkloadGroupStmt;
import org.apache.doris.analysis.GrantStmt;
import org.apache.doris.analysis.InstallPluginStmt;
import org.apache.doris.analysis.KillAnalysisJobStmt;
-import org.apache.doris.analysis.PauseJobStmt;
import org.apache.doris.analysis.PauseRoutineLoadStmt;
import org.apache.doris.analysis.PauseSyncJobStmt;
import org.apache.doris.analysis.RecoverDbStmt;
@@ -106,12 +107,10 @@ import org.apache.doris.analysis.RefreshDbStmt;
import org.apache.doris.analysis.RefreshLdapStmt;
import org.apache.doris.analysis.RefreshTableStmt;
import org.apache.doris.analysis.RestoreStmt;
-import org.apache.doris.analysis.ResumeJobStmt;
import org.apache.doris.analysis.ResumeRoutineLoadStmt;
import org.apache.doris.analysis.ResumeSyncJobStmt;
import org.apache.doris.analysis.RevokeStmt;
import org.apache.doris.analysis.SetUserPropertyStmt;
-import org.apache.doris.analysis.StopJobStmt;
import org.apache.doris.analysis.StopRoutineLoadStmt;
import org.apache.doris.analysis.StopSyncJobStmt;
import org.apache.doris.analysis.SyncStmt;
@@ -121,8 +120,6 @@ import org.apache.doris.catalog.EncryptKeyHelper;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.ProfileManager;
-import org.apache.doris.job.common.JobStatus;
-import org.apache.doris.job.common.JobType;
import org.apache.doris.load.sync.SyncJobManager;
import org.apache.doris.persist.CleanQueryStatsInfo;
import org.apache.doris.statistics.StatisticsRepository;
@@ -190,24 +187,17 @@ public class DdlExecutor {
} catch (Exception e) {
throw new DdlException(e.getMessage());
}
- } else if (ddlStmt instanceof StopJobStmt) {
- StopJobStmt stmt = (StopJobStmt) ddlStmt;
+ } else if (ddlStmt instanceof AlterJobStatusStmt) {
+ AlterJobStatusStmt stmt = (AlterJobStatusStmt) ddlStmt;
try {
- env.getJobManager().unregisterJob(stmt.getName(),
JobType.INSERT);
+ env.getJobManager().alterJobStatus(stmt.getJobName(),
stmt.getJobStatus());
} catch (Exception e) {
throw new DdlException(e.getMessage());
}
- } else if (ddlStmt instanceof PauseJobStmt) {
- PauseJobStmt stmt = (PauseJobStmt) ddlStmt;
+ } else if (ddlStmt instanceof CancelJobTaskStmt) {
+ CancelJobTaskStmt stmt = (CancelJobTaskStmt) ddlStmt;
try {
- env.getJobManager().alterJobStatus(stmt.getName(),
JobStatus.PAUSED, JobType.INSERT);
- } catch (Exception e) {
- throw new DdlException(e.getMessage());
- }
- } else if (ddlStmt instanceof ResumeJobStmt) {
- ResumeJobStmt stmt = (ResumeJobStmt) ddlStmt;
- try {
- env.getJobManager().alterJobStatus(stmt.getName(),
JobStatus.RUNNING, JobType.INSERT);
+ env.getJobManager().cancelTaskById(stmt.getJobName(),
stmt.getTaskId());
} catch (Exception e) {
throw new DdlException(e.getMessage());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 430f790ecd3..fd8c4896f48 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -1429,7 +1429,7 @@ public class ShowExecutor {
return;
}
org.apache.doris.job.base.AbstractJob job = jobs.get(0);
- List<AbstractTask> jobTasks = job.queryTasks();
+ List<AbstractTask> jobTasks = job.queryAllTasks();
if (CollectionUtils.isEmpty(jobTasks)) {
resultSet = new ShowResultSet(job.getTaskMetaData(), rows);
return;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index dbb9b3bd57a..29e7aaa3b9c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -177,6 +177,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
+import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
@@ -214,6 +215,8 @@ public class StmtExecutor {
private StatementBase parsedStmt;
private Analyzer analyzer;
private ProfileType profileType = ProfileType.QUERY;
+
+ @Setter
private volatile Coordinator coord = null;
private MasterOpExecutor masterOpExecutor = null;
private RedirectStatus redirectStatus = null;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
index eadc6c567d3..87d1430375a 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
@@ -48,22 +48,22 @@ public class JobExecutionConfigurationTest {
configuration.setExecuteType(JobExecuteType.RECURRING);
TimerDefinition timerDefinition = new TimerDefinition();
- timerDefinition.setStartTimeMs(1000L); // Start time set to 1 second
in the future
+ timerDefinition.setStartTimeMs(100000L); // Start time set to 1 second
in the future
timerDefinition.setInterval(10L); // Interval set to 10 milliseconds
- timerDefinition.setIntervalUnit(IntervalUnit.SECOND);
+ timerDefinition.setIntervalUnit(IntervalUnit.MINUTE);
configuration.setTimerDefinition(timerDefinition);
List<Long> delayTimes = configuration.getTriggerDelayTimes(
- 0L, 0L, 11000L);
+ 0L, 0L, 1100000L);
Assertions.assertEquals(2, delayTimes.size());
- Assertions.assertArrayEquals(new Long[]{1L, 11L},
delayTimes.toArray());
+ Assertions.assertArrayEquals(new Long[]{100L, 700L},
delayTimes.toArray());
delayTimes = configuration.getTriggerDelayTimes(
- 2000L, 0L, 11000L);
+ 200000L, 0L, 1100000L);
Assertions.assertEquals(1, delayTimes.size());
- Assertions.assertArrayEquals(new Long[]{ 9L}, delayTimes.toArray());
+ Assertions.assertArrayEquals(new Long[]{ 500L}, delayTimes.toArray());
delayTimes = configuration.getTriggerDelayTimes(
- 1001L, 0L, 10000L);
+ 1001000L, 0L, 1000000L);
Assertions.assertEquals(0, delayTimes.size());
}
diff --git a/regression-test/pipeline/p0/conf/fe.conf
b/regression-test/pipeline/p0/conf/fe.conf
index df5890b5d12..fadfe8066be 100644
--- a/regression-test/pipeline/p0/conf/fe.conf
+++ b/regression-test/pipeline/p0/conf/fe.conf
@@ -95,3 +95,7 @@ edit_log_roll_num = 1000
history_job_keep_max_second = 300
streaming_label_keep_max_second = 300
label_keep_max_second = 300
+
+# job test configurations
+#allows the creation of jobs with an interval of second
+enable_job_schedule_second_for_test = true
diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy
b/regression-test/suites/job_p0/test_base_insert_job.groovy
index 76d38460ffb..adde94dc6f0 100644
--- a/regression-test/suites/job_p0/test_base_insert_job.groovy
+++ b/regression-test/suites/job_p0/test_base_insert_job.groovy
@@ -25,7 +25,7 @@ suite("test_base_insert_job") {
def jobName = "insert_recovery_test_base_insert_job"
sql """drop table if exists `${tableName}` force"""
sql """
- STOP JOB for ${jobName}
+ DROP JOB where jobname = '${jobName}'
"""
sql """
@@ -49,7 +49,7 @@ suite("test_base_insert_job") {
println jobs
assert 3>=jobs.size() >= (2 as Boolean) //at least 2 records, some times 3
records
sql """
- STOP JOB for ${jobName}
+ DROP JOB where jobname = '${jobName}'
"""
sql """drop table if exists `${tableName}` force """
sql """
@@ -70,42 +70,76 @@ suite("test_base_insert_job") {
def formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
def startTime= dateTime.format(formatter);
+ def dataCount = sql """select count(*) from ${tableName}"""
+ assert dataCount.get(0).get(0) == 0
sql """
- CREATE JOB ${jobName} ON SCHEDULER at '${startTime}' comment
'test' DO insert into ${tableName} (timestamp, type, user_id) values
('2023-03-18','1','12213');
+ CREATE JOB ${jobName} ON SCHEDULER at '${startTime}' comment
'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values
('2023-07-19', sleep(1000), 1001);
"""
- Thread.sleep(2500)
-
- def datas = sql """select * from ${tableName}"""
+ Thread.sleep(3000)
+
+ // test cancel task
+ def datas = sql """show job tasks for ${jobName}"""
println datas
- //assert datas.size() == 1
+ assert datas.size() == 1
+ println datas.get(0).get(2)
+ assert datas.get(0).get(2) == "RUNNING"
+ def taskId = datas.get(0).get(0)
+ sql """cancel task where jobName='${jobName}' and taskId= ${taskId}"""
+ def cancelTask = sql """ show job tasks for ${jobName}"""
+ println cancelTask
+ //check task status
+ assert cancelTask.size() == 1
+ assert cancelTask.get(0).get(2) == "CANCELED"
+ // check table data
+ def dataCount1 = sql """select count(*) from ${tableName}"""
+ assert dataCount1.get(0).get(0) == 0
+ // check job status
+ def oncejob=sql """show job for ${jobName} """
+ println oncejob
+ assert oncejob.get(0).get(5) == "FINISHED"
+ //assert comment
+ println oncejob.get(0).get(8)
+ //check comment
+ assert oncejob.get(0).get(8) == "test for test&68686781jbjbhj//ncsa"
+
try{
sql """
CREATE JOB ${jobName} ON SCHEDULER at '${startTime}' comment
'test' DO insert into ${tableName} (timestamp, type, user_id) values
('2023-03-18','1','12213');
"""
} catch (Exception e) {
- assert true
+ assert e.getMessage().contains("startTimeMs must be greater than
current time")
}
sql """
- STOP JOB for test_one_time_error_starts
+ DROP JOB where jobname = 'test_one_time_error_starts'
"""
try{
sql """
CREATE JOB test_one_time_error_starts ON SCHEDULER at '2023-11-13
14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type,
user_id) values ('2023-03-18','1','12213');
"""
} catch (Exception e) {
- assert true
+ assert e.getMessage().contains("startTimeMs must be greater than
current time")
}
sql """
- STOP JOB for test_error_starts
+ DROP JOB where jobname = 'test_error_starts'
"""
try{
sql """
CREATE JOB test_error_starts ON SCHEDULER every 1 second ends
'2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp,
type, user_id) values ('2023-03-18','1','12213');
"""
} catch (Exception e) {
- assert true
+ assert e.getMessage().contains("end time cannot be less than start
time")
}
-
+ sql """
+ DROP JOB where jobname = 'test_error_starts'
+ """
+ try{
+ sql """
+ CREATE JOB test_error_starts ON SCHEDULER every 1 years ends
'2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp,
type, user_id) values ('2023-03-18','1','12213');
+ """
+ } catch (Exception e) {
+ assert e.getMessage().contains("interval time unit can not be years")
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]