Repository: incubator-gobblin Updated Branches: refs/heads/master 66b1fcda9 -> 43c561c95
[GOBBLIN-658] Submit a JobFailed event when an exception is encountered during Job orchestration in Gobblin service.[]

Closes #2527 from sv2000/dagManagerExceptionFailedEvent

Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/43c561c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/43c561c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/43c561c9

Branch: refs/heads/master
Commit: 43c561c95a9938e2fd222707ce42b24f9388f7db
Parents: 66b1fcd
Author: suvasude <[email protected]>
Authored: Thu Dec 20 19:21:14 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Thu Dec 20 19:21:14 2018 -0800

----------------------------------------------------------------------
 .../gobblin/service/modules/orchestration/DagManager.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/43c561c9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 1494ccf..4f1e10f 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -376,6 +376,7 @@ public class DagManager extends AbstractIdleService {
       JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(dagNode);
       jobExecutionPlan.setExecutionStatus(RUNNING);
       JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
+      Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
       // Run this spec on selected executor
       SpecProducer producer = null;
@@ -387,7 +388,6 @@ public class DagManager extends AbstractIdleService {
         }
         log.info("Submitting job: {} on executor: {}", jobSpec, producer);
-        Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
         log.info("Going to orchestrate JobSpec: {} on Executor: {}", jobSpec, producer);
         TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
@@ -401,7 +401,12 @@ public class DagManager extends AbstractIdleService {
         log.info("Orchestrated JobSpec: {} on Executor: {}", jobSpec, producer);
       } catch (Exception e) {
+        TimingEvent jobFailedTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
+            getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED) : null;
         log.error("Cannot submit job: {} on executor: {}", jobSpec, producer, e);
+        if (jobFailedTimer != null) {
+          jobFailedTimer.stop(jobMetadata);
+        }
       }
     }
