This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 59c7d50  [Bug][Load] Catch retry submit exception (#4796)
59c7d50 is described below

commit 59c7d5021d1e6e9a4859a1505bfe983ef7acbcef
Author: xinghuayu007 <[email protected]>
AuthorDate: Sun Nov 8 20:50:50 2020 +0800

    [Bug][Load] Catch retry submit exception (#4796)
    
    When the `Load Job Task Queue` is full, submitting more jobs to the queue
    causes a `RejectedExecutionException`.
    But the `callback.onTaskFailed` function does not catch the exception, so
    re-submitting the job fails and its status is not updated to failed.
    issue: #4795
---
 .../main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java  |  8 ++++++++
 .../main/java/org/apache/doris/load/loadv2/BulkLoadJob.java    | 10 ++++++++--
 .../java/org/apache/doris/load/loadv2/LoadJobScheduler.java    |  7 +++++++
 3 files changed, 23 insertions(+), 2 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 9ca4c77..a253dbe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -52,6 +52,7 @@ import org.apache.logging.log4j.Logger;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.RejectedExecutionException;
 
 /**
  * There are 3 steps in BrokerLoadJob: BrokerPendingTask, LoadLoadingTask, 
CommitAndPublishTxn.
@@ -155,6 +156,13 @@ public class BrokerLoadJob extends BulkLoadJob {
                              .build(), e);
             cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_RUN_FAIL, 
e.getMessage()), true, true);
             return;
+        } catch (RejectedExecutionException e) {
+            LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
+                    .add("database_id", dbId)
+                    .add("error_msg", "the task queque is full.")
+                    .build(), e);
+            cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_RUN_FAIL, 
e.getMessage()), true, true);
+            return;
         }
 
         loadStartTimestamp = System.currentTimeMillis();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
index 04f3d93..1d649c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
@@ -58,6 +58,7 @@ import java.io.StringReader;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.RejectedExecutionException;
 
 /**
  * parent class of BrokerLoadJob and SparkLoadJob from load stmt
@@ -233,8 +234,13 @@ public abstract class BulkLoadJob extends LoadJob {
                 loadTask.updateRetryInfo();
                 idToTasks.put(loadTask.getSignature(), loadTask);
                 // load id will be added to loadStatistic when executing this 
task
-                
Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask);
-                return;
+                try {
+                    
Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask);
+                } catch (RejectedExecutionException e) {
+                    unprotectedExecuteCancel(failMsg, true);
+                    logFinalOperation();
+                    return;
+                }
             }
         } finally {
             writeUnlock();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
index 9a2b691..33d4677 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
@@ -35,6 +35,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
 
 /**
  * LoadScheduler will schedule the pending LoadJob which belongs to 
LoadManager.
@@ -99,6 +100,12 @@ public class LoadJobScheduler extends MasterDaemon {
                                  .build(), e);
                 needScheduleJobs.put(loadJob);
                 return;
+            } catch (RejectedExecutionException e) {
+                LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
+                        .add("error_msg", "Failed to submit etl job. Job queue 
is full.")
+                        .build(), e);
+                loadJob.cancelJobWithoutCheck(new 
FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL, e.getMessage()),
+                        true, true);
             }
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to