This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0ac9b3d1132 [Fix](Job)cancel task is not cleared from running task (#29114)
0ac9b3d1132 is described below
commit 0ac9b3d1132f9521b213bce9f58e6cb6e99822ec
Author: Calvin Kirs <[email protected]>
AuthorDate: Wed Dec 27 13:59:12 2023 +0800
[Fix](Job)cancel task is not cleared from running task (#29114)
---
.../org/apache/doris/job/base/AbstractJob.java | 19 +++---
.../doris/job/extensions/insert/InsertJob.java | 51 ++++++++++-----
.../org/apache/doris/job/manager/JobManager.java | 1 +
.../apache/doris/job/scheduler/JobScheduler.java | 2 +-
.../suites/job_p0/test_base_insert_job.groovy | 72 +++++++++++++++-------
5 files changed, 98 insertions(+), 47 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index 6e9cb48da1c..bc8f921d841 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -89,7 +89,8 @@ public abstract class AbstractJob<T extends AbstractTask, C>
implements Job<T, C
@SerializedName(value = "sql")
String executeSql;
- public AbstractJob() {}
+ public AbstractJob() {
+ }
public AbstractJob(Long id) {
jobId = id;
@@ -99,10 +100,10 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
* executeSql and runningTasks is not required for load.
*/
public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
- String currentDbName,
- String comment,
- UserIdentity createUser,
- JobExecutionConfiguration jobConfig) {
+ String currentDbName,
+ String comment,
+ UserIdentity createUser,
+ JobExecutionConfiguration jobConfig) {
this(jobId, jobName, jobStatus, currentDbName, comment,
createUser, jobConfig, System.currentTimeMillis(), null, null);
}
@@ -137,6 +138,7 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
for (T task : runningTasks) {
task.cancel();
}
+ runningTasks = new ArrayList<>();
}
private static final ImmutableList<String> TITLE_NAMES =
@@ -163,6 +165,7 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
}
runningTasks.stream().filter(task ->
task.getTaskId().equals(taskId)).findFirst()
.orElseThrow(() -> new JobException("no task id: " +
taskId)).cancel();
+ runningTasks.removeIf(task -> task.getTaskId().equals(taskId));
if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) {
updateJobStatus(JobStatus.FINISHED);
}
@@ -364,10 +367,12 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
}
@Override
- public void onRegister() throws JobException {}
+ public void onRegister() throws JobException {
+ }
@Override
- public void onUnRegister() throws JobException {}
+ public void onUnRegister() throws JobException {
+ }
@Override
public void onReplayCreate() throws JobException {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index 9256864efca..90fc126b723 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -46,6 +46,7 @@ import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.mysql.privilege.Privilege;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
+import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
@@ -67,7 +68,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
@@ -84,7 +84,7 @@ import java.util.stream.Collectors;
@EqualsAndHashCode(callSuper = true)
@Data
@Slf4j
-public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> {
+public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>>
implements GsonPostProcessable {
public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
new Column("Id", ScalarType.createStringType()),
@@ -156,6 +156,31 @@ public class InsertJob extends AbstractJob<InsertTask,
Map<Object, Object>> {
// max save task num, do we need to config it?
private static final int MAX_SAVE_TASK_NUM = 100;
+ @Override
+ public void gsonPostProcess() throws IOException {
+ if (null == plans) {
+ plans = new ArrayList<>();
+ }
+ if (null == idToTasks) {
+ idToTasks = new ConcurrentHashMap<>();
+ }
+ if (null == loadStatistic) {
+ loadStatistic = new LoadStatistic();
+ }
+ if (null == finishedTaskIds) {
+ finishedTaskIds = new HashSet<>();
+ }
+ if (null == errorTabletInfos) {
+ errorTabletInfos = new ArrayList<>();
+ }
+ if (null == commitInfos) {
+ commitInfos = new ArrayList<>();
+ }
+ if (null == historyTaskIdList) {
+ historyTaskIdList = new ConcurrentLinkedQueue<>();
+ }
+ }
+
/**
* load job type
*/
@@ -197,13 +222,13 @@ public class InsertJob extends AbstractJob<InsertTask,
Map<Object, Object>> {
}
public InsertJob(ConnectContext ctx,
- StmtExecutor executor,
- String labelName,
- List<InsertIntoTableCommand> plans,
- Set<String> sinkTableNames,
- Map<String, String> properties,
- String comment,
- JobExecutionConfiguration jobConfig) {
+ StmtExecutor executor,
+ String labelName,
+ List<InsertIntoTableCommand> plans,
+ Set<String> sinkTableNames,
+ Map<String, String> properties,
+ String comment,
+ JobExecutionConfiguration jobConfig) {
super(getNextJobId(), labelName, JobStatus.RUNNING, null,
comment, ctx.getCurrentUserIdentity(), jobConfig);
this.ctx = ctx;
@@ -460,14 +485,6 @@ public class InsertJob extends AbstractJob<InsertTask,
Map<Object, Object>> {
return Config.broker_load_default_timeout_second;
}
-
- public static InsertJob readFields(DataInput in) throws IOException {
- String jsonJob = Text.readString(in);
- InsertJob job = GsonUtils.GSON.fromJson(jsonJob, InsertJob.class);
- job.setRunningTasks(new ArrayList<>());
- return job;
- }
-
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 814d6b773ad..dae75b2b43e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -220,6 +220,7 @@ public class JobManager<T extends AbstractJob<?, C>, C>
implements Writable {
}
public void triggerJob(long jobId, C context) throws JobException {
+ log.info("trigger job, job id is {}", jobId);
checkJobExist(jobId);
jobScheduler.schedulerInstantJob(jobMap.get(jobId), TaskType.MANUAL,
context);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
index 03a7792cc50..597e39d96ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
@@ -154,7 +154,7 @@ public class JobScheduler<T extends AbstractJob<?, C>, C>
implements Closeable {
public void schedulerInstantJob(T job, TaskType taskType, C context) {
List<? extends AbstractTask> tasks = job.commonCreateTasks(taskType,
context);
if (CollectionUtils.isEmpty(tasks)) {
- log.info("job create task is empty, skip scheduler, job id is
{},job name is {}", job.getJobId(),
+ log.info("job create task is empty, skip scheduler, job id is {},
job name is {}", job.getJobId(),
job.getJobName());
if
(job.getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
job.setJobStatus(JobStatus.FINISHED);
diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy
b/regression-test/suites/job_p0/test_base_insert_job.groovy
index b41bf50738b..3ef2f86b8eb 100644
--- a/regression-test/suites/job_p0/test_base_insert_job.groovy
+++ b/regression-test/suites/job_p0/test_base_insert_job.groovy
@@ -50,6 +50,10 @@ suite("test_base_insert_job") {
DROP JOB where jobname = '${jobMixedName}'
"""
+ sql """
+ DROP JOB where jobname = '${jobName}'
+ """
+
sql """
CREATE TABLE IF NOT EXISTS `${tableName}`
(
@@ -69,7 +73,7 @@ suite("test_base_insert_job") {
Thread.sleep(2500)
def jobs = sql """select * from ${tableName}"""
println jobs
- assert 3>=jobs.size() >= (2 as Boolean) //at least 2 records, some times 3
records
+ assert 3 >= jobs.size() >= (2 as Boolean) //at least 2 records, some times
3 records
sql """
CREATE JOB ${jobMixedName} ON SCHEDULE every 1 second DO insert into
${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""
@@ -83,7 +87,7 @@ suite("test_base_insert_job") {
sql """
DROP JOB where jobname = '${jobMixedName}'
"""
-
+
sql """drop table if exists `${tableName}` force """
sql """
CREATE TABLE IF NOT EXISTS `${tableName}`
@@ -99,11 +103,11 @@ suite("test_base_insert_job") {
);
"""
// Enlarge this parameter to avoid other factors that cause time
verification to fail when submitting.
- def currentMs=System.currentTimeMillis()+20000;
- def dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentMs),
ZoneId.systemDefault());
+ def currentMs = System.currentTimeMillis() + 20000;
+ def dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentMs),
ZoneId.systemDefault());
def formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- def startTime= dateTime.format(formatter);
+ def startTime = dateTime.format(formatter);
def dataCount = sql """select count(*) from ${tableName}"""
assert dataCount.get(0).get(0) == 0
sql """
@@ -113,8 +117,8 @@ suite("test_base_insert_job") {
Thread.sleep(25000)
def onceJob = sql """ select id,ExecuteSql from jobs("type"="insert")
where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """
assert onceJob.size() == 1
- def onceJobId= onceJob.get(0).get(0);
- def onceJobSql= onceJob.get(0).get(1);
+ def onceJobId = onceJob.get(0).get(0);
+ def onceJobSql = onceJob.get(0).get(1);
println onceJobSql
def assertSql = "insert into ${tableName} values (\'2023-07-19\',
sleep(10000), 1001);"
println assertSql
@@ -126,7 +130,7 @@ suite("test_base_insert_job") {
assert datas.get(0).get(0) == "RUNNING"
def taskId = datas.get(0).get(1)
sql """cancel task where jobName='${jobName}' and taskId= ${taskId}"""
- def cancelTask = sql """ select status from tasks("type"="insert") where
jobid= ${onceJobId}"""
+ def cancelTask = sql """ select status from tasks("type"="insert") where
jobid= ${onceJobId}"""
println cancelTask
//check task status
assert cancelTask.size() == 1
@@ -135,32 +139,56 @@ suite("test_base_insert_job") {
def dataCount1 = sql """select count(1) from ${tableName}"""
assert dataCount1.get(0).get(0) == 0
// check job status
- def oncejob=sql """select status,comment from jobs("type"="insert") where
Name='${jobName}' """
+ def oncejob = sql """select status,comment from jobs("type"="insert")
where Name='${jobName}' """
println oncejob
assert oncejob.get(0).get(0) == "FINISHED"
//assert comment
assert oncejob.get(0).get(1) == "test for test&68686781jbjbhj//ncsa"
-
+ sql """
+ DROP JOB where jobname = '${jobName}'
+ """
+
+ sql """
+ CREATE JOB ${jobName} ON SCHEDULE every 1 second comment 'test
for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values
('2023-07-19', sleep(10000), 1001);
+ """
+
+ Thread.sleep(2500)
+
+ sql """
+ PAUSE JOB where jobname = '${jobName}'
+ """
+ def job = sql """ select id,ExecuteSql from jobs("type"="insert") where
Name like '%${jobName}%' """
+ assert job.size() == 1
+ def jobId = job.get(0).get(0);
+ def tasks = sql """ select status from tasks("type"="insert") where jobid=
${jobId} """
+ assert tasks.size() == 1
+ sql """
+ RESUME JOB where jobname = '${jobName}'
+ """
+ Thread.sleep(2500)
+ def resumeTasks = sql """ select status from tasks("type"="insert") where
jobid= ${jobId} """
+ println resumeTasks
+ assert resumeTasks.size() == 2
// assert same job name
try {
sql """
CREATE JOB ${jobName} ON SCHEDULE EVERY 10 second comment 'test
for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values
('2023-07-19', sleep(10000), 1001);
"""
- }catch (Exception e) {
+ } catch (Exception e) {
assert e.getMessage().contains("job name exist,
jobName:insert_recovery_test_base_insert_job")
}
- def errorTblName="${tableName}qwertyuioppoiuyte"
+ def errorTblName = "${tableName}qwertyuioppoiuyte"
sql """drop table if exists `${errorTblName}` force"""
// assert error table name
try {
sql """
CREATE JOB ${jobName} ON SCHEDULE EVERY 10 second comment 'test
for test&68686781jbjbhj//ncsa' DO insert into ${errorTblName} values
('2023-07-19', sleep(10000), 1001);
"""
- }catch (Exception e) {
+ } catch (Exception e) {
assert e.getMessage().contains("Unknown table
't_test_BASE_inSert_jobqwertyuioppoiuyte'")
}
// assert not support stmt
- try{
+ try {
sql """
CREATE JOB ${jobName} ON SCHEDULE at '${startTime}' comment
'test' DO update ${tableName} set type=2 where type=1;
"""
@@ -168,7 +196,7 @@ suite("test_base_insert_job") {
assert e.getMessage().contains("Not support UpdateStmt type in job")
}
// assert start time greater than current time
- try{
+ try {
sql """
CREATE JOB ${jobName} ON SCHEDULE at '${startTime}' comment
'test' DO insert into ${tableName} (timestamp, type, user_id) values
('2023-03-18','1','12213');
"""
@@ -176,14 +204,14 @@ suite("test_base_insert_job") {
assert e.getMessage().contains("startTimeMs must be greater than
current time")
}
// assert end time less than start time
- try{
+ try {
sql """
CREATE JOB test_one_time_error_starts ON SCHEDULE at '2023-11-13
14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type,
user_id) values ('2023-03-18','1','12213');
"""
} catch (Exception e) {
assert e.getMessage().contains("startTimeMs must be greater than
current time")
}
- try{
+ try {
sql """
CREATE JOB inner_test ON SCHEDULE at '2023-11-13 14:18:07'
comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values
('2023-03-18','1','12213');
"""
@@ -191,7 +219,7 @@ suite("test_base_insert_job") {
assert e.getMessage().contains("job name can not start with inner_")
}
// assert end time less than start time
- try{
+ try {
sql """
CREATE JOB test_error_starts ON SCHEDULE every 1 second ends
'2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp,
type, user_id) values ('2023-03-18','1','12213');
"""
@@ -199,7 +227,7 @@ suite("test_base_insert_job") {
assert e.getMessage().contains("end time cannot be less than start
time")
}
// assert interval time unit can not be years
- try{
+ try {
sql """
CREATE JOB test_error_starts ON SCHEDULE every 1 years ends
'2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp,
type, user_id) values ('2023-03-18','1','12213');
"""
@@ -228,8 +256,8 @@ suite("test_base_insert_job") {
sql """
CREATE JOB ENDS ON SCHEDULE every 20 second comment 'test' DO
insert into ${tableName} (timestamp, type, user_id) values
('2023-03-18','1','12213');
"""
-
- def jobCountRsp = sql"""select count(1) from jobs("type"="insert") where
name in ('JOB','DO','SCHEDULE','AT','STARTS','ENDS')"""
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert") where
name in ('JOB','DO','SCHEDULE','AT','STARTS','ENDS')"""
assert jobCountRsp.get(0).get(0) == 6
-
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]