This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 59c7d50 [Bug][Load] Catch retry submit exception (#4796)
59c7d50 is described below
commit 59c7d5021d1e6e9a4859a1505bfe983ef7acbcef
Author: xinghuayu007 <[email protected]>
AuthorDate: Sun Nov 8 20:50:50 2020 +0800
[Bug][Load] Catch retry submit exception (#4796)
When the `Load Job Task Queue` is full, submitting more jobs to the queue
causes a `RejectedExecutionException`.
However, the `callback.onTaskFailed` function does not catch this exception,
so re-submitting the job fails and the job status is never updated to failed.
issue: #4795
---
.../main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java | 8 ++++++++
.../main/java/org/apache/doris/load/loadv2/BulkLoadJob.java | 10 ++++++++--
.../java/org/apache/doris/load/loadv2/LoadJobScheduler.java | 7 +++++++
3 files changed, 23 insertions(+), 2 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 9ca4c77..a253dbe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -52,6 +52,7 @@ import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.RejectedExecutionException;
/**
* There are 3 steps in BrokerLoadJob: BrokerPendingTask, LoadLoadingTask,
CommitAndPublishTxn.
@@ -155,6 +156,13 @@ public class BrokerLoadJob extends BulkLoadJob {
.build(), e);
cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_RUN_FAIL,
e.getMessage()), true, true);
return;
+ } catch (RejectedExecutionException e) {
+ LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
+ .add("database_id", dbId)
+ .add("error_msg", "the task queque is full.")
+ .build(), e);
+ cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_RUN_FAIL,
e.getMessage()), true, true);
+ return;
}
loadStartTimestamp = System.currentTimeMillis();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
index 04f3d93..1d649c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
@@ -58,6 +58,7 @@ import java.io.StringReader;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.RejectedExecutionException;
/**
* parent class of BrokerLoadJob and SparkLoadJob from load stmt
@@ -233,8 +234,13 @@ public abstract class BulkLoadJob extends LoadJob {
loadTask.updateRetryInfo();
idToTasks.put(loadTask.getSignature(), loadTask);
// load id will be added to loadStatistic when executing this
task
-
Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask);
- return;
+ try {
+
Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask);
+ } catch (RejectedExecutionException e) {
+ unprotectedExecuteCancel(failMsg, true);
+ logFinalOperation();
+ return;
+ }
}
} finally {
writeUnlock();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
index 9a2b691..33d4677 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
@@ -35,6 +35,7 @@ import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
/**
* LoadScheduler will schedule the pending LoadJob which belongs to
LoadManager.
@@ -99,6 +100,12 @@ public class LoadJobScheduler extends MasterDaemon {
.build(), e);
needScheduleJobs.put(loadJob);
return;
+ } catch (RejectedExecutionException e) {
+ LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
+ .add("error_msg", "Failed to submit etl job. Job queue
is full.")
+ .build(), e);
+ loadJob.cancelJobWithoutCheck(new
FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL, e.getMessage()),
+ true, true);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]