This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ac9b3d1132 [Fix](Job)cancel task is not cleared from running task (#29114)
0ac9b3d1132 is described below

commit 0ac9b3d1132f9521b213bce9f58e6cb6e99822ec
Author: Calvin Kirs <[email protected]>
AuthorDate: Wed Dec 27 13:59:12 2023 +0800

    [Fix](Job)cancel task is not cleared from running task (#29114)
---
 .../org/apache/doris/job/base/AbstractJob.java     | 19 +++---
 .../doris/job/extensions/insert/InsertJob.java     | 51 ++++++++++-----
 .../org/apache/doris/job/manager/JobManager.java   |  1 +
 .../apache/doris/job/scheduler/JobScheduler.java   |  2 +-
 .../suites/job_p0/test_base_insert_job.groovy      | 72 +++++++++++++++-------
 5 files changed, 98 insertions(+), 47 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index 6e9cb48da1c..bc8f921d841 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -89,7 +89,8 @@ public abstract class AbstractJob<T extends AbstractTask, C> 
implements Job<T, C
     @SerializedName(value = "sql")
     String executeSql;
 
-    public AbstractJob() {}
+    public AbstractJob() {
+    }
 
     public AbstractJob(Long id) {
         jobId = id;
@@ -99,10 +100,10 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
      * executeSql and runningTasks is not required for load.
      */
     public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
-                            String currentDbName,
-                            String comment,
-                            UserIdentity createUser,
-                            JobExecutionConfiguration jobConfig) {
+                       String currentDbName,
+                       String comment,
+                       UserIdentity createUser,
+                       JobExecutionConfiguration jobConfig) {
         this(jobId, jobName, jobStatus, currentDbName, comment,
                 createUser, jobConfig, System.currentTimeMillis(), null, null);
     }
@@ -137,6 +138,7 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
         for (T task : runningTasks) {
             task.cancel();
         }
+        runningTasks = new ArrayList<>();
     }
 
     private static final ImmutableList<String> TITLE_NAMES =
@@ -163,6 +165,7 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
         }
         runningTasks.stream().filter(task -> 
task.getTaskId().equals(taskId)).findFirst()
                 .orElseThrow(() -> new JobException("no task id: " + 
taskId)).cancel();
+        runningTasks.removeIf(task -> task.getTaskId().equals(taskId));
         if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) {
             updateJobStatus(JobStatus.FINISHED);
         }
@@ -364,10 +367,12 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
     }
 
     @Override
-    public void onRegister() throws JobException {}
+    public void onRegister() throws JobException {
+    }
 
     @Override
-    public void onUnRegister() throws JobException {}
+    public void onUnRegister() throws JobException {
+    }
 
     @Override
     public void onReplayCreate() throws JobException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index 9256864efca..90fc126b723 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -46,6 +46,7 @@ import org.apache.doris.load.loadv2.LoadStatistic;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.mysql.privilege.Privilege;
 import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
+import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ShowResultSetMetaData;
@@ -67,7 +68,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 
-import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -84,7 +84,7 @@ import java.util.stream.Collectors;
 @EqualsAndHashCode(callSuper = true)
 @Data
 @Slf4j
