This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f7deb3dea6 [enhancement](bulk-load) cancel loading tasks directly 
without retrying when timeout exceeded (#28666)
2f7deb3dea6 is described below

commit 2f7deb3dea6e5d8514d0492cb4145ddb18a948ca
Author: Siyang Tang <[email protected]>
AuthorDate: Sun Dec 24 21:57:31 2023 +0800

    [enhancement](bulk-load) cancel loading tasks directly without retrying 
when timeout exceeded (#28666)
---
 .../org/apache/doris/load/loadv2/BulkLoadJob.java  |  8 ++++++--
 .../apache/doris/load/loadv2/LoadLoadingTask.java  | 24 +++++++++++-----------
 2 files changed, 18 insertions(+), 14 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
index d5b173d82fc..7a9f160b3f8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
@@ -67,6 +67,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 /**
@@ -124,7 +125,7 @@ public abstract class BulkLoadJob extends LoadJob {
                     break;
                 case SPARK:
                     bulkLoadJob = new SparkLoadJob(db.getId(), 
stmt.getLabel().getLabelName(), stmt.getResourceDesc(),
-                        stmt.getOrigStmt(), stmt.getUserInfo());
+                            stmt.getOrigStmt(), stmt.getUserInfo());
                     break;
                 case MINI:
                 case DELETE:
@@ -209,7 +210,10 @@ public abstract class BulkLoadJob extends LoadJob {
             if (loadTask == null) {
                 return;
             }
-            if (loadTask.getRetryTime() <= 0) {
+            Predicate<LoadTask> isTaskTimeout =
+                    (LoadTask task) -> task instanceof LoadLoadingTask
+                            && ((LoadLoadingTask) task).getLeftTimeMs() <= 0;
+            if (loadTask.getRetryTime() <= 0 || isTaskTimeout.test(loadTask)) {
                 unprotectedExecuteCancel(failMsg, true);
                 logFinalOperation();
                 return;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index fa2eadcfa65..97abfb0db90 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -110,7 +110,7 @@ public class LoadLoadingTask extends LoadTask {
     }
 
     public void init(TUniqueId loadId, List<List<TBrokerFileStatus>> 
fileStatusList,
-            int fileNum, UserIdentity userInfo) throws UserException {
+                     int fileNum, UserIdentity userInfo) throws UserException {
         this.loadId = loadId;
         planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, 
db.getId(), table, brokerDesc, fileGroups,
                 strictMode, isPartialUpdate, timezone, this.timeoutS, 
this.loadParallelism, this.sendBatchParallelism,
@@ -154,23 +154,23 @@ public class LoadLoadingTask extends LoadTask {
          * here we use exec_mem_limit to directly override the load_mem_limit 
property.
          */
         curCoordinator.setLoadMemLimit(execMemLimit);
-        curCoordinator.setTimeout((int) (getLeftTimeMs() / 1000));
-        curCoordinator.setMemTableOnSinkNode(enableMemTableOnSinkNode);
+
+        long leftTimeMs = getLeftTimeMs();
+        if (leftTimeMs <= 0) {
+            throw new LoadException("failed to execute loading task when 
timeout");
+        }
+        int timeoutS = (int) (leftTimeMs / 1000);
+        curCoordinator.setTimeout(timeoutS);
 
         try {
             QeProcessorImpl.INSTANCE.registerQuery(loadId, curCoordinator);
-            actualExecute(curCoordinator);
+            actualExecute(curCoordinator, timeoutS);
         } finally {
             QeProcessorImpl.INSTANCE.unregisterQuery(loadId);
         }
     }
 
-    private void actualExecute(Coordinator curCoordinator) throws Exception {
-        int waitSecond = (int) (getLeftTimeMs() / 1000);
-        if (waitSecond <= 0) {
-            throw new LoadException("failed to execute plan when the left time 
is less than 0");
-        }
-
+    private void actualExecute(Coordinator curCoordinator, int waitSecond) 
throws Exception {
         if (LOG.isDebugEnabled()) {
             LOG.debug(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
                     .add("task_id", signature)
@@ -199,8 +199,8 @@ public class LoadLoadingTask extends LoadTask {
         }
     }
 
-    private long getLeftTimeMs() {
-        return Math.max(jobDeadlineMs - System.currentTimeMillis(), 1000L);
+    public long getLeftTimeMs() {
+        return jobDeadlineMs - System.currentTimeMillis();
     }
 
     private void createProfile(Coordinator coord) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to