This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0213b1e  [GOBBLIN-1597] Add error handling in dagmanager to continue if dag fails to process,… (#3452)
0213b1e is described below

commit 0213b1e89822f5d53f27a3d963f917e0d871886f
Author: William Lo <[email protected]>
AuthorDate: Wed Jan 19 11:51:25 2022 -0800

    [GOBBLIN-1597] Add error handling in dagmanager to continue if dag fails to process,… (#3452)
    
    * Add error handling in DagManager so it continues if a dag fails to process; make the Azkaban client retry on timeouts
    
    * Addressed comments
---
 .../modules/orchestration/AzkabanClient.java       |  2 +
 .../service/modules/orchestration/DagManager.java  | 84 ++++++++++++----------
 2 files changed, 47 insertions(+), 39 deletions(-)

diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
index c8e5380..70d8b06 100644
--- 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
@@ -42,6 +42,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import lombok.Builder;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.ObjectUtils;
@@ -112,6 +113,7 @@ public class AzkabanClient implements Closeable {
 
     this.retryer = RetryerBuilder.<AzkabanClientStatus>newBuilder()
         .retryIfExceptionOfType(InvalidSessionException.class)
+        .retryIfExceptionOfType(TimeoutException.class)
         
.withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(this.requestTimeout.toMillis(),
 TimeUnit.MILLISECONDS,
             this.executorService))
         .withWaitStrategy(WaitStrategies.exponentialWait(60, TimeUnit.SECONDS))
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index d29fc5c..354faed 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -709,48 +709,54 @@ public class DagManager extends AbstractIdleService {
       List<DagNode<JobExecutionPlan>> nodesToCleanUp = Lists.newArrayList();
 
       for (DagNode<JobExecutionPlan> node : this.jobToDag.keySet()) {
-        boolean slaKilled = slaKillIfNeeded(node);
-
-        JobStatus jobStatus = pollJobStatus(node);
-
-        boolean killOrphanFlow = killJobIfOrphaned(node, jobStatus);
-
-        ExecutionStatus status = getJobExecutionStatus(slaKilled, 
killOrphanFlow, jobStatus);
+        try {
+          boolean slaKilled = slaKillIfNeeded(node);
 
-        JobExecutionPlan jobExecutionPlan = 
DagManagerUtils.getJobExecutionPlan(node);
+          JobStatus jobStatus = pollJobStatus(node);
 
-        switch (status) {
-          case COMPLETE:
-            jobExecutionPlan.setExecutionStatus(COMPLETE);
-            nextSubmitted.putAll(onJobFinish(node));
-            nodesToCleanUp.add(node);
-            break;
-          case FAILED:
-            jobExecutionPlan.setExecutionStatus(FAILED);
-            nextSubmitted.putAll(onJobFinish(node));
-            nodesToCleanUp.add(node);
-            break;
-          case CANCELLED:
-            jobExecutionPlan.setExecutionStatus(CANCELLED);
-            nextSubmitted.putAll(onJobFinish(node));
-            nodesToCleanUp.add(node);
-            break;
-          case PENDING:
-            jobExecutionPlan.setExecutionStatus(PENDING);
-            break;
-          case PENDING_RETRY:
-            jobExecutionPlan.setExecutionStatus(PENDING_RETRY);
-            break;
-          default:
-            jobExecutionPlan.setExecutionStatus(RUNNING);
-            break;
-        }
+          boolean killOrphanFlow = killJobIfOrphaned(node, jobStatus);
+
+          ExecutionStatus status = getJobExecutionStatus(slaKilled, 
killOrphanFlow, jobStatus);
+
+          JobExecutionPlan jobExecutionPlan = 
DagManagerUtils.getJobExecutionPlan(node);
+
+          switch (status) {
+            case COMPLETE:
+              jobExecutionPlan.setExecutionStatus(COMPLETE);
+              nextSubmitted.putAll(onJobFinish(node));
+              nodesToCleanUp.add(node);
+              break;
+            case FAILED:
+              jobExecutionPlan.setExecutionStatus(FAILED);
+              nextSubmitted.putAll(onJobFinish(node));
+              nodesToCleanUp.add(node);
+              break;
+            case CANCELLED:
+              jobExecutionPlan.setExecutionStatus(CANCELLED);
+              nextSubmitted.putAll(onJobFinish(node));
+              nodesToCleanUp.add(node);
+              break;
+            case PENDING:
+              jobExecutionPlan.setExecutionStatus(PENDING);
+              break;
+            case PENDING_RETRY:
+              jobExecutionPlan.setExecutionStatus(PENDING_RETRY);
+              break;
+            default:
+              jobExecutionPlan.setExecutionStatus(RUNNING);
+              break;
+          }
 
-        if (jobStatus != null && jobStatus.isShouldRetry()) {
-          log.info("Retrying job: {}, current attempts: {}, max attempts: {}", 
DagManagerUtils.getFullyQualifiedJobName(node),
-              jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
-          submitJob(node);
-        }
+          if (jobStatus != null && jobStatus.isShouldRetry()) {
+            log.info("Retrying job: {}, current attempts: {}, max attempts: 
{}", DagManagerUtils.getFullyQualifiedJobName(node),
+                jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
+            submitJob(node);
+          }
+        } catch (Exception e) {
+            // Error occurred while processing dag, continue processing other 
dags assigned to this thread
+            log.error(String.format("Exception caught in DagManager while 
processing dag %s due to ",
+                DagManagerUtils.getFullyQualifiedDagName(node)), e);
+          }
       }
 
       for (Map.Entry<String, Set<DagNode<JobExecutionPlan>>> entry: 
nextSubmitted.entrySet()) {

Reply via email to