This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cbf1f8620a4 [Feature](job)support cancel task and fix log invalid 
(#27703)
cbf1f8620a4 is described below

commit cbf1f8620a4be7117d81edaf0a130d5a5ac19c67
Author: Calvin Kirs <[email protected]>
AuthorDate: Wed Dec 6 10:44:09 2023 +0800

    [Feature](job)support cancel task and fix log invalid (#27703)
    
    - Running task can be show and fix cancel fail
    - When the insert task scheduling cycle is reached, if there are still 
tasks running, the scheduling of this task will be canceled at this time.
    - refactor job status changes SQL
    - Fix timer job window error
    - Support cancel task
---
 .../Create/CREATE-JOB.md                           | 48 ++++++++++++--
 .../main/java/org/apache/doris/common/Config.java  | 13 +++-
 fe/fe-core/src/main/cup/sql_parser.cup             | 34 ++++++----
 ...{ResumeJobStmt.java => AlterJobStatusStmt.java} | 52 +++++++--------
 .../apache/doris/analysis/CancelJobTaskStmt.java   | 72 ++++++++++++++++++++
 .../org/apache/doris/analysis/CreateJobStmt.java   | 30 ++++++++-
 .../org/apache/doris/analysis/PauseJobStmt.java    | 69 -------------------
 .../org/apache/doris/analysis/StopJobStmt.java     | 49 --------------
 .../main/java/org/apache/doris/catalog/Env.java    |  2 +-
 .../org/apache/doris/job/base/AbstractJob.java     | 77 ++++++++++++++++++----
 .../main/java/org/apache/doris/job/base/Job.java   | 21 ++++--
 .../org/apache/doris/job/common/IntervalUnit.java  |  3 +-
 .../org/apache/doris/job/common/TaskStatus.java    |  4 +-
 .../apache/doris/job/disruptor/TimerJobEvent.java  |  4 +-
 .../job/executor/DefaultTaskExecutorHandler.java   |  4 +-
 .../doris/job/executor/DispatchTaskHandler.java    | 22 +++++--
 .../doris/job/executor/TimerJobSchedulerTask.java  | 14 ++--
 .../doris/job/extensions/insert/InsertJob.java     | 24 +++----
 .../doris/job/extensions/insert/InsertTask.java    | 30 ++++++++-
 .../apache/doris/job/extensions/mtmv/MTMVJob.java  |  7 +-
 .../org/apache/doris/job/manager/JobManager.java   | 54 ++++++++-------
 .../job/manager/TaskDisruptorGroupManager.java     | 10 +--
 .../apache/doris/job/manager/TaskTokenManager.java |  4 +-
 .../apache/doris/job/scheduler/JobScheduler.java   | 57 ++++++++++------
 .../org/apache/doris/job/task/AbstractTask.java    | 26 +++++---
 .../main/java/org/apache/doris/job/task/Task.java  |  2 +-
 .../java/org/apache/doris/mtmv/MTMVJobManager.java |  6 +-
 .../trees/plans/commands/InsertExecutor.java       |  4 ++
 .../plans/commands/InsertIntoTableCommand.java     |  3 +
 .../main/java/org/apache/doris/qe/DdlExecutor.java | 26 +++-----
 .../java/org/apache/doris/qe/ShowExecutor.java     |  2 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |  3 +
 .../job/base/JobExecutionConfigurationTest.java    | 14 ++--
 regression-test/pipeline/p0/conf/fe.conf           |  4 ++
 .../suites/job_p0/test_base_insert_job.groovy      | 60 +++++++++++++----
 35 files changed, 529 insertions(+), 325 deletions(-)

diff --git 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md
 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md
index c06d48a4d01..1202bbff423 100644
--- 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md
+++ 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md
@@ -34,12 +34,28 @@ CREATE JOB
 ### Description
 
 Doris Job 是根据既定计划运行的任务,用于在特定时间或指定时间间隔触发预定义的操作,从而帮助我们自动执行一些任务。从功能上来讲,它类似于操作系统上的
-定时任务(如:Linux 中的 cron、Windows 中的计划任务)。但 Doris 的 Job 调度可以精确到秒级。
+定时任务(如:Linux 中的 cron、Windows 中的计划任务)。
 
-Job 有两种类型:`ONE_TIME` 和 `BATCH`。其中 `ONE_TIME` 类型的 Job 会在指定的时间点触发,它主要用于一次性任务,而 
`BATCH` 类型的 Job 会在指定的时间间隔内循环触发。
-主要用于周期性执行的任务。
+Job 有两种类型:`ONE_TIME` 和 `RECURRING`。其中 `ONE_TIME` 类型的 Job 
会在指定的时间点触发,它主要用于一次性任务,而 `RECURRING` 类型的 Job 会在指定的时间间隔内循环触发, 此方式主要用于周期性执行的任务。
+`RECURRING` 类型的 Job 可指定开始时间,结束时间,即 `STARTS\ENDS`, 如果不指定开始时间,则默认首次执行时间为当前时间 + 
一次调度周期。如果指定结束时间,则 task 执行完成如果达到结束时间(或超过,或下次执行周期会超过结束时间)则更新为FINISHED状态,此时不会再产生 
Task。
+
+JOB 
共4种状态(`RUNNING`,`STOPPED`,`PAUSED`,`FINISHED`,),初始状态为RUNNING,RUNNING状态的JOB会根据既定的调度周期去生成
 TASK 执行,Job 执行完成达到结束时间则状态变更为 `FINISHED`.
+
+RUNNING 状态的JOB 可以被 pause,即暂停,此时不会再生成 Task。
+
+PAUSE状态的 JOB 可以通过 RESUME 操作来恢复运行,更改为RUNNING状态。
+
+STOP 状态的 JOB 由用户主动触发,此时会 Cancel 正在运行中的作业,然后删除 JOB。
+
+Finished 状态的 JOB 会保留在系统中 24 H,24H 后会被删除。
+
+JOB 只描述作业信息, 执行会生成 TASK, TASK 状态分为 PENDING,RUNNING,SUCCEESS,FAILED,CANCELED
+PENDING 表示到达触发时间了但是等待资源 RUN, 分配到资源后状态变更为RUNNING ,执行成功/失败即变更为 SUCCESS/FAILED. 
+CANCELED 即取消状态 ,TASK持久化最终状态,即SUCCESS/FAILED,其他状态运行中可以查到,但是如果重启则不可见。TASK只保留最新的 
100 条记录。
+
+- 目前仅支持 ***ADMIN*** 权限执行此操作。
+- 目前仅支持 ***INSERT 内表*** 
 
-目前仅支持 ***ADMIN*** 权限执行此操作。
 
  语法:
 
@@ -59,8 +75,7 @@ schedule: {
 }
 
 interval:
-    quantity { DAY | HOUR | MINUTE |
-              WEEK | SECOND }
+    quantity { WEEK | DAY | HOUR | MINUTE }
 ```
 
 一条有效的 Job 语句必须包含以下内容
@@ -88,7 +103,7 @@ SCHEDULER 语句用于定义作业的执行时间,频率以及持续时间,
     
     - interval
   
-    用于指定作业执行频率,它可以是天、小时、分钟、秒、周。例如:` 1 DAY` 表示每天执行一次,` 1 HOUR` 表示每小时执行一次,` 1 
MINUTE` 表示每分钟执行一次,` 1 WEEK` 表示每周执行一次,` 1 SECOND` 表示每秒执行一次。
+    用于指定作业执行频率,它可以是天、小时、分钟、周。例如:` 1 DAY` 表示每天执行一次,` 1 HOUR` 表示每小时执行一次,` 1 
MINUTE` 表示每分钟执行一次,` 1 WEEK` 表示每周执行一次。
 
     - STARTS timestamp
 
@@ -122,6 +137,25 @@ CREATE JOB my_job ON SCHEDULE EVERY 1 DAY STARTS 
'2020-01-01 00:00:00' DO INSERT
 ```sql
 CREATE JOB my_job ON SCHEDULER EVERY 1 DAY STARTS '2020-01-01 00:00:00' ENDS 
'2020-01-01 00:10:00' DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2 
create_time >=  days_add(now(),-1);
 ```
+### INSERT JOB
+目前仅支持 ***INSERT 内表***
+
+### CONFIG
+
+fe.conf
+
+- job_dispatch_timer_job_thread_num, 用于分发定时任务的线程数, 默认值2,如果含有大量周期执行任务,可以调大这个参数。
+
+- job_dispatch_timer_job_queue_size, 任务堆积时用于存放定时任务的队列大小,默认值 1024. 
如果有大量任务同一时间触发,可以调大这个参数。否则会导致队列满,提交任务会进入阻塞状态,从而导致后续任务无法提交。
+
+- finished_job_cleanup_threshold_time_hour, 用于清理已完成的任务的时间阈值,单位为小时,默认值为24小时。
+
+- job_insert_task_consumer_thread_num = 10;用于执行 Insert 任务的线程数, 值应该大于0,否则默认为5.
+
+### Best Practice
+
+合理的进行 Job 的管理,避免大量的 Job 同时触发,导致任务堆积,从而影响系统的正常运行。
+任务的执行间隔应该设置在一个合理的范围,至少应该大于任务执行时间。
 
 ### Keywords
 
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index d6be0fb9190..d350a44c9fa 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1570,7 +1570,7 @@ public class Config extends ConfigBase {
      */
     @ConfField(description = {"用于分发定时任务的线程数",
             "The number of threads used to dispatch timer job."})
-    public static int job_dispatch_timer_job_thread_num = 5;
+    public static int job_dispatch_timer_job_thread_num = 2;
 
     /**
      * The number of timer jobs that can be queued.
@@ -1582,6 +1582,10 @@ public class Config extends ConfigBase {
     @ConfField(description = {"任务堆积时用于存放定时任务的队列大小", "The number of timer jobs 
that can be queued."})
     public static int job_dispatch_timer_job_queue_size = 1024;
 
+    @ConfField(description = {"finished 状态的 job 最长保存时间,超过这个时间将会被删除, 单位:小时",
+            "The longest time to save the job in finished status, it will be 
deleted after this time. Unit: hour"})
+    public static int finished_job_cleanup_threshold_time_hour = 24;
+
     @ConfField(description = {"用于执行 Insert 任务的线程数,值应该大于0,否则默认为5",
             "The number of threads used to consume Insert tasks, "
                     + "the value should be greater than 0, if it is <=0, 
default is 5."})
@@ -1592,6 +1596,13 @@ public class Config extends ConfigBase {
                     + "the value should be greater than 0, if it is <=0, 
default is 5."})
     public static int job_mtmv_task_consumer_thread_num = 10;
 
+    /* job test config */
+    /**
+     * If set to true, we will allow the interval unit to be set to second, 
when creating a recurring job.
+     */
+    @ConfField
+    public static boolean enable_job_schedule_second_for_test = false;
+
     /*---------------------- JOB CONFIG END------------------------*/
     /**
      * The number of async tasks that can be queued. @See TaskDisruptor
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup 
b/fe/fe-core/src/main/cup/sql_parser.cup
index 6334f802073..bb6c09e1d42 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -688,7 +688,7 @@ nonterminal StatementBase stmt, show_stmt, show_param, 
help_stmt, load_stmt,
     create_routine_load_stmt, pause_routine_load_stmt, 
resume_routine_load_stmt, stop_routine_load_stmt,
     show_routine_load_stmt, show_routine_load_task_stmt, 
show_create_routine_load_stmt, show_create_load_stmt, 
show_create_reporitory_stmt,
     describe_stmt, alter_stmt, unset_var_stmt,
-    create_job_stmt,pause_job_stmt,resume_job_stmt,stop_job_stmt,show_job_stmt,
+    
create_job_stmt,pause_job_stmt,resume_job_stmt,stop_job_stmt,show_job_stmt,cancel_job_task_stmt,
     use_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt, 
create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt,
     switch_stmt, transaction_stmt, unsupported_stmt, export_stmt, admin_stmt, 
truncate_stmt,
     import_columns_stmt, import_delete_on_stmt, import_sequence_stmt, 
import_where_stmt, install_plugin_stmt, uninstall_plugin_stmt,
@@ -1157,6 +1157,8 @@ stmt ::=
     {: RESULT = stmt; :}
     | pause_job_stmt : stmt   
     {: RESULT = stmt; :}
+    | cancel_job_task_stmt : stmt
+    {: RESULT = stmt; :}       
     | show_job_stmt : stmt   
     {: RESULT = stmt; :}
     | stop_job_stmt : stmt   
@@ -2589,13 +2591,7 @@ create_job_stmt ::=
      {:  
           RESULT = endTime;  
      :}
-     ;       
-resume_job_stmt ::=
-    KW_RESUME KW_JOB  KW_FOR job_label:jobLabel
-    {:
-        RESULT = new ResumeJobStmt(jobLabel);
-    :}
-    ;    
+     ;        
 show_job_stmt ::=
     KW_SHOW KW_JOBS
     {:
@@ -2623,18 +2619,30 @@ show_job_stmt ::=
     :}       
     ;
 pause_job_stmt ::=
-    KW_PAUSE KW_JOB KW_FOR job_label:jobLabel
+    KW_PAUSE KW_JOB opt_wild_where
     {:
-        RESULT = new PauseJobStmt(jobLabel);
+       RESULT = new 
AlterJobStatusStmt(parser.where,org.apache.doris.job.common.JobStatus.PAUSED);
     :}
     ;
 
 stop_job_stmt ::=
-    KW_STOP KW_JOB KW_FOR job_label:jobLabel
+    KW_DROP KW_JOB opt_wild_where
+    {:
+      RESULT = new 
AlterJobStatusStmt(parser.where,org.apache.doris.job.common.JobStatus.STOPPED);
+    :}
+    ;    
+resume_job_stmt ::=
+    KW_RESUME KW_JOB  opt_wild_where
+    {:
+        RESULT = new 
AlterJobStatusStmt(parser.where,org.apache.doris.job.common.JobStatus.RUNNING);
+    :}
+    ;    
+cancel_job_task_stmt ::=
+   KW_CANCEL KW_TASK opt_wild_where
     {:
-        RESULT = new StopJobStmt(jobLabel);
+      RESULT = new CancelJobTaskStmt(parser.where);
     :}
-    ;                       
+    ;                          
 // Routine load statement
 create_routine_load_stmt ::=
     KW_CREATE KW_ROUTINE KW_LOAD job_label:jobLabel optional_on_ident:tableName
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeJobStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterJobStatusStmt.java
similarity index 51%
rename from 
fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeJobStmt.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/analysis/AlterJobStatusStmt.java
index 725d24f47af..09f135d2fb0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeJobStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterJobStatusStmt.java
@@ -17,47 +17,47 @@
 
 package org.apache.doris.analysis;
 
-import org.apache.doris.cluster.ClusterNamespace;
-import org.apache.doris.common.ErrorCode;
-import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.job.common.JobStatus;
 
 import com.google.common.base.Strings;
+import lombok.Getter;
 
-public class ResumeJobStmt extends DdlStmt {
+public class AlterJobStatusStmt extends DdlStmt {
 
-    private final LabelName labelName;
+    private Expr expr;
 
-    private String db;
+    private static final String columnName = "jobName";
 
-    public ResumeJobStmt(LabelName labelName) {
-        this.labelName = labelName;
-    }
-
-    public boolean isAll() {
-        return labelName == null;
-    }
+    @Getter
+    private String jobName;
 
-    public String getName() {
-        return labelName.getLabelName();
-    }
+    @Getter
+    private JobStatus jobStatus;
 
-    public String getDbFullName() {
-        return db;
+    public AlterJobStatusStmt(Expr whereClause, JobStatus jobStatus) {
+        this.expr = whereClause;
+        this.jobStatus = jobStatus;
     }
 
     @Override
     public void analyze(Analyzer analyzer) throws UserException {
         super.analyze(analyzer);
         CreateJobStmt.checkAuth();
-        if (labelName != null) {
-            labelName.analyze(analyzer);
-            db = labelName.getDbName();
-        } else {
-            if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) {
-                ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
-            }
-            db = ClusterNamespace.getFullName(analyzer.getClusterName(), 
analyzer.getDefaultDb());
+        String inputCol = ((SlotRef) expr.getChild(0)).getColumnName();
+        if (!inputCol.equalsIgnoreCase(columnName)) {
+            throw new AnalysisException("Current not support " + inputCol);
         }
+        if (!(expr.getChild(1) instanceof StringLiteral)) {
+            throw new AnalysisException("Value must is string");
+        }
+
+        String inputValue = expr.getChild(1).getStringValue();
+        if (Strings.isNullOrEmpty(inputValue)) {
+            throw new AnalysisException("Value can't is null");
+        }
+        this.jobName = inputValue;
+
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelJobTaskStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelJobTaskStmt.java
new file mode 100644
index 00000000000..ea6febe1630
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelJobTaskStmt.java
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/StringLiteral.java
+// and modified by Doris
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+
+import lombok.Getter;
+
+public class CancelJobTaskStmt extends DdlStmt {
+
+    @Getter
+    private String jobName;
+
+    @Getter
+    private Long taskId;
+
+    private Expr expr;
+
+    private static final String jobNameKey = "jobName";
+
+    private static final String taskIdKey = "taskId";
+
+    public CancelJobTaskStmt(Expr whereExpr) {
+        this.expr = whereExpr;
+    }
+
+    @Override
+    public void analyze(Analyzer analyzer) throws UserException {
+        super.analyze(analyzer);
+        CreateJobStmt.checkAuth();
+        CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
+        if (!compoundPredicate.getOp().equals(CompoundPredicate.Operator.AND)) 
{
+            throw new AnalysisException("Only allow compound predicate with 
operator AND");
+        }
+        String jobNameInput = ((SlotRef) 
compoundPredicate.getChildren().get(0).getChild(0)).getColumnName();
+        if (!jobNameKey.equalsIgnoreCase(jobNameInput)) {
+            throw new AnalysisException("Current not support " + jobNameInput);
+        }
+
+        if (!(compoundPredicate.getChildren().get(0).getChild(1) instanceof 
StringLiteral)) {
+            throw new AnalysisException("JobName value must is string");
+        }
+        this.jobName = 
compoundPredicate.getChildren().get(0).getChild(1).getStringValue();
+        String taskIdInput = ((SlotRef) 
compoundPredicate.getChildren().get(1).getChild(0)).getColumnName();
+        if (!taskIdKey.equalsIgnoreCase(taskIdInput)) {
+            throw new AnalysisException("Current not support " + taskIdInput);
+        }
+        if (!(compoundPredicate.getChildren().get(1).getChild(1) instanceof 
IntLiteral)) {
+            throw new AnalysisException("task id  value must is large int");
+        }
+        this.taskId = ((IntLiteral) 
compoundPredicate.getChildren().get(1).getChild(1)).getLongValue();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
index a1c008fb95e..1a44ee4f90c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
@@ -19,6 +19,7 @@ package org.apache.doris.analysis;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
@@ -39,6 +40,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 
 import java.util.HashSet;
+import java.util.Set;
 
 /**
  * syntax:
@@ -66,7 +68,7 @@ public class CreateJobStmt extends DdlStmt {
     private StatementBase doStmt;
 
     @Getter
-    private AbstractJob<?> jobInstance;
+    private AbstractJob jobInstance;
 
     private final LabelName labelName;
 
@@ -83,6 +85,13 @@ public class CreateJobStmt extends DdlStmt {
     private final String comment;
     private JobExecuteType executeType;
 
+    // exclude job name prefix, which is used by inner job
+    private final Set<String> excludeJobNamePrefix = new HashSet<>();
+
+    {
+        excludeJobNamePrefix.add("inner_mtmv_");
+    }
+
     private static final ImmutableSet<Class<? extends DdlStmt>> 
supportStmtSuperClass
             = new ImmutableSet.Builder<Class<? extends 
DdlStmt>>().add(InsertStmt.class)
             .add(UpdateStmt.class).build();
@@ -126,7 +135,15 @@ public class CreateJobStmt extends DdlStmt {
             timerDefinition.setInterval(interval);
         }
         if (null != intervalTimeUnit) {
-            
timerDefinition.setIntervalUnit(IntervalUnit.valueOf(intervalTimeUnit.toUpperCase()));
+            IntervalUnit intervalUnit = 
IntervalUnit.fromString(intervalTimeUnit.toUpperCase());
+            if (null == intervalUnit) {
+                throw new AnalysisException("interval time unit can not be " + 
intervalTimeUnit);
+            }
+            if (intervalUnit.equals(IntervalUnit.SECOND)
+                    && !Config.enable_job_schedule_second_for_test) {
+                throw new AnalysisException("interval time unit can not be 
second");
+            }
+            timerDefinition.setIntervalUnit(intervalUnit);
         }
         if (null != startsTimeStamp) {
             
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(startsTimeStamp));
@@ -134,6 +151,7 @@ public class CreateJobStmt extends DdlStmt {
         if (null != endsTimeStamp) {
             
timerDefinition.setEndTimeMs(TimeUtils.timeStringToLong(endsTimeStamp));
         }
+        checkJobName(labelName.getLabelName());
         jobExecutionConfiguration.setTimerDefinition(timerDefinition);
         job.setJobConfig(jobExecutionConfiguration);
 
@@ -151,6 +169,14 @@ public class CreateJobStmt extends DdlStmt {
         jobInstance = job;
     }
 
+    private void checkJobName(String jobName) throws AnalysisException {
+        for (String prefix : excludeJobNamePrefix) {
+            if (jobName.startsWith(prefix)) {
+                throw new AnalysisException("job name can not start with " + 
prefix);
+            }
+        }
+    }
+
     protected static void checkAuth() throws AnalysisException {
         if 
(!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), 
PrivPredicate.ADMIN)) {
             
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, 
"ADMIN");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseJobStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseJobStmt.java
deleted file mode 100644
index c399fc37cca..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseJobStmt.java
+++ /dev/null
@@ -1,69 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.analysis;
-
-import org.apache.doris.cluster.ClusterNamespace;
-import org.apache.doris.common.ErrorCode;
-import org.apache.doris.common.ErrorReport;
-import org.apache.doris.common.UserException;
-
-import com.google.common.base.Strings;
-
-/**
- * syntax:
- *  PAUSE JOB FOR  [database.]name
- *  we can pause a job by jobName
- *  it's only running job can be paused, and it will be paused immediately
- *  paused job can be resumed by RESUME EVENT FOR jobName
- */
-public class PauseJobStmt extends DdlStmt {
-
-    private final LabelName labelName;
-    private String db;
-
-    public PauseJobStmt(LabelName labelName) {
-        this.labelName = labelName;
-    }
-
-    public boolean isAll() {
-        return labelName == null;
-    }
-
-    public String getName() {
-        return labelName.getLabelName();
-    }
-
-    public String getDbFullName() {
-        return db;
-    }
-
-    @Override
-    public void analyze(Analyzer analyzer) throws UserException {
-        super.analyze(analyzer);
-        CreateJobStmt.checkAuth();
-        if (labelName != null) {
-            labelName.analyze(analyzer);
-            db = labelName.getDbName();
-        } else {
-            if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) {
-                ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
-            }
-            db = ClusterNamespace.getFullName(analyzer.getClusterName(), 
analyzer.getDefaultDb());
-        }
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/StopJobStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/StopJobStmt.java
deleted file mode 100644
index afef3ef01b5..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StopJobStmt.java
+++ /dev/null
@@ -1,49 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.analysis;
-
-import org.apache.doris.common.UserException;
-
-/**
- * syntax:
- * STOP JOB FOR [database.]name
- * only run job can be stopped, and stopped job can't be resumed
- */
-public class StopJobStmt extends DdlStmt {
-
-    private final LabelName labelName;
-
-    public StopJobStmt(LabelName labelName) {
-        this.labelName = labelName;
-    }
-
-    public String getName() {
-        return labelName.getLabelName();
-    }
-
-    public String getDbFullName() {
-        return labelName.getDbName();
-    }
-
-    @Override
-    public void analyze(Analyzer analyzer) throws UserException {
-        super.analyze(analyzer);
-        CreateJobStmt.checkAuth();
-        labelName.analyze(analyzer);
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 2e5874fed12..61379c1b28e 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -360,7 +360,7 @@ public class Env {
     private MetastoreEventsProcessor metastoreEventsProcessor;
 
     private ExportTaskRegister exportTaskRegister;
-    private JobManager<? extends AbstractJob> jobManager;
+    private JobManager<? extends AbstractJob<?, ?>, ?> jobManager;
     private TransientTaskManager transientTaskManager;
 
     private MasterDaemon labelCleaner; // To clean old LabelInfo, 
ExportJobInfos
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index ca98756f6d3..83f02326d82 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -26,6 +26,7 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.job.common.JobStatus;
 import org.apache.doris.job.common.TaskStatus;
+import org.apache.doris.job.common.TaskType;
 import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.task.AbstractTask;
 import org.apache.doris.persist.gson.GsonUtils;
@@ -36,6 +37,7 @@ import org.apache.doris.thrift.TRow;
 import com.google.common.collect.ImmutableList;
 import com.google.gson.annotations.SerializedName;
 import lombok.Data;
+import lombok.extern.log4j.Log4j2;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.RandomUtils;
 
@@ -43,10 +45,14 @@ import java.io.DataInput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 @Data
-public abstract class AbstractJob<T extends AbstractTask> implements Job<T>, 
Writable {
+@Log4j2
+public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, 
C>, Writable {
 
     @SerializedName(value = "jid")
     private Long jobId;
@@ -75,6 +81,9 @@ public abstract class AbstractJob<T extends AbstractTask> 
implements Job<T>, Wri
     @SerializedName(value = "sql")
     String executeSql;
 
+    @SerializedName(value = "ftm")
+    private long finishTimeMs;
+
     private List<T> runningTasks = new ArrayList<>();
 
     @Override
@@ -106,7 +115,43 @@ public abstract class AbstractJob<T extends AbstractTask> 
implements Job<T>, Wri
             throw new JobException("no running task");
         }
         runningTasks.stream().filter(task -> 
task.getTaskId().equals(taskId)).findFirst()
-                .orElseThrow(() -> new JobException("no task id:" + 
taskId)).cancel();
+                .orElseThrow(() -> new JobException("no task id: " + 
taskId)).cancel();
+        if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) {
+            updateJobStatus(JobStatus.FINISHED);
+        }
+    }
+
+    public List<T> queryAllTasks() {
+        List<T> tasks = new ArrayList<>();
+        if (CollectionUtils.isEmpty(runningTasks)) {
+            return queryTasks();
+        }
+
+        List<T> historyTasks = queryTasks();
+        if (CollectionUtils.isNotEmpty(historyTasks)) {
+            tasks.addAll(historyTasks);
+        }
+        Set<Long> loadTaskIds = 
tasks.stream().map(AbstractTask::getTaskId).collect(Collectors.toSet());
+        runningTasks.forEach(task -> {
+            if (!loadTaskIds.contains(task.getTaskId())) {
+                tasks.add(task);
+            }
+        });
+        Comparator<T> taskComparator = 
Comparator.comparingLong(T::getCreateTimeMs).reversed();
+        tasks.sort(taskComparator);
+        return tasks;
+    }
+
+    public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
+        if (!getJobStatus().equals(JobStatus.RUNNING)) {
+            log.warn("job is not running, job id is {}", jobId);
+            return new ArrayList<>();
+        }
+        if (!isReadyForScheduling(taskContext)) {
+            log.info("job is not ready for scheduling, job id is {}", jobId);
+            return new ArrayList<>();
+        }
+        return createTasks(taskType, taskContext);
     }
 
     public void initTasks(List<? extends AbstractTask> tasks) {
@@ -133,21 +178,26 @@ public abstract class AbstractJob<T extends AbstractTask> 
implements Job<T>, Wri
         checkJobParamsInternal();
     }
 
-    public void updateJobStatus(JobStatus newJobStatus) {
+    public void updateJobStatus(JobStatus newJobStatus) throws JobException {
         if (null == newJobStatus) {
             throw new IllegalArgumentException("jobStatus cannot be null");
         }
+        String errorMsg = String.format("Can't update job %s status to the %s 
status",
+                jobStatus.name(), newJobStatus.name());
         if (jobStatus == newJobStatus) {
-            throw new IllegalArgumentException(String.format("Can't update job 
%s status to the %s status",
-                    jobStatus.name(), this.jobStatus.name()));
+            throw new IllegalArgumentException(errorMsg);
         }
         if (newJobStatus.equals(JobStatus.RUNNING) && 
!jobStatus.equals(JobStatus.PAUSED)) {
-            throw new IllegalArgumentException(String.format("Can't update job 
%s status to the %s status",
-                    jobStatus.name(), this.jobStatus.name()));
+            throw new IllegalArgumentException(errorMsg);
         }
         if (newJobStatus.equals(JobStatus.STOPPED) && 
!jobStatus.equals(JobStatus.RUNNING)) {
-            throw new IllegalArgumentException(String.format("Can't update job 
%s status to the %s status",
-                    jobStatus.name(), this.jobStatus.name()));
+            throw new IllegalArgumentException(errorMsg);
+        }
+        if (newJobStatus.equals(JobStatus.FINISHED)) {
+            this.finishTimeMs = System.currentTimeMillis();
+        }
+        if (JobStatus.PAUSED.equals(newJobStatus)) {
+            cancelAllTasks();
         }
         jobStatus = newJobStatus;
     }
@@ -157,25 +207,26 @@ public abstract class AbstractJob<T extends AbstractTask> 
implements Job<T>, Wri
 
     public static AbstractJob readFields(DataInput in) throws IOException {
         String jsonJob = Text.readString(in);
-        AbstractJob<?> job = GsonUtils.GSON.fromJson(jsonJob, 
AbstractJob.class);
+        AbstractJob job = GsonUtils.GSON.fromJson(jsonJob, AbstractJob.class);
         job.setRunningTasks(new ArrayList<>());
         return job;
     }
 
     @Override
-    public void onTaskFail(T task) {
+    public void onTaskFail(T task) throws JobException {
         updateJobStatusIfEnd();
         runningTasks.remove(task);
     }
 
     @Override
-    public void onTaskSuccess(T task) {
+    public void onTaskSuccess(T task) throws JobException {
         updateJobStatusIfEnd();
         runningTasks.remove(task);
 
     }
 
-    private void updateJobStatusIfEnd() {
+
+    private void updateJobStatusIfEnd() throws JobException {
         JobExecuteType executeType = getJobConfig().getExecuteType();
         if (executeType.equals(JobExecuteType.MANUAL)) {
             return;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
index a530ce3b2a0..ee352a0f417 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
@@ -33,23 +33,29 @@ import java.util.List;
  * The job status is used to control the execution of the job.
  *
  * @param <T> The type of task associated with the job, extending AbstractTask.
+ *            <C> The type of task context associated with the job
  */
-public interface Job<T extends AbstractTask> {
+public interface Job<T extends AbstractTask, C> {
 
     /**
      * Creates a list of tasks of the specified type for this job.
+     * you can set task context for task,
+     * eg: insert task, execute sql is insert into table select * from table1 
limit ${limit}
+     * every task context is different, eg: limit 1000, limit 2000,you can set 
task context to 1000,2000
+     * it's used by manual task or streaming task
      *
-     * @param taskType The type of tasks to create.
+     * @param taskType    The type of tasks to create. @See TaskType
+     * @param taskContext The context of tasks to create.
      * @return A list of tasks.
      */
-    List<T> createTasks(TaskType taskType);
+    List<T> createTasks(TaskType taskType, C taskContext);
 
     /**
      * Cancels the task with the specified taskId.
      *
      * @param taskId The ID of the task to cancel.
      * @throws JobException If the task is not in the running state, it may 
have already
-     * finished and cannot be cancelled.
+     *                      finished and cannot be cancelled.
      */
     void cancelTaskById(long taskId) throws JobException;
 
@@ -60,7 +66,7 @@ public interface Job<T extends AbstractTask> {
      *
      * @return True if the job is ready for scheduling, false otherwise.
      */
-    boolean isReadyForScheduling();
+    boolean isReadyForScheduling(C taskContext);
 
     /**
      * Retrieves the metadata for the job, which is used to display job 
information.
@@ -103,17 +109,18 @@ public interface Job<T extends AbstractTask> {
      *
      * @param task The failed task.
      */
-    void onTaskFail(T task);
+    void onTaskFail(T task) throws JobException;
 
     /**
      * Notifies the job when a task execution is successful.
      *
      * @param task The successful task.
      */
-    void onTaskSuccess(T task);
+    void onTaskSuccess(T task) throws JobException;
 
     /**
      * get the job's show info, which is used to sql show the job information
+     *
      * @return List<String> job common show info
      */
     List<String> getShowInfo();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/common/IntervalUnit.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/common/IntervalUnit.java
index 4c576e986fa..dba324a81c4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/common/IntervalUnit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/IntervalUnit.java
@@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 public enum IntervalUnit {
-
     SECOND("second", 0L, TimeUnit.SECONDS::toMillis),
     MINUTE("minute", 0L, TimeUnit.MINUTES::toMillis),
     HOUR("hour", 0L, TimeUnit.HOURS::toMillis),
@@ -57,7 +56,7 @@ public enum IntervalUnit {
         return Arrays.stream(IntervalUnit.values())
                 .filter(config -> config.getUnit().equals(name))
                 .findFirst()
-                .orElseThrow(() -> new IllegalArgumentException("Unknown 
configuration " + name));
+                .orElseThrow(() -> new IllegalArgumentException("Unknown 
configuration interval " + name));
     }
 
     public Long getIntervalMs(Long interval) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskStatus.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskStatus.java
index b4040d31e08..4cb401f28dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskStatus.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskStatus.java
@@ -19,8 +19,8 @@ package org.apache.doris.job.common;
 
 public enum TaskStatus {
     PENDING,
-    CANCEL,
+    CANCELED,
     RUNNING,
     SUCCESS,
-    FAILD;
+    FAILED;
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TimerJobEvent.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TimerJobEvent.java
index 218fefd0419..65654c225fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TimerJobEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TimerJobEvent.java
@@ -23,12 +23,12 @@ import com.lmax.disruptor.EventFactory;
 import lombok.Data;
 
 @Data
-public class TimerJobEvent<T extends AbstractJob<?>> {
+public class TimerJobEvent<T extends AbstractJob> {
 
 
     private T job;
 
-    public static <T extends AbstractJob<?>> EventFactory<TimerJobEvent<T>> 
factory() {
+    public static <T extends AbstractJob> EventFactory<TimerJobEvent<T>> 
factory() {
         return TimerJobEvent::new;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java
index 0e5911a4e85..a07e248af6d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java
@@ -21,7 +21,7 @@ import org.apache.doris.job.disruptor.ExecuteTaskEvent;
 import org.apache.doris.job.task.AbstractTask;
 
 import com.lmax.disruptor.WorkHandler;
-import lombok.extern.slf4j.Slf4j;
+import lombok.extern.log4j.Log4j2;
 
 /**
  * DefaultTaskExecutor is an implementation of the TaskExecutor interface.
@@ -30,7 +30,7 @@ import lombok.extern.slf4j.Slf4j;
  * It executes a given AbstractTask by acquiring a semaphore token from the 
TaskTokenManager
  * and releasing it after the task execution.
  */
-@Slf4j
+@Log4j2
 public class DefaultTaskExecutorHandler<T extends AbstractTask> implements 
WorkHandler<ExecuteTaskEvent<T>> {
 
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
index 852ba13415b..cb0a393d917 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
@@ -26,8 +26,8 @@ import org.apache.doris.job.disruptor.TimerJobEvent;
 import org.apache.doris.job.task.AbstractTask;
 
 import com.lmax.disruptor.WorkHandler;
-import jline.internal.Log;
-import lombok.extern.slf4j.Slf4j;
+import lombok.extern.log4j.Log4j2;
+import org.apache.commons.collections.CollectionUtils;
 
 import java.util.List;
 import java.util.Map;
@@ -37,8 +37,8 @@ import java.util.Map;
  * when job is ready for scheduling and job status is running
  * we will create task and publish to task disruptor @see 
DefaultTaskExecutorHandler
  */
-@Slf4j
-public class DispatchTaskHandler<T extends AbstractJob<?>> implements 
WorkHandler<TimerJobEvent<T>> {
+@Log4j2
+public class DispatchTaskHandler<T extends AbstractJob> implements 
WorkHandler<TimerJobEvent<T>> {
 
     private final Map<JobType, TaskDisruptor<T>> disruptorMap;
 
@@ -50,19 +50,27 @@ public class DispatchTaskHandler<T extends AbstractJob<?>> 
implements WorkHandle
     @Override
     public void onEvent(TimerJobEvent<T> event) {
         try {
+            log.info("dispatch timer job, job id is {}, job name is {}", 
event.getJob().getJobId(),
+                    event.getJob().getJobName());
             if (null == event.getJob()) {
                 log.info("job is null,may be job is deleted, ignore");
                 return;
             }
-            if (event.getJob().isReadyForScheduling() && 
event.getJob().getJobStatus() == JobStatus.RUNNING) {
-                List<? extends AbstractTask> tasks = 
event.getJob().createTasks(TaskType.SCHEDULED);
+            if (event.getJob().isReadyForScheduling(null) && 
event.getJob().getJobStatus() == JobStatus.RUNNING) {
+                List<? extends AbstractTask> tasks = 
event.getJob().commonCreateTasks(TaskType.SCHEDULED, null);
+                if (CollectionUtils.isEmpty(tasks)) {
+                    log.warn("job is ready for scheduling, but create task is 
empty, skip scheduler,"
+                                    + "job id is {}," + " job name is {}", 
event.getJob().getJobId(),
+                            event.getJob().getJobName());
+                    return;
+                }
                 JobType jobType = event.getJob().getJobType();
                 for (AbstractTask task : tasks) {
                     disruptorMap.get(jobType).publishEvent(task, 
event.getJob().getJobConfig());
                 }
             }
         } catch (Exception e) {
-            Log.warn("dispatch timer job error, task id is {}", 
event.getJob().getJobId(), e);
+            log.warn("dispatch timer job error, task id is {}", 
event.getJob().getJobId(), e);
         }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
index 9bd4cbfeae2..74efe49beb1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
@@ -18,15 +18,15 @@
 package org.apache.doris.job.executor;
 
 import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.common.JobStatus;
 import org.apache.doris.job.disruptor.TaskDisruptor;
 
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
-import jline.internal.Log;
-import lombok.extern.slf4j.Slf4j;
+import lombok.extern.log4j.Log4j2;
 
-@Slf4j
-public class TimerJobSchedulerTask<T extends AbstractJob<?>> implements 
TimerTask {
+@Log4j2
+public class TimerJobSchedulerTask<T extends AbstractJob> implements TimerTask 
{
 
     private TaskDisruptor dispatchDisruptor;
 
@@ -40,9 +40,13 @@ public class TimerJobSchedulerTask<T extends AbstractJob<?>> 
implements TimerTas
     @Override
     public void run(Timeout timeout) {
         try {
+            if (!JobStatus.RUNNING.equals(job.getJobStatus())) {
+                log.info("job status is not running, job id is {}, skip 
dispatch", this.job.getJobId());
+                return;
+            }
             dispatchDisruptor.publishEvent(this.job);
         } catch (Exception e) {
-            Log.warn("dispatch timer job error, task id is {}", 
this.job.getJobId(), e);
+            log.warn("dispatch timer job error, task id is {}", 
this.job.getJobId(), e);
         }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index 619a5c7fded..968f1413528 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -47,11 +47,12 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 @Data
 @Slf4j
-public class InsertJob extends AbstractJob<InsertTask> {
+public class InsertJob extends AbstractJob<InsertTask, Map> {
 
     public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
             new Column("Id", ScalarType.createStringType()),
@@ -87,11 +88,11 @@ public class InsertJob extends AbstractJob<InsertTask> {
     ConcurrentLinkedQueue<Long> taskIdList;
 
     // max save task num, do we need to config it?
-    private static final int MAX_SAVE_TASK_NUM = 50;
-
+    private static final int MAX_SAVE_TASK_NUM = 100;
 
     @Override
-    public List<InsertTask> createTasks(TaskType taskType) {
+    public List<InsertTask> createTasks(TaskType taskType, Map taskContext) {
+        //nothing need to do in insert job
         InsertTask task = new InsertTask(null, getCurrentDbName(), 
getExecuteSql(), getCreateUser());
         task.setJobId(getJobId());
         task.setTaskType(taskType);
@@ -123,15 +124,15 @@ public class InsertJob extends AbstractJob<InsertTask> {
         super.cancelTaskById(taskId);
     }
 
-
     @Override
-    public void cancelAllTasks() throws JobException {
-        super.cancelAllTasks();
+    public boolean isReadyForScheduling(Map taskContext) {
+        return CollectionUtils.isEmpty(getRunningTasks());
     }
 
+
     @Override
-    public boolean isReadyForScheduling() {
-        return true;
+    public void cancelAllTasks() throws JobException {
+        super.cancelAllTasks();
     }
 
 
@@ -162,8 +163,9 @@ public class InsertJob extends AbstractJob<InsertTask> {
             InsertTask task;
             try {
                 task = new InsertTask(loadJob.getLabel(), 
loadJob.getDb().getFullName(), null, getCreateUser());
+                task.setCreateTimeMs(loadJob.getCreateTimestamp());
             } catch (MetaNotFoundException e) {
-                log.warn("load job not found,job id is {}", loadJob.getId());
+                log.warn("load job not found, job id is {}", loadJob.getId());
                 return;
             }
             task.setJobId(getJobId());
@@ -195,7 +197,7 @@ public class InsertJob extends AbstractJob<InsertTask> {
     }
 
     @Override
-    public void onTaskSuccess(InsertTask task) {
+    public void onTaskSuccess(InsertTask task) throws JobException {
         super.onTaskSuccess(task);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index eb319f4d5e7..69cdfe26b4b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -41,7 +41,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import lombok.Getter;
 import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
+import lombok.extern.log4j.Log4j2;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -52,7 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 /**
  * todo implement this later
  */
-@Slf4j
+@Log4j2
 public class InsertTask extends AbstractTask {
 
     public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
@@ -144,8 +144,14 @@ public class InsertTask extends AbstractTask {
     @Override
     public void run() throws JobException {
         try {
+            if (isCanceled.get()) {
+                log.info("task has been canceled, task id is {}", getTaskId());
+                return;
+            }
             command.run(ctx, stmtExecutor);
         } catch (Exception e) {
+            log.warn("execute insert task error, job id is {}, task id is 
{},sql is {}", getJobId(),
+                    getTaskId(), sql, e);
             throw new JobException(e);
         }
     }
@@ -177,7 +183,7 @@ public class InsertTask extends AbstractTask {
     @Override
     public List<String> getShowInfo() {
         if (null == loadJob) {
-            return new ArrayList<>();
+            return getPendingTaskShowInfo();
         }
         List<String> jobInfo = Lists.newArrayList();
         // jobId
@@ -258,4 +264,22 @@ public class InsertTask extends AbstractTask {
         return trow;
     }
 
+    // if task not start, load job is null,return pending task show info
+    private List<String> getPendingTaskShowInfo() {
+        List<String> datas = new ArrayList<>();
+
+        datas.add(String.valueOf(getTaskId()));
+        datas.add(getJobId() + "_" + getTaskId());
+        datas.add(getStatus().name());
+        datas.add(FeConstants.null_string);
+        datas.add(FeConstants.null_string);
+        datas.add(FeConstants.null_string);
+        datas.add(TimeUtils.longToTimeString(getCreateTimeMs()));
+        datas.add(FeConstants.null_string);
+        datas.add(FeConstants.null_string);
+        datas.add(FeConstants.null_string);
+        datas.add(userIdentity.getQualifiedUser());
+        return datas;
+    }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
index c00f97cd79f..6321679b6b2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
@@ -47,8 +47,9 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
-public class MTMVJob extends AbstractJob<MTMVTask> {
+public class MTMVJob extends AbstractJob<MTMVTask, Map> {
     private static final Logger LOG = LogManager.getLogger(MTMVJob.class);
     private static final ShowResultSetMetaData JOB_META_DATA =
             ShowResultSetMetaData.builder()
@@ -109,7 +110,7 @@ public class MTMVJob extends AbstractJob<MTMVTask> {
     }
 
     @Override
-    public List<MTMVTask> createTasks(TaskType taskType) {
+    public List<MTMVTask> createTasks(TaskType taskType, Map taskContext) {
         MTMVTask task = new MTMVTask(dbId, mtmvId);
         task.setTaskType(taskType);
         ArrayList<MTMVTask> tasks = new ArrayList<>();
@@ -119,7 +120,7 @@ public class MTMVJob extends AbstractJob<MTMVTask> {
     }
 
     @Override
-    public boolean isReadyForScheduling() {
+    public boolean isReadyForScheduling(Map taskContext) {
         return CollectionUtils.isEmpty(getRunningTasks());
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index c7d04cdd282..776af152c69 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -29,7 +29,7 @@ import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.scheduler.JobScheduler;
 import org.apache.doris.job.task.AbstractTask;
 
-import lombok.extern.slf4j.Slf4j;
+import lombok.extern.log4j.Log4j2;
 import org.apache.commons.lang3.StringUtils;
 
 import java.io.DataInput;
@@ -39,8 +39,8 @@ import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
-@Slf4j
-public class JobManager<T extends AbstractJob<?>> implements Writable {
+@Log4j2
+public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
 
 
     private final ConcurrentHashMap<Long, T> jobMap = new 
ConcurrentHashMap<>(32);
@@ -54,9 +54,9 @@ public class JobManager<T extends AbstractJob<?>> implements 
Writable {
 
     public void registerJob(T job) throws JobException {
         job.checkJobParams();
-        checkJobNameExist(job.getJobName(), job.getJobType());
+        checkJobNameExist(job.getJobName());
         if (jobMap.get(job.getJobId()) != null) {
-            throw new JobException("job id exist,jobId:" + job.getJobId());
+            throw new JobException("job id exist, jobId:" + job.getJobId());
         }
         Env.getCurrentEnv().getEditLog().logCreateJob(job);
         jobMap.put(job.getJobId(), job);
@@ -65,9 +65,9 @@ public class JobManager<T extends AbstractJob<?>> implements 
Writable {
     }
 
 
-    private void checkJobNameExist(String jobName, JobType type) throws 
JobException {
-        if (jobMap.values().stream().anyMatch(a -> 
a.getJobName().equals(jobName) && a.getJobType().equals(type))) {
-            throw new JobException("job name exist,jobName:" + jobName);
+    private void checkJobNameExist(String jobName) throws JobException {
+        if (jobMap.values().stream().anyMatch(a -> 
a.getJobName().equals(jobName))) {
+            throw new JobException("job name exist, jobName:" + jobName);
         }
     }
 
@@ -79,13 +79,13 @@ public class JobManager<T extends AbstractJob<?>> 
implements Writable {
         jobMap.remove(jobId);
     }
 
-    public void unregisterJob(String jobName, JobType jobType) throws 
JobException {
+    public void unregisterJob(String jobName) throws JobException {
         for (T a : jobMap.values()) {
-            if (a.getJobName().equals(jobName) && 
a.getJobType().equals(jobType)) {
+            if (a.getJobName().equals(jobName)) {
                 try {
                     unregisterJob(a.getJobId());
                 } catch (JobException e) {
-                    throw new JobException("unregister job error,jobName:" + 
jobName);
+                    throw new JobException("unregister job error, jobName:" + 
jobName);
                 }
             }
         }
@@ -98,13 +98,17 @@ public class JobManager<T extends AbstractJob<?>> 
implements Writable {
         Env.getCurrentEnv().getEditLog().logUpdateJob(jobMap.get(jobId));
     }
 
-    public void alterJobStatus(String jobName, JobStatus jobStatus, JobType 
jobType) throws JobException {
+    public void alterJobStatus(String jobName, JobStatus jobStatus) throws 
JobException {
         for (T a : jobMap.values()) {
-            if (a.getJobName().equals(jobName) && 
jobType.equals(a.getJobType())) {
+            if (a.getJobName().equals(jobName)) {
                 try {
+                    if (jobStatus.equals(JobStatus.STOPPED)) {
+                        unregisterJob(a.getJobId());
+                        return;
+                    }
                     alterJobStatus(a.getJobId(), jobStatus);
                 } catch (JobException e) {
-                    throw new JobException("unregister job error,jobName:" + 
jobName);
+                    throw new JobException("unregister job error, jobName:" + 
jobName);
                 }
             }
         }
@@ -112,7 +116,7 @@ public class JobManager<T extends AbstractJob<?>> 
implements Writable {
 
     private void checkJobExist(Long jobId) throws JobException {
         if (null == jobMap.get(jobId)) {
-            throw new JobException("job not exist,jobId:" + jobId);
+            throw new JobException("job not exist, jobId:" + jobId);
         }
     }
 
@@ -129,6 +133,7 @@ public class JobManager<T extends AbstractJob<?>> 
implements Writable {
 
     /**
      * query jobs by job type
+     *
      * @param jobTypes @JobType
      * @return List<AbstractJob> job list
      */
@@ -156,12 +161,12 @@ public class JobManager<T extends AbstractJob<?>> 
implements Writable {
 
     public List<? extends AbstractTask> queryTasks(Long jobId) throws 
JobException {
         checkJobExist(jobId);
-        return jobMap.get(jobId).queryTasks();
+        return jobMap.get(jobId).queryAllTasks();
     }
 
-    public void triggerJob(long jobId) throws JobException {
+    public void triggerJob(long jobId, C context) throws JobException {
         checkJobExist(jobId);
-        jobScheduler.schedulerInstantJob(jobMap.get(jobId), TaskType.MANUAL);
+        jobScheduler.schedulerInstantJob(jobMap.get(jobId), TaskType.MANUAL, 
context);
     }
 
     public void replayCreateJob(T job) {
@@ -191,11 +196,14 @@ public class JobManager<T extends AbstractJob<?>> 
implements Writable {
                 .add("msg", "replay delete scheduler job").build());
     }
 
-    void cancelTask(Long jobId, Long taskId) throws JobException {
-        checkJobExist(jobId);
-        if (null == jobMap.get(jobId).getRunningTasks()) {
-            throw new JobException("task not exist,taskId:" + taskId);
+    public void cancelTaskById(String jobName, Long taskId) throws 
JobException {
+        for (T job : jobMap.values()) {
+            if (job.getJobName().equals(jobName)) {
+                job.cancelTaskById(taskId);
+                return;
+            }
         }
+        throw new JobException("job not exist, jobName:" + jobName);
     }
 
     @Override
@@ -205,7 +213,7 @@ public class JobManager<T extends AbstractJob<?>> 
implements Writable {
             try {
                 job.write(out);
             } catch (IOException e) {
-                log.error("write job error,jobId:" + jobId, e);
+                log.error("write job error, jobId:" + jobId, e);
             }
         });
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java
index d07a109fc53..4e31e467013 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java
@@ -46,7 +46,7 @@ public class TaskDisruptorGroupManager<T extends 
AbstractTask> {
     private final Map<JobType, TaskDisruptor<T>> disruptorMap = new 
EnumMap<>(JobType.class);
 
     @Getter
-    private TaskDisruptor<TimerJobEvent<AbstractJob<?>>> dispatchDisruptor;
+    private TaskDisruptor<TimerJobEvent<AbstractJob>> dispatchDisruptor;
 
     private static final int DEFAULT_RING_BUFFER_SIZE = 1024;
 
@@ -76,14 +76,14 @@ public class TaskDisruptorGroupManager<T extends 
AbstractTask> {
     }
 
     private void registerDispatchDisruptor() {
-        EventFactory<TimerJobEvent<AbstractJob<T>>> dispatchEventFactory = 
TimerJobEvent.factory();
+        EventFactory<TimerJobEvent<AbstractJob>> dispatchEventFactory = 
TimerJobEvent.factory();
         ThreadFactory dispatchThreadFactory = new 
CustomThreadFactory("dispatch-task");
         WorkHandler[] dispatchTaskExecutorHandlers = new 
WorkHandler[DISPATCH_TIMER_JOB_CONSUMER_THREAD_NUM];
         for (int i = 0; i < DISPATCH_TIMER_JOB_CONSUMER_THREAD_NUM; i++) {
             dispatchTaskExecutorHandlers[i] = new 
DispatchTaskHandler(this.disruptorMap);
         }
-        EventTranslatorVararg<TimerJobEvent<AbstractJob<T>>> eventTranslator =
-                (event, sequence, args) -> event.setJob((AbstractJob<T>) 
args[0]);
+        EventTranslatorVararg<TimerJobEvent<AbstractJob>> eventTranslator =
+                (event, sequence, args) -> event.setJob((AbstractJob) args[0]);
         this.dispatchDisruptor = new TaskDisruptor<>(dispatchEventFactory, 
DISPATCH_TIMER_JOB_QUEUE_SIZE,
                 dispatchThreadFactory,
                 new BlockingWaitStrategy(), dispatchTaskExecutorHandlers, 
eventTranslator);
@@ -123,7 +123,7 @@ public class TaskDisruptorGroupManager<T extends 
AbstractTask> {
         disruptorMap.put(JobType.MV, mtmvDisruptor);
     }
 
-    public void dispatchTimerJob(AbstractJob<T> job) {
+    public void dispatchTimerJob(AbstractJob job) {
         dispatchDisruptor.publishEvent(job);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskTokenManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskTokenManager.java
index 51b8b6aeb36..877cb306916 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskTokenManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskTokenManager.java
@@ -18,7 +18,7 @@
 package org.apache.doris.job.manager;
 
 import lombok.experimental.UtilityClass;
-import lombok.extern.slf4j.Slf4j;
+import lombok.extern.log4j.Log4j2;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -29,7 +29,7 @@ import java.util.concurrent.Semaphore;
  * It provides a method to acquire a semaphore token for a specific job ID 
with the given maximum concurrency.
  * If a semaphore doesn't exist for the job ID, it creates a new one and adds 
it to the map.
  */
-@Slf4j
+@Log4j2
 @UtilityClass
 public class TaskTokenManager {
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
index c701c9fd1f8..47e91d97b49 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.job.scheduler;
 
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.CustomThreadFactory;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.job.base.AbstractJob;
@@ -30,7 +32,7 @@ import org.apache.doris.job.manager.TaskDisruptorGroupManager;
 import org.apache.doris.job.task.AbstractTask;
 
 import io.netty.util.HashedWheelTimer;
-import lombok.extern.slf4j.Slf4j;
+import lombok.extern.log4j.Log4j2;
 import org.apache.commons.collections.CollectionUtils;
 
 import java.io.Closeable;
@@ -39,8 +41,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-@Slf4j
-public class JobScheduler<T extends AbstractJob<?>> implements Closeable {
+@Log4j2
+public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable 
{
 
     /**
      * scheduler tasks, it's used to scheduler job
@@ -68,6 +70,13 @@ public class JobScheduler<T extends AbstractJob<?>> 
implements Closeable {
      */
     private static final long BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS = 
BATCH_SCHEDULER_INTERVAL_SECONDS * 1000L;
 
+    /**
+     * Finished job will be cleared after 24 hours
+     */
+    private static final long FINISHED_JOB_CLEANUP_THRESHOLD_TIME_MS =
+            (Config.finished_job_cleanup_threshold_time_hour > 0
+                    ? Config.finished_job_cleanup_threshold_time_hour : 24) * 
3600 * 1000L;
+
     public void start() {
         timerTaskScheduler = new HashedWheelTimer(new 
CustomThreadFactory("timer-task-scheduler"), 1,
                 TimeUnit.SECONDS, HASHED_WHEEL_TIMER_TICKS_PER_WHEEL);
@@ -105,20 +114,20 @@ public class JobScheduler<T extends AbstractJob<?>> 
implements Closeable {
             //manual job will not scheduler
             if 
(JobExecuteType.MANUAL.equals(job.getJobConfig().getExecuteType())) {
                 if (job.getJobConfig().isImmediate()) {
-                    schedulerInstantJob(job, TaskType.MANUAL);
+                    schedulerInstantJob(job, TaskType.MANUAL, null);
                 }
                 return;
             }
 
             //todo skip streaming job,improve in the future
             if 
(JobExecuteType.INSTANT.equals(job.getJobConfig().getExecuteType())) {
-                schedulerInstantJob(job, TaskType.SCHEDULED);
+                schedulerInstantJob(job, TaskType.SCHEDULED, null);
             }
         }
         //RECURRING job and  immediate is true
         if (job.getJobConfig().isImmediate()) {
             
job.getJobConfig().getTimerDefinition().setLatestSchedulerTimeMs(System.currentTimeMillis());
-            schedulerInstantJob(job, TaskType.SCHEDULED);
+            schedulerInstantJob(job, TaskType.SCHEDULED, null);
         }
         //if it's timer job and trigger last window already start, we will 
scheduler it immediately
         cycleTimerJobScheduler(job);
@@ -142,16 +151,11 @@ public class JobScheduler<T extends AbstractJob<?>> 
implements Closeable {
     }
 
 
-    public void schedulerInstantJob(T job, TaskType taskType) throws 
JobException {
-        if (!job.getJobStatus().equals(JobStatus.RUNNING)) {
-            throw new JobException("job is not running,job id is %d", 
job.getJobId());
-        }
-        if (!job.isReadyForScheduling()) {
-            log.info("job is not ready for scheduling,job id is {}", 
job.getJobId());
-            return;
-        }
-        List<? extends AbstractTask> tasks = job.createTasks(taskType);
+    public void schedulerInstantJob(T job, TaskType taskType, C context) {
+        List<? extends AbstractTask> tasks = job.commonCreateTasks(taskType, 
context);
         if (CollectionUtils.isEmpty(tasks)) {
+            log.info("job create task is empty, skip scheduler, job id is 
{},job name is {}", job.getJobId(),
+                    job.getJobName());
             if 
(job.getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
                 job.setJobStatus(JobStatus.FINISHED);
             }
@@ -166,19 +170,34 @@ public class JobScheduler<T extends AbstractJob<?>> 
implements Closeable {
      * We will get the task in the next time window, and then hand it over to 
the time wheel for timing trigger
      */
     private void executeTimerJobIdsWithinLastTenMinutesWindow() {
-        if (jobMap.isEmpty()) {
-            return;
-        }
         if (latestBatchSchedulerTimerTaskTimeMs < System.currentTimeMillis()) {
             this.latestBatchSchedulerTimerTaskTimeMs = 
System.currentTimeMillis();
         }
         this.latestBatchSchedulerTimerTaskTimeMs += 
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
+        if (jobMap.isEmpty()) {
+            return;
+        }
         for (Map.Entry<Long, T> entry : jobMap.entrySet()) {
             T job = entry.getValue();
-            if (!job.getJobConfig().checkIsTimerJob()) {
+            if (job.getJobStatus().equals(JobStatus.FINISHED)) {
+                clearFinishedJob(job);
+                continue;
+            }
+            if (!job.getJobStatus().equals(JobStatus.RUNNING) && 
!job.getJobConfig().checkIsTimerJob()) {
                 continue;
             }
             cycleTimerJobScheduler(job);
         }
     }
+
+    private void clearFinishedJob(T job) {
+        if (job.getFinishTimeMs() + FINISHED_JOB_CLEANUP_THRESHOLD_TIME_MS < 
System.currentTimeMillis()) {
+            return;
+        }
+        try {
+            Env.getCurrentEnv().getJobManager().unregisterJob(job.getJobId());
+        } catch (JobException e) {
+            log.error("clear finish job error, job id is {}", job.getJobId(), 
e);
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
index efe38b70136..654ee4fbc2f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
@@ -26,10 +26,10 @@ import org.apache.doris.job.exception.JobException;
 
 import com.google.gson.annotations.SerializedName;
 import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
+import lombok.extern.log4j.Log4j2;
 
 @Data
-@Slf4j
+@Log4j2
 public abstract class AbstractTask implements Task {
 
     @SerializedName(value = "jid")
@@ -49,9 +49,12 @@ public abstract class AbstractTask implements Task {
     @SerializedName(value = "tt")
     private TaskType taskType;
 
+    @SerializedName(value = "emg")
+    private String errMsg;
+
     @Override
-    public void onFail(String msg) {
-        status = TaskStatus.FAILD;
+    public void onFail(String msg) throws JobException {
+        status = TaskStatus.FAILED;
         if (!isCallable()) {
             return;
         }
@@ -60,10 +63,10 @@ public abstract class AbstractTask implements Task {
 
     @Override
     public void onFail() throws JobException {
-        if (TaskStatus.CANCEL.equals(status)) {
+        if (TaskStatus.CANCELED.equals(status)) {
             return;
         }
-        status = TaskStatus.FAILD;
+        status = TaskStatus.FAILED;
         setFinishTimeMs(System.currentTimeMillis());
         if (!isCallable()) {
             return;
@@ -73,7 +76,7 @@ public abstract class AbstractTask implements Task {
     }
 
     private boolean isCallable() {
-        if (status.equals(TaskStatus.CANCEL)) {
+        if (status.equals(TaskStatus.CANCELED)) {
             return false;
         }
         if (null != Env.getCurrentEnv().getJobManager().getJob(jobId)) {
@@ -84,6 +87,9 @@ public abstract class AbstractTask implements Task {
 
     @Override
     public void onSuccess() throws JobException {
+        if (TaskStatus.CANCELED.equals(status)) {
+            return;
+        }
         status = TaskStatus.SUCCESS;
         setFinishTimeMs(System.currentTimeMillis());
         if (!isCallable()) {
@@ -99,7 +105,7 @@ public abstract class AbstractTask implements Task {
 
     @Override
     public void cancel() throws JobException {
-        status = TaskStatus.CANCEL;
+        status = TaskStatus.CANCELED;
     }
 
     @Override
@@ -115,12 +121,12 @@ public abstract class AbstractTask implements Task {
             onSuccess();
         } catch (Exception e) {
             onFail();
-            log.warn("execute task error, job id is {},task id is {}", jobId, 
taskId, e);
+            log.warn("execute task error, job id is {}, task id is {}", jobId, 
taskId, e);
         }
     }
 
     public boolean isCancelled() {
-        return status.equals(TaskStatus.CANCEL);
+        return status.equals(TaskStatus.CANCELED);
     }
 
     public String getJobName() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
index b13d22ff665..48ecf9c29e8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
@@ -54,7 +54,7 @@ public interface Task {
      *
      * @param msg The error message associated with the failure.
      */
-    void onFail(String msg);
+    void onFail(String msg) throws JobException;
 
     /**
      * This method is called when the task executes successfully.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
index df1e3dbe07b..416f7f764b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
@@ -52,6 +52,7 @@ public class MTMVJobManager implements MTMVHookService {
 
     /**
      * create MTMVJob
+     *
      * @param mtmv
      * @throws DdlException
      */
@@ -112,6 +113,7 @@ public class MTMVJobManager implements MTMVHookService {
 
     /**
      * drop MTMVJob
+     *
      * @param mtmv
      * @throws DdlException
      */
@@ -142,6 +144,7 @@ public class MTMVJobManager implements MTMVHookService {
 
     /**
      * drop MTMVJob and then create MTMVJob
+     *
      * @param mtmv
      * @param alterMTMV
      * @throws DdlException
@@ -156,6 +159,7 @@ public class MTMVJobManager implements MTMVHookService {
 
     /**
      * trigger MTMVJob
+     *
      * @param info
      * @throws DdlException
      * @throws MetaNotFoundException
@@ -170,7 +174,7 @@ public class MTMVJobManager implements MTMVHookService {
             throw new DdlException("jobs not normal,should have one job,but 
job num is: " + jobs.size());
         }
         try {
-            
Env.getCurrentEnv().getJobManager().triggerJob(jobs.get(0).getJobId());
+            
Env.getCurrentEnv().getJobManager().triggerJob(jobs.get(0).getJobId(), null);
         } catch (JobException e) {
             e.printStackTrace();
             throw new DdlException(e.getMessage());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
index c4f286ba5cd..9b16497d3c5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
@@ -588,4 +588,8 @@ public class InsertExecutor {
             throw new AnalysisException(e.getMessage(), e);
         }
     }
+
+    public Coordinator getCoordinator() {
+        return coordinator;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
index 62a8f4d95c6..c17f215e757 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
@@ -162,6 +162,9 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
         insertExecutor.finalizeSink(sink, 
physicalOlapTableSink.isPartialUpdate(),
                 physicalOlapTableSink.isFromNativeInsertStmt());
         executor.setProfileType(ProfileType.LOAD);
+        // We exposed @StmtExecutor#cancel as a unified entry point for 
statement interruption
+        // so we need to set this here
+        executor.setCoord(insertExecutor.getCoordinator());
         insertExecutor.executeSingleInsertTransaction(executor, jobId);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 5a2bae1ee05..7b41715a425 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -37,6 +37,7 @@ import org.apache.doris.analysis.AlterColumnStatsStmt;
 import org.apache.doris.analysis.AlterDatabasePropertyStmt;
 import org.apache.doris.analysis.AlterDatabaseQuotaStmt;
 import org.apache.doris.analysis.AlterDatabaseRename;
+import org.apache.doris.analysis.AlterJobStatusStmt;
 import org.apache.doris.analysis.AlterPolicyStmt;
 import org.apache.doris.analysis.AlterResourceStmt;
 import org.apache.doris.analysis.AlterRoutineLoadStmt;
@@ -51,6 +52,7 @@ import org.apache.doris.analysis.CancelAlterSystemStmt;
 import org.apache.doris.analysis.CancelAlterTableStmt;
 import org.apache.doris.analysis.CancelBackupStmt;
 import org.apache.doris.analysis.CancelExportStmt;
+import org.apache.doris.analysis.CancelJobTaskStmt;
 import org.apache.doris.analysis.CancelLoadStmt;
 import org.apache.doris.analysis.CleanLabelStmt;
 import org.apache.doris.analysis.CleanProfileStmt;
@@ -95,7 +97,6 @@ import org.apache.doris.analysis.DropWorkloadGroupStmt;
 import org.apache.doris.analysis.GrantStmt;
 import org.apache.doris.analysis.InstallPluginStmt;
 import org.apache.doris.analysis.KillAnalysisJobStmt;
-import org.apache.doris.analysis.PauseJobStmt;
 import org.apache.doris.analysis.PauseRoutineLoadStmt;
 import org.apache.doris.analysis.PauseSyncJobStmt;
 import org.apache.doris.analysis.RecoverDbStmt;
@@ -106,12 +107,10 @@ import org.apache.doris.analysis.RefreshDbStmt;
 import org.apache.doris.analysis.RefreshLdapStmt;
 import org.apache.doris.analysis.RefreshTableStmt;
 import org.apache.doris.analysis.RestoreStmt;
-import org.apache.doris.analysis.ResumeJobStmt;
 import org.apache.doris.analysis.ResumeRoutineLoadStmt;
 import org.apache.doris.analysis.ResumeSyncJobStmt;
 import org.apache.doris.analysis.RevokeStmt;
 import org.apache.doris.analysis.SetUserPropertyStmt;
-import org.apache.doris.analysis.StopJobStmt;
 import org.apache.doris.analysis.StopRoutineLoadStmt;
 import org.apache.doris.analysis.StopSyncJobStmt;
 import org.apache.doris.analysis.SyncStmt;
@@ -121,8 +120,6 @@ import org.apache.doris.catalog.EncryptKeyHelper;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.util.ProfileManager;
-import org.apache.doris.job.common.JobStatus;
-import org.apache.doris.job.common.JobType;
 import org.apache.doris.load.sync.SyncJobManager;
 import org.apache.doris.persist.CleanQueryStatsInfo;
 import org.apache.doris.statistics.StatisticsRepository;
@@ -190,24 +187,17 @@ public class DdlExecutor {
             } catch (Exception e) {
                 throw new DdlException(e.getMessage());
             }
-        } else if (ddlStmt instanceof StopJobStmt) {
-            StopJobStmt stmt = (StopJobStmt) ddlStmt;
+        } else if (ddlStmt instanceof AlterJobStatusStmt) {
+            AlterJobStatusStmt stmt = (AlterJobStatusStmt) ddlStmt;
             try {
-                env.getJobManager().unregisterJob(stmt.getName(), 
JobType.INSERT);
+                env.getJobManager().alterJobStatus(stmt.getJobName(), 
stmt.getJobStatus());
             } catch (Exception e) {
                 throw new DdlException(e.getMessage());
             }
-        } else if (ddlStmt instanceof PauseJobStmt) {
-            PauseJobStmt stmt = (PauseJobStmt) ddlStmt;
+        } else if (ddlStmt instanceof CancelJobTaskStmt) {
+            CancelJobTaskStmt stmt = (CancelJobTaskStmt) ddlStmt;
             try {
-                env.getJobManager().alterJobStatus(stmt.getName(), 
JobStatus.PAUSED, JobType.INSERT);
-            } catch (Exception e) {
-                throw new DdlException(e.getMessage());
-            }
-        } else if (ddlStmt instanceof ResumeJobStmt) {
-            ResumeJobStmt stmt = (ResumeJobStmt) ddlStmt;
-            try {
-                env.getJobManager().alterJobStatus(stmt.getName(), 
JobStatus.RUNNING, JobType.INSERT);
+                env.getJobManager().cancelTaskById(stmt.getJobName(), 
stmt.getTaskId());
             } catch (Exception e) {
                 throw new DdlException(e.getMessage());
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 430f790ecd3..fd8c4896f48 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -1429,7 +1429,7 @@ public class ShowExecutor {
             return;
         }
         org.apache.doris.job.base.AbstractJob job = jobs.get(0);
-        List<AbstractTask> jobTasks = job.queryTasks();
+        List<AbstractTask> jobTasks = job.queryAllTasks();
         if (CollectionUtils.isEmpty(jobTasks)) {
             resultSet = new ShowResultSet(job.getTaskMetaData(), rows);
             return;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index dbb9b3bd57a..29e7aaa3b9c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -177,6 +177,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
+import lombok.Setter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
@@ -214,6 +215,8 @@ public class StmtExecutor {
     private StatementBase parsedStmt;
     private Analyzer analyzer;
     private ProfileType profileType = ProfileType.QUERY;
+
+    @Setter
     private volatile Coordinator coord = null;
     private MasterOpExecutor masterOpExecutor = null;
     private RedirectStatus redirectStatus = null;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
index eadc6c567d3..87d1430375a 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
@@ -48,22 +48,22 @@ public class JobExecutionConfigurationTest {
         configuration.setExecuteType(JobExecuteType.RECURRING);
 
         TimerDefinition timerDefinition = new TimerDefinition();
-        timerDefinition.setStartTimeMs(1000L); // Start time set to 1 second 
in the future
+        timerDefinition.setStartTimeMs(100000L); // Start time set to 1 second 
in the future
         timerDefinition.setInterval(10L); // Interval set to 10 milliseconds
-        timerDefinition.setIntervalUnit(IntervalUnit.SECOND);
+        timerDefinition.setIntervalUnit(IntervalUnit.MINUTE);
         configuration.setTimerDefinition(timerDefinition);
 
         List<Long> delayTimes = configuration.getTriggerDelayTimes(
-                0L, 0L, 11000L);
+                0L, 0L, 1100000L);
 
         Assertions.assertEquals(2, delayTimes.size());
-        Assertions.assertArrayEquals(new Long[]{1L, 11L}, 
delayTimes.toArray());
+        Assertions.assertArrayEquals(new Long[]{100L, 700L}, 
delayTimes.toArray());
         delayTimes = configuration.getTriggerDelayTimes(
-                   2000L, 0L, 11000L);
+                   200000L, 0L, 1100000L);
         Assertions.assertEquals(1, delayTimes.size());
-        Assertions.assertArrayEquals(new Long[]{ 9L}, delayTimes.toArray());
+        Assertions.assertArrayEquals(new Long[]{ 500L}, delayTimes.toArray());
         delayTimes = configuration.getTriggerDelayTimes(
-                1001L, 0L, 10000L);
+                1001000L, 0L, 1000000L);
         Assertions.assertEquals(0, delayTimes.size());
     }
 
diff --git a/regression-test/pipeline/p0/conf/fe.conf 
b/regression-test/pipeline/p0/conf/fe.conf
index df5890b5d12..fadfe8066be 100644
--- a/regression-test/pipeline/p0/conf/fe.conf
+++ b/regression-test/pipeline/p0/conf/fe.conf
@@ -95,3 +95,7 @@ edit_log_roll_num = 1000
 history_job_keep_max_second = 300
 streaming_label_keep_max_second = 300
 label_keep_max_second = 300
+
+# job test configurations
+#allows the creation of jobs with an interval of second
+enable_job_schedule_second_for_test = true
diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy 
b/regression-test/suites/job_p0/test_base_insert_job.groovy
index 76d38460ffb..adde94dc6f0 100644
--- a/regression-test/suites/job_p0/test_base_insert_job.groovy
+++ b/regression-test/suites/job_p0/test_base_insert_job.groovy
@@ -25,7 +25,7 @@ suite("test_base_insert_job") {
     def jobName = "insert_recovery_test_base_insert_job"
     sql """drop table if exists `${tableName}` force"""
     sql """
-        STOP JOB for ${jobName}
+        DROP JOB where jobname =  '${jobName}'
     """
 
     sql """
@@ -49,7 +49,7 @@ suite("test_base_insert_job") {
     println jobs
     assert 3>=jobs.size() >= (2 as Boolean) //at least 2 records, some times 3 
records
     sql """
-        STOP JOB for ${jobName}
+        DROP JOB where jobname =  '${jobName}'
     """
     sql """drop table if exists `${tableName}` force """
     sql """
@@ -70,42 +70,76 @@ suite("test_base_insert_job") {
 
     def formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
     def startTime= dateTime.format(formatter);
+    def dataCount = sql """select count(*) from ${tableName}"""
+    assert dataCount.get(0).get(0) == 0
     sql """
-          CREATE JOB ${jobName}  ON SCHEDULER at '${startTime}'   comment 
'test' DO insert into ${tableName} (timestamp, type, user_id) values 
('2023-03-18','1','12213');
+          CREATE JOB ${jobName}  ON SCHEDULER at '${startTime}'   comment 
'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName}  values  
('2023-07-19', sleep(1000), 1001);
      """
 
-    Thread.sleep(2500)
-
-    def datas = sql """select * from ${tableName}"""
+    Thread.sleep(3000)
+    
+    // test cancel task
+    def datas = sql """show job tasks for ${jobName}"""
     println datas
-    //assert datas.size() == 1
+    assert datas.size() == 1
+    println datas.get(0).get(2)
+    assert datas.get(0).get(2) == "RUNNING"
+    def taskId = datas.get(0).get(0)
+    sql """cancel  task where jobName='${jobName}' and taskId= ${taskId}"""
+    def cancelTask = sql """ show job tasks for ${jobName}""" 
+    println cancelTask
+    //check task status
+    assert cancelTask.size() == 1
+    assert cancelTask.get(0).get(2) == "CANCELED"
+    // check table data
+    def dataCount1 = sql """select count(*) from ${tableName}"""
+    assert dataCount1.get(0).get(0) == 0
+    // check job status
+    def oncejob=sql """show job for  ${jobName} """
+    println oncejob
+    assert oncejob.get(0).get(5) == "FINISHED"
+    //assert comment
+    println oncejob.get(0).get(8)
+    //check comment
+    assert oncejob.get(0).get(8) == "test for test&68686781jbjbhj//ncsa"
+ 
     try{
         sql """
             CREATE JOB ${jobName}  ON SCHEDULER at '${startTime}'   comment 
'test' DO insert into ${tableName} (timestamp, type, user_id) values 
('2023-03-18','1','12213');
         """
     } catch (Exception e) {
-        assert true
+        assert e.getMessage().contains("startTimeMs must be greater than 
current time")
     }
     sql """
-        STOP JOB for test_one_time_error_starts
+        DROP JOB where jobname =  'test_one_time_error_starts'
     """
     try{
         sql """
             CREATE JOB test_one_time_error_starts  ON SCHEDULER at '2023-11-13 
14:18:07'   comment 'test' DO insert into ${tableName} (timestamp, type, 
user_id) values ('2023-03-18','1','12213');
         """
     } catch (Exception e) {
-        assert true
+        assert e.getMessage().contains("startTimeMs must be greater than 
current time")
     }
     sql """
-        STOP JOB for test_error_starts
+        DROP JOB where jobname =  'test_error_starts'
     """
     try{
         sql """
             CREATE JOB test_error_starts  ON SCHEDULER every 1 second ends 
'2023-11-13 14:18:07'   comment 'test' DO insert into ${tableName} (timestamp, 
type, user_id) values ('2023-03-18','1','12213');
         """
     } catch (Exception e) {
-        assert true
+        assert e.getMessage().contains("end time cannot be less than start 
time")
     }
 
-
+    sql """
+        DROP JOB where jobname =  'test_error_starts'
+    """
+    try{
+        sql """
+            CREATE JOB test_error_starts  ON SCHEDULER every 1 years ends 
'2023-11-13 14:18:07'   comment 'test' DO insert into ${tableName} (timestamp, 
type, user_id) values ('2023-03-18','1','12213');
+        """
+    } catch (Exception e) {
+        assert e.getMessage().contains("interval time unit can not be years")
+    }
+    
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to