-public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> {
+public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> 
implements GsonPostProcessable {
 
     public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
             new Column("Id", ScalarType.createStringType()),
@@ -156,6 +156,31 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> {
     // max save task num, do we need to config it?
     private static final int MAX_SAVE_TASK_NUM = 100;
 
+    @Override
+    public void gsonPostProcess() throws IOException {
+        if (null == plans) {
+            plans = new ArrayList<>();
+        }
+        if (null == idToTasks) {
+            idToTasks = new ConcurrentHashMap<>();
+        }
+        if (null == loadStatistic) {
+            loadStatistic = new LoadStatistic();
+        }
+        if (null == finishedTaskIds) {
+            finishedTaskIds = new HashSet<>();
+        }
+        if (null == errorTabletInfos) {
+            errorTabletInfos = new ArrayList<>();
+        }
+        if (null == commitInfos) {
+            commitInfos = new ArrayList<>();
+        }
+        if (null == historyTaskIdList) {
+            historyTaskIdList = new ConcurrentLinkedQueue<>();
+        }
+    }
+
     /**
      * load job type
      */
@@ -197,13 +222,13 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> {
     }
 
     public InsertJob(ConnectContext ctx,
-                      StmtExecutor executor,
-                      String labelName,
-                      List<InsertIntoTableCommand> plans,
-                      Set<String> sinkTableNames,
-                      Map<String, String> properties,
-                      String comment,
-                      JobExecutionConfiguration jobConfig) {
+                     StmtExecutor executor,
+                     String labelName,
+                     List<InsertIntoTableCommand> plans,
+                     Set<String> sinkTableNames,
+                     Map<String, String> properties,
+                     String comment,
+                     JobExecutionConfiguration jobConfig) {
         super(getNextJobId(), labelName, JobStatus.RUNNING, null,
                 comment, ctx.getCurrentUserIdentity(), jobConfig);
         this.ctx = ctx;
@@ -460,14 +485,6 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> {
         return Config.broker_load_default_timeout_second;
     }
 
-
-    public static InsertJob readFields(DataInput in) throws IOException {
-        String jsonJob = Text.readString(in);
-        InsertJob job = GsonUtils.GSON.fromJson(jsonJob, InsertJob.class);
-        job.setRunningTasks(new ArrayList<>());
-        return job;
-    }
-
     @Override
     public void write(DataOutput out) throws IOException {
         Text.writeString(out, GsonUtils.GSON.toJson(this));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 814d6b773ad..dae75b2b43e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -220,6 +220,7 @@ public class JobManager<T extends AbstractJob<?, C>, C> 
implements Writable {
     }
 
     public void triggerJob(long jobId, C context) throws JobException {
+        log.info("trigger job, job id is {}", jobId);
         checkJobExist(jobId);
         jobScheduler.schedulerInstantJob(jobMap.get(jobId), TaskType.MANUAL, 
context);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
index 03a7792cc50..597e39d96ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
@@ -154,7 +154,7 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> 
implements Closeable {
     public void schedulerInstantJob(T job, TaskType taskType, C context) {
         List<? extends AbstractTask> tasks = job.commonCreateTasks(taskType, 
context);
         if (CollectionUtils.isEmpty(tasks)) {
-            log.info("job create task is empty, skip scheduler, job id is 
{},job name is {}", job.getJobId(),
+            log.info("job create task is empty, skip scheduler, job id is {}, 
job name is {}", job.getJobId(),
                     job.getJobName());
             if 
(job.getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
                 job.setJobStatus(JobStatus.FINISHED);
diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy 
b/regression-test/suites/job_p0/test_base_insert_job.groovy
index b41bf50738b..3ef2f86b8eb 100644
--- a/regression-test/suites/job_p0/test_base_insert_job.groovy
+++ b/regression-test/suites/job_p0/test_base_insert_job.groovy
@@ -50,6 +50,10 @@ suite("test_base_insert_job") {
         DROP JOB where jobname =  '${jobMixedName}'
     """
 
+    sql """
+        DROP JOB where jobname =  '${jobName}'
+    """
+
     sql """
         CREATE TABLE IF NOT EXISTS `${tableName}`
         (
@@ -69,7 +73,7 @@ suite("test_base_insert_job") {
     Thread.sleep(2500)
     def jobs = sql """select * from ${tableName}"""
     println jobs
-    assert 3>=jobs.size() >= (2 as Boolean) //at least 2 records, some times 3 
records
+    assert 3 >= jobs.size() >= (2 as Boolean) //at least 2 records, some times 
3 records
     sql """
        CREATE JOB ${jobMixedName}  ON SCHEDULE every 1 second  DO insert into 
${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
     """
@@ -83,7 +87,7 @@ suite("test_base_insert_job") {
     sql """
         DROP JOB where jobname =  '${jobMixedName}'
     """
-    
+
     sql """drop table if exists `${tableName}` force """
     sql """
         CREATE TABLE IF NOT EXISTS `${tableName}`
@@ -99,11 +103,11 @@ suite("test_base_insert_job") {
         );
         """
     // Enlarge this parameter to avoid other factors that cause time 
verification to fail when submitting.
-    def currentMs=System.currentTimeMillis()+20000;
-    def   dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentMs), 
ZoneId.systemDefault());
+    def currentMs = System.currentTimeMillis() + 20000;
+    def dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentMs), 
ZoneId.systemDefault());
 
     def formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
-    def startTime= dateTime.format(formatter);
+    def startTime = dateTime.format(formatter);
     def dataCount = sql """select count(*) from ${tableName}"""
     assert dataCount.get(0).get(0) == 0
     sql """
@@ -113,8 +117,8 @@ suite("test_base_insert_job") {
     Thread.sleep(25000)
     def onceJob = sql """ select id,ExecuteSql from jobs("type"="insert") 
where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """
     assert onceJob.size() == 1
-    def onceJobId= onceJob.get(0).get(0);
-    def onceJobSql= onceJob.get(0).get(1);
+    def onceJobId = onceJob.get(0).get(0);
+    def onceJobSql = onceJob.get(0).get(1);
     println onceJobSql
     def assertSql = "insert into ${tableName}  values  (\'2023-07-19\', 
sleep(10000), 1001);"
     println assertSql
@@ -126,7 +130,7 @@ suite("test_base_insert_job") {
     assert datas.get(0).get(0) == "RUNNING"
     def taskId = datas.get(0).get(1)
     sql """cancel  task where jobName='${jobName}' and taskId= ${taskId}"""
-    def cancelTask = sql """ select status from tasks("type"="insert") where 
jobid= ${onceJobId}""" 
+    def cancelTask = sql """ select status from tasks("type"="insert") where 
jobid= ${onceJobId}"""
     println cancelTask
     //check task status
     assert cancelTask.size() == 1
@@ -135,32 +139,56 @@ suite("test_base_insert_job") {
     def dataCount1 = sql """select count(1) from ${tableName}"""
     assert dataCount1.get(0).get(0) == 0
     // check job status
-    def oncejob=sql """select status,comment from jobs("type"="insert") where 
Name='${jobName}' """
+    def oncejob = sql """select status,comment from jobs("type"="insert") 
where Name='${jobName}' """
     println oncejob
     assert oncejob.get(0).get(0) == "FINISHED"
     //assert comment
     assert oncejob.get(0).get(1) == "test for test&68686781jbjbhj//ncsa"
-    
+    sql """
+        DROP JOB where jobname =  '${jobName}'
+    """
+
+    sql """
+          CREATE JOB ${jobName}  ON SCHEDULE every 1 second   comment 'test 
for test&68686781jbjbhj//ncsa' DO insert into ${tableName}  values  
('2023-07-19', sleep(10000), 1001);
+     """
+
+    Thread.sleep(2500)
+
+    sql """
+        PAUSE JOB where jobname =  '${jobName}'
+    """
+    def job = sql """ select id,ExecuteSql from jobs("type"="insert") where 
Name like '%${jobName}%'  """
+    assert job.size() == 1
+    def jobId = job.get(0).get(0);
+    def tasks = sql """ select status from tasks("type"="insert") where jobid= 
${jobId}  """
+    assert tasks.size() == 1
+    sql """
+        RESUME JOB where jobname =  '${jobName}'
+    """
+    Thread.sleep(2500)
+    def resumeTasks = sql """ select status from tasks("type"="insert") where 
jobid= ${jobId}  """
+    println resumeTasks
+    assert resumeTasks.size() == 2
     // assert same job name
     try {
         sql """
           CREATE JOB ${jobName}  ON SCHEDULE EVERY 10 second   comment 'test 
for test&68686781jbjbhj//ncsa' DO insert into ${tableName}  values  
('2023-07-19', sleep(10000), 1001);
      """
-    }catch (Exception e) {
+    } catch (Exception e) {
         assert e.getMessage().contains("job name exist, 
jobName:insert_recovery_test_base_insert_job")
     }
-    def errorTblName="${tableName}qwertyuioppoiuyte"
+    def errorTblName = "${tableName}qwertyuioppoiuyte"
     sql """drop table if exists `${errorTblName}` force"""
     // assert error table name
     try {
         sql """
           CREATE JOB ${jobName}  ON SCHEDULE EVERY 10 second   comment 'test 
for test&68686781jbjbhj//ncsa' DO insert into ${errorTblName}  values  
('2023-07-19', sleep(10000), 1001);
      """
-    }catch (Exception e) {
+    } catch (Exception e) {
         assert e.getMessage().contains("Unknown table 
't_test_BASE_inSert_jobqwertyuioppoiuyte'")
     }
     // assert not support stmt
-    try{
+    try {
         sql """
             CREATE JOB ${jobName}  ON SCHEDULE at '${startTime}'   comment 
'test' DO update ${tableName} set type=2 where type=1;
         """
@@ -168,7 +196,7 @@ suite("test_base_insert_job") {
         assert e.getMessage().contains("Not support UpdateStmt type in job")
     }
     // assert start time greater than current time
-    try{
+    try {
         sql """
             CREATE JOB ${jobName}  ON SCHEDULE at '${startTime}'   comment 
'test' DO insert into ${tableName} (timestamp, type, user_id) values 
('2023-03-18','1','12213');
         """
@@ -176,14 +204,14 @@ suite("test_base_insert_job") {
         assert e.getMessage().contains("startTimeMs must be greater than 
current time")
     }
     // assert end time less than start time
-    try{
+    try {
         sql """
             CREATE JOB test_one_time_error_starts  ON SCHEDULE at '2023-11-13 
14:18:07'   comment 'test' DO insert into ${tableName} (timestamp, type, 
user_id) values ('2023-03-18','1','12213');
         """
     } catch (Exception e) {
         assert e.getMessage().contains("startTimeMs must be greater than 
current time")
     }
-    try{
+    try {
         sql """
             CREATE JOB inner_test  ON SCHEDULE at '2023-11-13 14:18:07'   
comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values 
('2023-03-18','1','12213');
         """
@@ -191,7 +219,7 @@ suite("test_base_insert_job") {
         assert e.getMessage().contains("job name can not start with inner_")
     }
     // assert end time less than start time
-    try{
+    try {
         sql """
             CREATE JOB test_error_starts  ON SCHEDULE every 1 second ends 
'2023-11-13 14:18:07'   comment 'test' DO insert into ${tableName} (timestamp, 
type, user_id) values ('2023-03-18','1','12213');
         """
@@ -199,7 +227,7 @@ suite("test_base_insert_job") {
         assert e.getMessage().contains("end time cannot be less than start 
time")
     }
     // assert interval time unit can not be years
-    try{
+    try {
         sql """
             CREATE JOB test_error_starts  ON SCHEDULE every 1 years ends 
'2023-11-13 14:18:07'   comment 'test' DO insert into ${tableName} (timestamp, 
type, user_id) values ('2023-03-18','1','12213');
         """
@@ -228,8 +256,8 @@ suite("test_base_insert_job") {
     sql """
         CREATE JOB ENDS  ON SCHEDULE every 20 second   comment 'test' DO 
insert into ${tableName} (timestamp, type, user_id) values 
('2023-03-18','1','12213');
     """
-    
-    def jobCountRsp = sql"""select count(1) from jobs("type"="insert") where 
name in ('JOB','DO','SCHEDULE','AT','STARTS','ENDS')"""
+
+    def jobCountRsp = sql """select count(1) from jobs("type"="insert") where 
name in ('JOB','DO','SCHEDULE','AT','STARTS','ENDS')"""
     assert jobCountRsp.get(0).get(0) == 6
-    
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to