Repository: incubator-gobblin Updated Branches: refs/heads/master 79878f992 -> 757b25b61
[GOBBLIN-653] Create JobSucceededTimer tracking event to accurately track successful Gobblin jobs.[] Closes #2522 from sv2000/jobSuccessTrackingEvent Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/757b25b6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/757b25b6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/757b25b6 Branch: refs/heads/master Commit: 757b25b611b195865328b52c3a4eb4cfa214e2af Parents: 79878f9 Author: suvasude <[email protected]> Authored: Tue Dec 11 15:25:42 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Tue Dec 11 15:25:42 2018 -0800 ---------------------------------------------------------------------- .../gobblin/metrics/event/TimingEvent.java | 1 + .../gobblin/runtime/AbstractJobLauncher.java | 37 +++++++++++++++----- 2 files changed, 29 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/757b25b6/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java index 8fd1462..0b3defa 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java @@ -40,6 +40,7 @@ public class TimingEvent { public static final String JOB_CANCEL = "JobCancelTimer"; public static final String JOB_COMPLETE = "JobCompleteTimer"; public static final String JOB_FAILED = "JobFailedTimer"; + public static final String JOB_SUCCEEDED = "JobSucceededTimer"; } public static class RunJobTimings { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/757b25b6/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java index 7f359dc..5b008d4 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java @@ -333,6 +333,7 @@ public abstract class AbstractJobLauncher implements JobLauncher { throws JobException { String jobId = this.jobContext.getJobId(); final JobState jobState = this.jobContext.getJobState(); + boolean isWorkUnitsEmpty = false; try { MDC.put(ConfigurationKeys.JOB_NAME_KEY, this.jobContext.getJobName()); @@ -380,14 +381,7 @@ public abstract class AbstractJobLauncher implements JobLauncher { this.eventSubmitter.submit(JobEvent.WORK_UNITS_EMPTY); LOG.warn("No work units have been created for job " + jobId); jobState.setState(JobState.RunningState.COMMITTED); - notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_COMPLETE, - new JobListenerAction() { - @Override - public void apply(JobListener jobListener, JobContext jobContext) - throws Exception { - jobListener.onJobCompletion(jobContext); - } - }); + isWorkUnitsEmpty = true; return; } @@ -485,11 +479,28 @@ public abstract class AbstractJobLauncher implements JobLauncher { TimingEvent jobCleanupTimer = this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CLEANUP); cleanupStagingData(jobState); jobCleanupTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext, EventName.JOB_CLEANUP)); - // Write job execution info to the job history store upon job termination this.jobContext.storeJobExecutionInfo(); } finally { launchJobTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext, EventName.FULL_JOB_EXECUTION)); + if (isWorkUnitsEmpty) { + //If no WorkUnits are created, first send the JobCompleteTimer event. + notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_COMPLETE, new JobListenerAction() { + @Override + public void apply(JobListener jobListener, JobContext jobContext) + throws Exception { + jobListener.onJobCompletion(jobContext); + } + }); + //Next, send the JobSucceededTimer event. + notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_SUCCEEDED, new JobListenerAction() { + @Override + public void apply(JobListener jobListener, JobContext jobContext) + throws Exception { + jobListener.onJobFailure(jobContext); + } + }); + } } } @@ -519,6 +530,14 @@ public abstract class AbstractJobLauncher implements JobLauncher { } }); throw new JobException(String.format("Job %s failed", jobId)); + } else { + notifyListeners(this.jobContext, jobListener, TimingEvent.LauncherTimings.JOB_SUCCEEDED, new JobListenerAction() { + @Override + public void apply(JobListener jobListener, JobContext jobContext) + throws Exception { + jobListener.onJobFailure(jobContext); + } + }); } } finally { // Stop metrics reporting
