[GOBBLIN-400] Allow skipping execution of MR job in MR tasks. Closes #2274 from ibuenros/mr-job-ski
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/fd3a547e Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/fd3a547e Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/fd3a547e Branch: refs/heads/0.12.0 Commit: fd3a547ec2ea5deac87febc8b95268ed63d78240 Parents: af68d7e Author: ibuenros <[email protected]> Authored: Thu Feb 1 13:06:16 2018 -0800 Committer: Issac Buenrostro <[email protected]> Committed: Thu Feb 1 13:06:16 2018 -0800 ---------------------------------------------------------------------- .../gobblin/runtime/mapreduce/MRTask.java | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fd3a547e/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTask.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTask.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTask.java index 3abee65..a2e56d7 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTask.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTask.java @@ -49,6 +49,7 @@ public class MRTask extends BaseAbstractTask { public static final String MR_JOB_STARTED_EVENT = "MRJobStarted"; public static final String MR_JOB_SUCCESSFUL = "MRJobSuccessful"; public static final String MR_JOB_FAILED = "MRJobFailed"; + public static final String MR_JOB_SKIPPED = "MRJobSkipped"; public static final String JOB_URL = "jobTrackingUrl"; public static final String FAILURE_CONTEXT = "failureContext"; @@ -93,6 +94,14 @@ public class MRTask extends BaseAbstractTask { try { Job job = createJob(); + if (job == null) { + log.info("No MR job created. Skipping."); + this.workingState = WorkUnitState.WorkingState.SUCCESSFUL; + this.eventSubmitter.submit(Events.MR_JOB_SKIPPED); + onSkippedMRJob(); + return; + } + job.submit(); this.eventSubmitter.submit(Events.MR_JOB_STARTED_EVENT, Events.JOB_URL, job.getTrackingURL()); job.waitForCompletion(false); @@ -116,6 +125,10 @@ public class MRTask extends BaseAbstractTask { return Maps.newHashMap(); } + /** + * Create the {@link Job} to run in this task. + * @return the {@link Job} to run. If this method returns null, no job will be run and the task will be marked as successful. + */ protected Job createJob() throws IOException { Job job = Job.getInstance(new Configuration()); for (Map.Entry<Object, Object> entry : this.taskContext.getTaskState().getProperties().entrySet()) { @@ -127,4 +140,11 @@ public class MRTask extends BaseAbstractTask { return job; } + /** + * Called when a job is skipped (because {@link #createJob()} returned null). + */ + protected void onSkippedMRJob() { + // do nothing + } + }
