This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 84ea93652d9 [Fix](Job)Incorrect task query result of insert type (#30024)
84ea93652d9 is described below
commit 84ea93652d93ca8e315f31f85fb0645c3e003611
Author: Calvin Kirs <[email protected]>
AuthorDate: Mon Jan 22 14:46:40 2024 +0800
[Fix](Job)Incorrect task query result of insert type (#30024)
- idToTasks is not persisted, so previously queried tasks are lost after a restart.
- Canceling a task does not persist the job metadata after the task is removed from the running tasks.
- The tasks TVF displays an error when some fields of a queried task's result are empty.
- A recurring (cycle-scheduled) job should not be STOPPED when one of its tasks fails.
---
.../org/apache/doris/job/base/AbstractJob.java | 15 ++++---
.../doris/job/extensions/insert/InsertJob.java | 46 ++++++++++++++++------
.../doris/job/extensions/insert/InsertTask.java | 30 ++++++++------
.../org/apache/doris/job/manager/JobManager.java | 23 +++++++----
.../apache/doris/job/scheduler/JobScheduler.java | 6 +--
.../suites/job_p0/test_base_insert_job.groovy | 24 +++++++----
6 files changed, 95 insertions(+), 49 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index 2416a6bca5f..091ac158c1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -166,7 +166,7 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
throw new JobException("no running task");
}
runningTasks.stream().filter(task ->
task.getTaskId().equals(taskId)).findFirst()
- .orElseThrow(() -> new JobException("no task id: " +
taskId)).cancel();
+ .orElseThrow(() -> new JobException("Not found task id: " +
taskId)).cancel();
runningTasks.removeIf(task -> task.getTaskId().equals(taskId));
if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) {
updateJobStatus(JobStatus.FINISHED);
@@ -289,19 +289,19 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
@Override
public void onTaskFail(T task) throws JobException {
- updateJobStatusIfEnd();
+ updateJobStatusIfEnd(false);
runningTasks.remove(task);
}
@Override
public void onTaskSuccess(T task) throws JobException {
- updateJobStatusIfEnd();
+ updateJobStatusIfEnd(true);
runningTasks.remove(task);
}
- private void updateJobStatusIfEnd() throws JobException {
+ private void updateJobStatusIfEnd(boolean taskSuccess) throws JobException
{
JobExecuteType executeType = getJobConfig().getExecuteType();
if (executeType.equals(JobExecuteType.MANUAL)) {
return;
@@ -309,7 +309,12 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
switch (executeType) {
case ONE_TIME:
case INSTANT:
-
Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(JobStatus.FINISHED);
+ this.finishTimeMs = System.currentTimeMillis();
+ if (taskSuccess) {
+
Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(JobStatus.FINISHED);
+ } else {
+
Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(JobStatus.STOPPED);
+ }
break;
case RECURRING:
TimerDefinition timerDefinition =
getJobConfig().getTimerDefinition();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index ce918c426f8..15e0c37987f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -30,6 +30,7 @@ import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
@@ -42,6 +43,7 @@ import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.load.FailMsg;
+import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.mysql.privilege.Privilege;
@@ -272,14 +274,15 @@ public class InsertJob extends AbstractJob<InsertTask,
Map<Object, Object>> impl
}
if (CollectionUtils.isEmpty(historyTaskIdList)) {
historyTaskIdList = new ConcurrentLinkedQueue<>();
- Env.getCurrentEnv().getEditLog().logUpdateJob(this);
historyTaskIdList.add(id);
+ Env.getCurrentEnv().getEditLog().logUpdateJob(this);
return;
}
historyTaskIdList.add(id);
if (historyTaskIdList.size() >= Config.max_persistence_task_count) {
historyTaskIdList.poll();
}
+ Env.getCurrentEnv().getEditLog().logUpdateJob(this);
}
@Override
@@ -320,22 +323,44 @@ public class InsertJob extends AbstractJob<InsertTask,
Map<Object, Object>> impl
}
//TODO it's will be refactor, we will storage task info in job inner
and query from it
List<Long> taskIdList = new ArrayList<>(this.historyTaskIdList);
+ if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
+ Collections.reverse(taskIdList);
+ return queryLoadTasksByTaskIds(taskIdList);
+ }
+ List<LoadJob> loadJobs =
Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIdList);
+ if (CollectionUtils.isEmpty(loadJobs)) {
+ return new ArrayList<>();
+ }
+ List<InsertTask> tasks = new ArrayList<>();
+ loadJobs.forEach(loadJob -> {
+ InsertTask task;
+ try {
+ task = new InsertTask(loadJob.getLabel(),
loadJob.getDb().getFullName(), null, getCreateUser());
+ task.setCreateTimeMs(loadJob.getCreateTimestamp());
+ } catch (MetaNotFoundException e) {
+ log.warn("load job not found, job id is {}", loadJob.getId());
+ return;
+ }
+ task.setJobId(getJobId());
+ task.setTaskId(loadJob.getId());
+ task.setJobInfo(loadJob);
+ tasks.add(task);
+ });
+ return tasks;
- Collections.reverse(taskIdList);
- return queryLoadTasksByTaskIds(taskIdList);
}
public List<InsertTask> queryLoadTasksByTaskIds(List<Long> taskIdList) {
if (taskIdList.isEmpty()) {
return new ArrayList<>();
}
- List<InsertTask> jobs = new ArrayList<>();
+ List<InsertTask> tasks = new ArrayList<>();
taskIdList.forEach(id -> {
if (null != idToTasks.get(id)) {
- jobs.add(idToTasks.get(id));
+ tasks.add(idToTasks.get(id));
}
});
- return jobs;
+ return tasks;
}
@Override
@@ -354,14 +379,11 @@ public class InsertJob extends AbstractJob<InsertTask,
Map<Object, Object>> impl
}
@Override
- public void onTaskFail(InsertTask task) {
- try {
- updateJobStatus(JobStatus.STOPPED);
+ public void onTaskFail(InsertTask task) throws JobException {
+ if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL,
task.getErrMsg());
- } catch (JobException e) {
- throw new RuntimeException(e);
}
- getRunningTasks().remove(task);
+ super.onTaskFail(task);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index e85e7a1b027..b5d8ea7fc17 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -21,11 +21,11 @@ import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
-import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.load.FailMsg;
+import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
@@ -89,7 +89,7 @@ public class InsertTask extends AbstractTask {
@Getter
@Setter
- private InsertJob jobInfo;
+ private LoadJob jobInfo;
private TaskType taskType = TaskType.PENDING;
private MergeType mergeType = MergeType.APPEND;
@@ -127,7 +127,7 @@ public class InsertTask extends AbstractTask {
}
public InsertTask(String labelName, InsertIntoTableCommand insertInto,
- ConnectContext ctx, StmtExecutor executor,
LoadStatistic statistic) {
+ ConnectContext ctx, StmtExecutor executor, LoadStatistic
statistic) {
this.labelName = labelName;
this.command = insertInto;
this.userIdentity = ctx.getCurrentUserIdentity();
@@ -216,23 +216,27 @@ public class InsertTask extends AbstractTask {
// if task not start, load job is null,return pending task show
info
return getPendingTaskTVFInfo();
}
- trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(jobInfo.getJobId())));
+ trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(jobInfo.getId())));
trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(getJobId())));
trow.addToColumnValue(new TCell().setStringVal(labelName));
- trow.addToColumnValue(new
TCell().setStringVal(jobInfo.getJobStatus().name()));
+ trow.addToColumnValue(new
TCell().setStringVal(jobInfo.getState().name()));
// err msg
- String errMsg = FeConstants.null_string;
+ String errMsg = "";
if (failMsg != null) {
errMsg = "type:" + failMsg.getCancelType() + "; msg:" +
failMsg.getMsg();
}
trow.addToColumnValue(new TCell().setStringVal(errMsg));
// create time
- trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getCreateTimeMs())));
+ trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getCreateTimestamp())));
// load end time
- trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getFinishTimeMs())));
+ trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getFinishTimestamp())));
// tracking url
trow.addToColumnValue(new TCell().setStringVal(trackingUrl));
- trow.addToColumnValue(new
TCell().setStringVal(loadStatistic.toJson()));
+ if (null != loadStatistic) {
+ trow.addToColumnValue(new
TCell().setStringVal(loadStatistic.toJson()));
+ } else {
+ trow.addToColumnValue(new TCell().setStringVal(""));
+ }
trow.addToColumnValue(new
TCell().setStringVal(userIdentity.getQualifiedUser()));
return trow;
}
@@ -244,11 +248,11 @@ public class InsertTask extends AbstractTask {
trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(getJobId())));
trow.addToColumnValue(new TCell().setStringVal(getJobId() +
LABEL_SPLITTER + getTaskId()));
trow.addToColumnValue(new TCell().setStringVal(getStatus().name()));
- trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
+ trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
- trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
- trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
- trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
+ trow.addToColumnValue(new TCell().setStringVal(""));
+ trow.addToColumnValue(new TCell().setStringVal(""));
+ trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new
TCell().setStringVal(userIdentity.getQualifiedUser()));
return trow;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index b069fd5eca6..7e8b01ce287 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -137,16 +137,21 @@ public class JobManager<T extends AbstractJob<?, C>, C>
implements Writable {
* @param ifExists is is true, if job not exist,we will ignore job not
exist exception, else throw exception
*/
public void unregisterJob(String jobName, boolean ifExists) throws
JobException {
- T dropJob = null;
- for (T job : jobMap.values()) {
- if (job.getJobName().equals(jobName)) {
- dropJob = job;
+ try {
+ T dropJob = null;
+ for (T job : jobMap.values()) {
+ if (job.getJobName().equals(jobName)) {
+ dropJob = job;
+ }
}
+ if (dropJob == null && ifExists) {
+ return;
+ }
+ dropJob(dropJob, jobName);
+ } catch (Exception e) {
+ log.error("drop job error, jobName:" + jobName, e);
+ throw new JobException("unregister job error, jobName:" + jobName);
}
- if (dropJob == null && ifExists) {
- return;
- }
- dropJob(dropJob, jobName);
}
private void dropJob(T dropJob, String jobName) throws JobException {
@@ -284,6 +289,7 @@ public class JobManager<T extends AbstractJob<?, C>, C>
implements Writable {
for (T job : jobMap.values()) {
if (job.getJobName().equals(jobName)) {
job.cancelTaskById(taskId);
+ job.logUpdateOperation();
return;
}
}
@@ -378,6 +384,7 @@ public class JobManager<T extends AbstractJob<?, C>, C>
implements Writable {
}
}
+ //todo it's not belong to JobManager
public void cancelLoadJob(CancelLoadStmt cs)
throws JobException, AnalysisException, DdlException {
String dbName = cs.getDbName();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
index 597e39d96ed..a104d3895e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
@@ -184,8 +184,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C>
implements Closeable {
}
for (Map.Entry<Long, T> entry : jobMap.entrySet()) {
T job = entry.getValue();
- if (job.getJobStatus().equals(JobStatus.FINISHED)) {
- clearFinishedJob(job);
+ if (job.getJobStatus().equals(JobStatus.FINISHED) ||
job.getJobStatus().equals(JobStatus.STOPPED)) {
+ clearEndJob(job);
continue;
}
if (!job.getJobStatus().equals(JobStatus.RUNNING) &&
!job.getJobConfig().checkIsTimerJob()) {
@@ -195,7 +195,7 @@ public class JobScheduler<T extends AbstractJob<?, C>, C>
implements Closeable {
}
}
- private void clearFinishedJob(T job) {
+ private void clearEndJob(T job) {
if (job.getFinishTimeMs() + FINISHED_JOB_CLEANUP_THRESHOLD_TIME_MS <
System.currentTimeMillis()) {
return;
}
diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy
b/regression-test/suites/job_p0/test_base_insert_job.groovy
index f4db5907fa2..d9ebb832152 100644
--- a/regression-test/suites/job_p0/test_base_insert_job.groovy
+++ b/regression-test/suites/job_p0/test_base_insert_job.groovy
@@ -71,9 +71,18 @@ suite("test_base_insert_job") {
CREATE JOB ${jobName} ON SCHEDULE every 1 second comment 'test' DO
insert into ${tableName} (timestamp, type, user_id) values
('2023-03-18','1','12213');
"""
Thread.sleep(2500)
- def jobs = sql """select * from ${tableName}"""
- println jobs
- assert 3 >= jobs.size() >= (2 as Boolean) //at least 2 records, some times
3 records
+ sql """
+ PAUSE JOB where jobname = '${jobName}'
+ """
+ def tblDatas = sql """select * from ${tableName}"""
+ println tblDatas
+ assert 3 >= tblDatas.size() >= (2 as Boolean) //at least 2 records, some
times 3 records
+ def pauseJobId = sql """select id from jobs("type"="insert") where
Name='${jobName}'"""
+ def taskStatus = sql """select status from tasks("type"="insert") where
jobid= '${pauseJobId.get(0).get(0)}'"""
+ println taskStatus
+ for (int i = 0; i < taskStatus.size(); i++) {
+ assert taskStatus.get(i).get(0) != "FAILED"||taskStatus.get(i).get(0)
!= "STOPPED"||taskStatus.get(i).get(0) != "STOPPED"
+ }
sql """
CREATE JOB ${jobMixedName} ON SCHEDULE every 1 second DO insert into
${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""
@@ -132,9 +141,8 @@ suite("test_base_insert_job") {
sql """cancel task where jobName='${jobName}' and taskId= ${taskId}"""
def cancelTask = sql """ select status from tasks("type"="insert") where
jobid= ${onceJobId}"""
println cancelTask
- //check task status
- assert cancelTask.size() == 1
- assert cancelTask.get(0).get(0) == "CANCELED"
+ //check task size is 0, cancel task where be deleted
+ assert cancelTask.size() == 0
// check table data
def dataCount1 = sql """select count(1) from ${tableName}"""
assert dataCount1.get(0).get(0) == 0
@@ -161,14 +169,14 @@ suite("test_base_insert_job") {
assert job.size() == 1
def jobId = job.get(0).get(0);
def tasks = sql """ select status from tasks("type"="insert") where jobid=
${jobId} """
- assert tasks.size() == 1
+ assert tasks.size() == 0
sql """
RESUME JOB where jobname = '${jobName}'
"""
Thread.sleep(2500)
def resumeTasks = sql """ select status from tasks("type"="insert") where
jobid= ${jobId} """
println resumeTasks
- assert resumeTasks.size() == 2
+ assert resumeTasks.size() == 1
// assert same job name
try {
sql """
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]