Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 66b1fcda9 -> 43c561c95


[GOBBLIN-658] Submit a JobFailed event when an exception is encountered during 
Job orchestration in Gobblin service.[]

Closes #2527 from
sv2000/dagManagerExceptionFailedEvent


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/43c561c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/43c561c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/43c561c9

Branch: refs/heads/master
Commit: 43c561c95a9938e2fd222707ce42b24f9388f7db
Parents: 66b1fcd
Author: suvasude <[email protected]>
Authored: Thu Dec 20 19:21:14 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Thu Dec 20 19:21:14 2018 -0800

----------------------------------------------------------------------
 .../gobblin/service/modules/orchestration/DagManager.java     | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/43c561c9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 1494ccf..4f1e10f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -376,6 +376,7 @@ public class DagManager extends AbstractIdleService {
       JobExecutionPlan jobExecutionPlan = 
DagManagerUtils.getJobExecutionPlan(dagNode);
       jobExecutionPlan.setExecutionStatus(RUNNING);
       JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
+      Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
 
       // Run this spec on selected executor
       SpecProducer producer = null;
@@ -387,7 +388,6 @@ public class DagManager extends AbstractIdleService {
         }
         log.info("Submitting job: {} on executor: {}", jobSpec, producer);
 
-        Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
         log.info("Going to orchestrate JobSpec: {} on Executor: {}", jobSpec, 
producer);
 
         TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? 
this.eventSubmitter.get().
@@ -401,7 +401,12 @@ public class DagManager extends AbstractIdleService {
 
         log.info("Orchestrated JobSpec: {} on Executor: {}", jobSpec, 
producer);
       } catch (Exception e) {
+        TimingEvent jobFailedTimer = this.eventSubmitter.isPresent() ? 
this.eventSubmitter.get().
+            getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED) : null;
         log.error("Cannot submit job: {} on executor: {}", jobSpec, producer, 
e);
+        if (jobFailedTimer != null) {
+          jobFailedTimer.stop(jobMetadata);
+        }
       }
     }
 

Reply via email to