This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 447a3e9eb64 [Fix](Job)Fix some issues in the Insert job. (#44543)
447a3e9eb64 is described below
commit 447a3e9eb64ca44f31d0037ed05c5bdf80b2d4f2
Author: Calvin Kirs <[email protected]>
AuthorDate: Tue Nov 26 11:34:37 2024 +0800
[Fix](Job)Fix some issues in the Insert job. (#44543)
### What problem does this PR solve?
- The job's task counters do not account for tasks in the CANCELED state.
- When a job is canceled, its status is marked as FAILED, and a
NullPointerException (NPE) occurs because resources have already been
released.
```
java.lang.NullPointerException: Cannot invoke
"org.apache.doris.qe.ConnectContext.getState()" because "this.ctx" is null
at
org.apache.doris.job.extensions.insert.InsertTask.run(InsertTask.java:200)
~[classes/:?]
at
org.apache.doris.job.task.AbstractTask.runTask(AbstractTask.java:167)
~[classes/:?]
at
org.apache.doris.job.executor.DefaultTaskExecutorHandler.onEvent(DefaultTaskExecutorHandler.java:50)
~[classes/:?]
at
org.apache.doris.job.executor.DefaultTaskExecutorHandler.onEvent(DefaultTaskExecutorHandler.java:33)
~[classes/:?]
at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:143)
~[disruptor-3.4.4.jar:?]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
```
- Resuming a job (RESUME JOB) does not immediately schedule the job.
---
.../src/main/java/org/apache/doris/job/base/AbstractJob.java | 8 +++++---
.../org/apache/doris/job/extensions/insert/InsertTask.java | 3 +++
.../main/java/org/apache/doris/job/manager/JobManager.java | 3 +++
.../src/main/java/org/apache/doris/job/task/AbstractTask.java | 3 +++
regression-test/suites/job_p0/test_base_insert_job.groovy | 11 ++++++++++-
5 files changed, 24 insertions(+), 4 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index 62ac0c4d59d..906b86494fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -155,6 +155,7 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
}
for (T task : runningTasks) {
task.cancel();
+ canceledTaskCount.incrementAndGet();
}
runningTasks = new CopyOnWriteArrayList<>();
logUpdateOperation();
@@ -185,6 +186,7 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
runningTasks.stream().filter(task ->
task.getTaskId().equals(taskId)).findFirst()
.orElseThrow(() -> new JobException("Not found task id: " +
taskId)).cancel();
runningTasks.removeIf(task -> task.getTaskId().equals(taskId));
+ canceledTaskCount.incrementAndGet();
if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) {
updateJobStatus(JobStatus.FINISHED);
}
@@ -418,13 +420,13 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
/**
* Generates a common error message when the execution queue is full.
*
- * @param taskId The ID of the task.
- * @param queueConfigName The name of the queue configuration.
+ * @param taskId The ID of the task.
+ * @param queueConfigName The name of the queue configuration.
* @param executeThreadConfigName The name of the execution thread
configuration.
* @return A formatted error message.
*/
protected String commonFormatMsgWhenExecuteQueueFull(Long taskId, String
queueConfigName,
- String
executeThreadConfigName) {
+ String
executeThreadConfigName) {
return String.format("Dispatch task failed, jobId: %d, jobName: %s,
taskId: %d, the queue size is full, "
+ "you can increase the queue size by setting the
property "
+ "%s in the fe.conf file or increase the value of "
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index c997ebcd30e..23a367d5d6e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -209,6 +209,9 @@ public class InsertTask extends AbstractTask {
@Override
public void onFail() throws JobException {
+ if (isCanceled.get()) {
+ return;
+ }
isFinished.set(true);
super.onFail();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 39646bab18f..47a3a0c5c19 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -189,6 +189,9 @@ public class JobManager<T extends AbstractJob<?, C>, C>
implements Writable {
public void alterJobStatus(Long jobId, JobStatus status) throws
JobException {
checkJobExist(jobId);
jobMap.get(jobId).updateJobStatus(status);
+ if (status.equals(JobStatus.RUNNING)) {
+ jobScheduler.scheduleOneJob(jobMap.get(jobId));
+ }
jobMap.get(jobId).logUpdateOperation();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
index f78446aaf85..8a230c0bd38 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
@@ -167,6 +167,9 @@ public abstract class AbstractTask implements Task {
run();
onSuccess();
} catch (Exception e) {
+ if (TaskStatus.CANCELED.equals(status)) {
+ return;
+ }
this.errMsg = e.getMessage();
onFail();
log.warn("execute task error, job id is {}, task id is {}", jobId,
taskId, e);
diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy
b/regression-test/suites/job_p0/test_base_insert_job.groovy
index 19f4422d64f..8a0bb34ca43 100644
--- a/regression-test/suites/job_p0/test_base_insert_job.groovy
+++ b/regression-test/suites/job_p0/test_base_insert_job.groovy
@@ -219,9 +219,11 @@ suite("test_base_insert_job") {
RESUME JOB where jobname = '${jobName}'
"""
println(tasks.size())
+ // test resume job success
Awaitility.await("resume-job-test").atMost(60, SECONDS).until({
def afterResumeTasks = sql """ select status from
tasks("type"="insert") where JobName= '${jobName}' """
println "resume tasks :" + afterResumeTasks
+ //resume tasks size should be greater than before pause
afterResumeTasks.size() > tasks.size()
})
@@ -247,7 +249,6 @@ suite("test_base_insert_job") {
CREATE JOB ${jobName} ON SCHEDULE at '2023-11-13 14:18:07'
comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values
('2023-03-18','1','12213');
"""
} catch (Exception e) {
- println e.getMessage()
assert e.getMessage().contains("startTimeMs must be greater than
current time")
}
// assert end time less than start time
@@ -281,6 +282,14 @@ suite("test_base_insert_job") {
} catch (Exception e) {
assert e.getMessage().contains("Invalid interval time unit: years")
}
+ // assert interval time unit is -1
+ try {
+ sql """
+ CREATE JOB test_error_starts ON SCHEDULE every -1 second
comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values
('2023-03-18','1','12213');
+ """
+ } catch (Exception e) {
+ assert e.getMessage().contains("expecting INTEGER_VALUE")
+ }
// test keyword as job name
sql """
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]