This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 0213b1e [GOBBLIN-1597] Add error handling in dagmanager to continue
if dag fails to process,… (#3452)
0213b1e is described below
commit 0213b1e89822f5d53f27a3d963f917e0d871886f
Author: William Lo <[email protected]>
AuthorDate: Wed Jan 19 11:51:25 2022 -0800
[GOBBLIN-1597] Add error handling in dagmanager to continue if dag fails to
process,… (#3452)
* Add error handling in DagManager so that it continues processing the other
dags if one dag fails to process, and make the Azkaban client retry on timeouts
* Addressed comments
---
.../modules/orchestration/AzkabanClient.java | 2 +
.../service/modules/orchestration/DagManager.java | 84 ++++++++++++----------
2 files changed, 47 insertions(+), 39 deletions(-)
diff --git
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
index c8e5380..70d8b06 100644
---
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
+++
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
@@ -42,6 +42,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import lombok.Builder;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ObjectUtils;
@@ -112,6 +113,7 @@ public class AzkabanClient implements Closeable {
this.retryer = RetryerBuilder.<AzkabanClientStatus>newBuilder()
.retryIfExceptionOfType(InvalidSessionException.class)
+ .retryIfExceptionOfType(TimeoutException.class)
.withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(this.requestTimeout.toMillis(),
TimeUnit.MILLISECONDS,
this.executorService))
.withWaitStrategy(WaitStrategies.exponentialWait(60, TimeUnit.SECONDS))
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index d29fc5c..354faed 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -709,48 +709,54 @@ public class DagManager extends AbstractIdleService {
List<DagNode<JobExecutionPlan>> nodesToCleanUp = Lists.newArrayList();
for (DagNode<JobExecutionPlan> node : this.jobToDag.keySet()) {
- boolean slaKilled = slaKillIfNeeded(node);
-
- JobStatus jobStatus = pollJobStatus(node);
-
- boolean killOrphanFlow = killJobIfOrphaned(node, jobStatus);
-
- ExecutionStatus status = getJobExecutionStatus(slaKilled,
killOrphanFlow, jobStatus);
+ try {
+ boolean slaKilled = slaKillIfNeeded(node);
- JobExecutionPlan jobExecutionPlan =
DagManagerUtils.getJobExecutionPlan(node);
+ JobStatus jobStatus = pollJobStatus(node);
- switch (status) {
- case COMPLETE:
- jobExecutionPlan.setExecutionStatus(COMPLETE);
- nextSubmitted.putAll(onJobFinish(node));
- nodesToCleanUp.add(node);
- break;
- case FAILED:
- jobExecutionPlan.setExecutionStatus(FAILED);
- nextSubmitted.putAll(onJobFinish(node));
- nodesToCleanUp.add(node);
- break;
- case CANCELLED:
- jobExecutionPlan.setExecutionStatus(CANCELLED);
- nextSubmitted.putAll(onJobFinish(node));
- nodesToCleanUp.add(node);
- break;
- case PENDING:
- jobExecutionPlan.setExecutionStatus(PENDING);
- break;
- case PENDING_RETRY:
- jobExecutionPlan.setExecutionStatus(PENDING_RETRY);
- break;
- default:
- jobExecutionPlan.setExecutionStatus(RUNNING);
- break;
- }
+ boolean killOrphanFlow = killJobIfOrphaned(node, jobStatus);
+
+ ExecutionStatus status = getJobExecutionStatus(slaKilled,
killOrphanFlow, jobStatus);
+
+ JobExecutionPlan jobExecutionPlan =
DagManagerUtils.getJobExecutionPlan(node);
+
+ switch (status) {
+ case COMPLETE:
+ jobExecutionPlan.setExecutionStatus(COMPLETE);
+ nextSubmitted.putAll(onJobFinish(node));
+ nodesToCleanUp.add(node);
+ break;
+ case FAILED:
+ jobExecutionPlan.setExecutionStatus(FAILED);
+ nextSubmitted.putAll(onJobFinish(node));
+ nodesToCleanUp.add(node);
+ break;
+ case CANCELLED:
+ jobExecutionPlan.setExecutionStatus(CANCELLED);
+ nextSubmitted.putAll(onJobFinish(node));
+ nodesToCleanUp.add(node);
+ break;
+ case PENDING:
+ jobExecutionPlan.setExecutionStatus(PENDING);
+ break;
+ case PENDING_RETRY:
+ jobExecutionPlan.setExecutionStatus(PENDING_RETRY);
+ break;
+ default:
+ jobExecutionPlan.setExecutionStatus(RUNNING);
+ break;
+ }
- if (jobStatus != null && jobStatus.isShouldRetry()) {
- log.info("Retrying job: {}, current attempts: {}, max attempts: {}",
DagManagerUtils.getFullyQualifiedJobName(node),
- jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
- submitJob(node);
- }
+ if (jobStatus != null && jobStatus.isShouldRetry()) {
+ log.info("Retrying job: {}, current attempts: {}, max attempts:
{}", DagManagerUtils.getFullyQualifiedJobName(node),
+ jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
+ submitJob(node);
+ }
+ } catch (Exception e) {
+ // Error occurred while processing dag, continue processing other
dags assigned to this thread
+ log.error(String.format("Exception caught in DagManager while
processing dag %s due to ",
+ DagManagerUtils.getFullyQualifiedDagName(node)), e);
+ }
}
for (Map.Entry<String, Set<DagNode<JobExecutionPlan>>> entry:
nextSubmitted.entrySet()) {