This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2f7deb3dea6 [enhancement](bulk-load) cancel loading tasks directly without retrying when timeout exceeded (#28666)
2f7deb3dea6 is described below
commit 2f7deb3dea6e5d8514d0492cb4145ddb18a948ca
Author: Siyang Tang <[email protected]>
AuthorDate: Sun Dec 24 21:57:31 2023 +0800
[enhancement](bulk-load) cancel loading tasks directly without retrying when timeout exceeded (#28666)
---
.../org/apache/doris/load/loadv2/BulkLoadJob.java | 8 ++++++--
.../apache/doris/load/loadv2/LoadLoadingTask.java | 24 +++++++++++-----------
2 files changed, 18 insertions(+), 14 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
index d5b173d82fc..7a9f160b3f8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
@@ -67,6 +67,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
@@ -124,7 +125,7 @@ public abstract class BulkLoadJob extends LoadJob {
break;
case SPARK:
bulkLoadJob = new SparkLoadJob(db.getId(),
stmt.getLabel().getLabelName(), stmt.getResourceDesc(),
- stmt.getOrigStmt(), stmt.getUserInfo());
+ stmt.getOrigStmt(), stmt.getUserInfo());
break;
case MINI:
case DELETE:
@@ -209,7 +210,10 @@ public abstract class BulkLoadJob extends LoadJob {
if (loadTask == null) {
return;
}
- if (loadTask.getRetryTime() <= 0) {
+ Predicate<LoadTask> isTaskTimeout =
+ (LoadTask task) -> task instanceof LoadLoadingTask
+ && ((LoadLoadingTask) task).getLeftTimeMs() <= 0;
+ if (loadTask.getRetryTime() <= 0 || isTaskTimeout.test(loadTask)) {
unprotectedExecuteCancel(failMsg, true);
logFinalOperation();
return;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index fa2eadcfa65..97abfb0db90 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -110,7 +110,7 @@ public class LoadLoadingTask extends LoadTask {
}
public void init(TUniqueId loadId, List<List<TBrokerFileStatus>>
fileStatusList,
- int fileNum, UserIdentity userInfo) throws UserException {
+ int fileNum, UserIdentity userInfo) throws UserException {
this.loadId = loadId;
planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId,
db.getId(), table, brokerDesc, fileGroups,
strictMode, isPartialUpdate, timezone, this.timeoutS,
this.loadParallelism, this.sendBatchParallelism,
@@ -154,23 +154,23 @@ public class LoadLoadingTask extends LoadTask {
* here we use exec_mem_limit to directly override the load_mem_limit
property.
*/
curCoordinator.setLoadMemLimit(execMemLimit);
- curCoordinator.setTimeout((int) (getLeftTimeMs() / 1000));
- curCoordinator.setMemTableOnSinkNode(enableMemTableOnSinkNode);
+
+ long leftTimeMs = getLeftTimeMs();
+ if (leftTimeMs <= 0) {
+ throw new LoadException("failed to execute loading task when
timeout");
+ }
+ int timeoutS = (int) (leftTimeMs / 1000);
+ curCoordinator.setTimeout(timeoutS);
try {
QeProcessorImpl.INSTANCE.registerQuery(loadId, curCoordinator);
- actualExecute(curCoordinator);
+ actualExecute(curCoordinator, timeoutS);
} finally {
QeProcessorImpl.INSTANCE.unregisterQuery(loadId);
}
}
- private void actualExecute(Coordinator curCoordinator) throws Exception {
- int waitSecond = (int) (getLeftTimeMs() / 1000);
- if (waitSecond <= 0) {
- throw new LoadException("failed to execute plan when the left time
is less than 0");
- }
-
+ private void actualExecute(Coordinator curCoordinator, int waitSecond)
throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
.add("task_id", signature)
@@ -199,8 +199,8 @@ public class LoadLoadingTask extends LoadTask {
}
}
- private long getLeftTimeMs() {
- return Math.max(jobDeadlineMs - System.currentTimeMillis(), 1000L);
+ public long getLeftTimeMs() {
+ return jobDeadlineMs - System.currentTimeMillis();
}
private void createProfile(Coordinator coord) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]