This is an automated email from the ASF dual-hosted git repository.
lijibing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f4bff84dc5f [enhance](mtmv)When drop MTMV, no longer wait for task
cancel to complete (#45995)
f4bff84dc5f is described below
commit f4bff84dc5f5ac54e199f10670bd8e90b898cee2
Author: zhangdong <[email protected]>
AuthorDate: Thu Dec 26 15:25:12 2024 +0800
[enhance](mtmv)When drop MTMV, no longer wait for task cancel to complete
(#45995)
### What problem does this PR solve?
problem:
- when drop db, will hold write lock of catalog, and drop all MTMV
- when drop MTMV, will drop Job,
- when drop Job, will cancel all tasks in this Job
- when cancel task, if task is running insert overwrite, will wait for a
long time
fix:
when drop job, not wait task cacncelled complete
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
### Release note
None
---
.../src/main/java/org/apache/doris/job/base/AbstractJob.java | 8 ++++----
fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java | 2 +-
.../java/org/apache/doris/job/executor/DispatchTaskHandler.java | 2 +-
.../java/org/apache/doris/job/extensions/insert/InsertJob.java | 4 ++--
.../java/org/apache/doris/job/extensions/insert/InsertTask.java | 2 +-
.../main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java | 4 ++--
.../src/main/java/org/apache/doris/job/task/AbstractTask.java | 8 ++++----
fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java | 4 +++-
fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java | 8 ++++++--
9 files changed, 24 insertions(+), 18 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index 906b86494fb..b6f62f5121b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -149,12 +149,12 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
private Lock createTaskLock = new ReentrantLock();
@Override
- public void cancelAllTasks() throws JobException {
+ public void cancelAllTasks(boolean needWaitCancelComplete) throws
JobException {
if (CollectionUtils.isEmpty(runningTasks)) {
return;
}
for (T task : runningTasks) {
- task.cancel();
+ task.cancel(needWaitCancelComplete);
canceledTaskCount.incrementAndGet();
}
runningTasks = new CopyOnWriteArrayList<>();
@@ -184,7 +184,7 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
throw new JobException("no running task");
}
runningTasks.stream().filter(task ->
task.getTaskId().equals(taskId)).findFirst()
- .orElseThrow(() -> new JobException("Not found task id: " +
taskId)).cancel();
+ .orElseThrow(() -> new JobException("Not found task id: " +
taskId)).cancel(true);
runningTasks.removeIf(task -> task.getTaskId().equals(taskId));
canceledTaskCount.incrementAndGet();
if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) {
@@ -292,7 +292,7 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
this.finishTimeMs = System.currentTimeMillis();
}
if (JobStatus.PAUSED.equals(newJobStatus) ||
JobStatus.STOPPED.equals(newJobStatus)) {
- cancelAllTasks();
+ cancelAllTasks(JobStatus.STOPPED.equals(newJobStatus) ? false :
true);
}
jobStatus = newJobStatus;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
index a7e75554c71..69d1e5e55fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
@@ -101,7 +101,7 @@ public interface Job<T extends AbstractTask, C> {
* Cancels all running tasks of this job.
* @throws JobException If cancelling a running task fails.
*/
- void cancelAllTasks() throws JobException;
+ void cancelAllTasks(boolean needWaitCancelComplete) throws JobException;
/**
* register job
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
index b8f726c4a0c..56222fd3e1f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
@@ -66,7 +66,7 @@ public class DispatchTaskHandler<T extends AbstractJob>
implements WorkHandler<T
JobType jobType = event.getJob().getJobType();
for (AbstractTask task : tasks) {
if (!disruptorMap.get(jobType).addTask(task)) {
- task.cancel();
+ task.cancel(true);
continue;
}
log.info("dispatch timer job success, job id is {}, task
id is {}",
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index f4a91498fea..f87eb29d9d8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -297,12 +297,12 @@ public class InsertJob extends AbstractJob<InsertTask,
Map<Object, Object>> impl
}
@Override
- public void cancelAllTasks() throws JobException {
+ public void cancelAllTasks(boolean needWaitCancelComplete) throws
JobException {
try {
if
(getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
checkAuth("CANCEL LOAD");
}
- super.cancelAllTasks();
+ super.cancelAllTasks(needWaitCancelComplete);
this.failMsg = new FailMsg(FailMsg.CancelType.USER_CANCEL, "user
cancel");
} catch (DdlException e) {
throw new JobException(e);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index a577250dc86..883c4265316 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -223,7 +223,7 @@ public class InsertTask extends AbstractTask {
}
@Override
- protected void executeCancelLogic() {
+ protected void executeCancelLogic(boolean needWaitCancelComplete) {
if (isFinished.get() || isCanceled.get()) {
return;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 31e6c8353e2..aa1bbe629fd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -300,10 +300,10 @@ public class MTMVTask extends AbstractTask {
}
@Override
- protected synchronized void executeCancelLogic() {
+ protected synchronized void executeCancelLogic(boolean
needWaitCancelComplete) {
LOG.info("mtmv task cancel, taskId: {}", super.getTaskId());
if (executor != null) {
- executor.cancel(new Status(TStatusCode.CANCELLED, "mtmv task
cancelled"));
+ executor.cancel(new Status(TStatusCode.CANCELLED, "mtmv task
cancelled"), needWaitCancelComplete);
}
after();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
index 8a230c0bd38..b356bc58d32 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
@@ -129,16 +129,16 @@ public abstract class AbstractTask implements Task {
/**
* Cancels the ongoing task, updating its status to {@link
TaskStatus#CANCELED} and releasing associated resources.
* This method encapsulates the core cancellation logic, calling the
abstract method
- * {@link #executeCancelLogic()} for task-specific actions.
+ * {@link #executeCancelLogic(boolean)} for task-specific actions.
*
* @throws JobException If an error occurs during the cancellation
process, a new JobException is thrown wrapping
* the original exception.
*/
@Override
- public void cancel() throws JobException {
+ public void cancel(boolean needWaitCancelComplete) throws JobException {
try {
status = TaskStatus.CANCELED;
- executeCancelLogic();
+ executeCancelLogic(needWaitCancelComplete);
} catch (Exception e) {
log.warn("cancel task failed, job id is {}, task id is {}", jobId,
taskId, e);
throw new JobException(e);
@@ -153,7 +153,7 @@ public abstract class AbstractTask implements Task {
*
* @throws Exception Any exception that might occur during the
cancellation process in the subclass.
*/
- protected abstract void executeCancelLogic() throws Exception;
+ protected abstract void executeCancelLogic(boolean needWaitCancelComplete)
throws Exception;
@Override
public void before() throws JobException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
index ee205c55c31..d184f647075 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java
@@ -63,8 +63,10 @@ public interface Task {
/**
* This method is called to cancel the execution of the task.
* Implementations should define the necessary steps to cancel the task.
+ *
+ * @param needWaitCancelComplete Do we need to wait for the cancellation
to be completed.
*/
- void cancel() throws JobException;
+ void cancel(boolean needWaitCancelComplete) throws JobException;
/**
* get info for tvf `tasks`
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 055a4c31e90..b155b468a7e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1586,7 +1586,7 @@ public class StmtExecutor {
}
// Because this is called by other thread
- public void cancel(Status cancelReason) {
+ public void cancel(Status cancelReason, boolean needWaitCancelComplete) {
if (masterOpExecutor != null) {
try {
masterOpExecutor.cancel();
@@ -1610,12 +1610,16 @@ public class StmtExecutor {
if (parsedStmt instanceof AnalyzeTblStmt || parsedStmt instanceof
AnalyzeDBStmt) {
Env.getCurrentEnv().getAnalysisManager().cancelSyncTask(context);
}
- if (insertOverwriteTableCommand.isPresent()) {
+ if (insertOverwriteTableCommand.isPresent() && needWaitCancelComplete)
{
// Wait for the command to run or cancel completion
insertOverwriteTableCommand.get().waitNotRunning();
}
}
+ public void cancel(Status cancelReason) {
+ cancel(cancelReason, true);
+ }
+
private Optional<InsertOverwriteTableCommand>
getInsertOverwriteTableCommand() {
if (parsedStmt instanceof LogicalPlanAdapter) {
LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter)
parsedStmt;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]