Repository: crunch Updated Branches: refs/heads/apache-crunch-0.8 652963cfb -> fd9778c90
CRUNCH-355: Make the sequence ids correct in the Crunch-generated job names Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/fd9778c9 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/fd9778c9 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/fd9778c9 Branch: refs/heads/apache-crunch-0.8 Commit: fd9778c908a0a192ffd5c64d4f49df65d025c93b Parents: 652963c Author: Josh Wills <[email protected]> Authored: Tue May 27 20:16:37 2014 -0700 Committer: Josh Wills <[email protected]> Committed: Thu May 29 09:44:12 2014 -0700 ---------------------------------------------------------------------- .../lib/jobcontrol/CrunchControlledJob.java | 16 +++++++--------- .../mapreduce/lib/jobcontrol/CrunchJobControl.java | 3 +++ .../apache/crunch/impl/mr/plan/JobNameBuilder.java | 11 +++++++++-- .../apache/crunch/impl/mr/plan/JobPrototype.java | 13 +++++++++---- .../lib/jobcontrol/CrunchJobControlTest.java | 2 ++ .../crunch/impl/mr/plan/JobNameBuilderTest.java | 6 +++--- 6 files changed, 33 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/fd9778c9/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java index 5dbb43e..06d886d 100644 --- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java +++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.crunch.impl.mr.MRJob; +import org.apache.crunch.impl.mr.plan.JobNameBuilder; import org.apache.crunch.impl.mr.run.RuntimeParameters; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; @@ -53,6 +54,8 @@ public class CrunchControlledJob implements MRJob { private final int jobID; private final Job job; // mapreduce job to be executed. + private final JobNameBuilder jobNameBuilder; + // the jobs the current job depends on private final List<CrunchControlledJob> dependingJobs; private final Hook prepareHook; @@ -79,9 +82,10 @@ public class CrunchControlledJob implements MRJob { * @param completionHook * a piece of code that will run after this job gets completed. */ - public CrunchControlledJob(int jobID, Job job, Hook prepareHook, Hook completionHook) { + public CrunchControlledJob(int jobID, Job job, JobNameBuilder jobNameBuilder, Hook prepareHook, Hook completionHook) { this.jobID = jobID; this.job = job; + this.jobNameBuilder = jobNameBuilder; this.dependingJobs = Lists.newArrayList(); this.prepareHook = prepareHook; this.completionHook = completionHook; @@ -118,14 +122,8 @@ public class CrunchControlledJob implements MRJob { return job.getJobName(); } - /** - * Set the job name for this job. - * - * @param jobName - * the job name - */ - public void setJobName(String jobName) { - job.setJobName(jobName); + public void setJobSequence(int jobSequence) { + this.job.setJobName(jobNameBuilder.jobSequence(jobSequence).build()); } /** http://git-wip-us.apache.org/repos/asf/crunch/blob/fd9778c9/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java index ce7a6d9..8a650c7 100644 --- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java +++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java @@ -52,6 +52,7 @@ public class CrunchJobControl { private final String groupName; private final int maxRunningJobs; + private int jobSequence = 1; /** * Construct a job control for a group of jobs. @@ -198,6 +199,8 @@ public class CrunchJobControl { // stop submitting new jobs and wait until some running job completes. if (runningJobs.size() < maxRunningJobs) { // Submitting Job to Hadoop + nextJob.setJobSequence(jobSequence); + jobSequence++; nextJob.submit(); } this.addToQueue(nextJob); http://git-wip-us.apache.org/repos/asf/crunch/blob/fd9778c9/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java index 6fac1be..4a77231 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java @@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration; * Visitor that traverses the {@code DoNode} instances in a job and builds a * String that identifies the stages of the pipeline that belong to this job. */ -class JobNameBuilder { +public class JobNameBuilder { private static final Joiner JOINER = Joiner.on("+"); private static final Joiner CHILD_JOINER = Joiner.on("/"); @@ -36,6 +36,7 @@ class JobNameBuilder { private final String pipelineName; private final int jobID; + private int jobSequence; private final int numOfJobs; List<String> rootStack = Lists.newArrayList(); private final int maxStackNameLength; @@ -48,6 +49,11 @@ class JobNameBuilder { PlanningParameters.JOB_NAME_MAX_STACK_LENGTH, DEFAULT_JOB_NAME_MAX_STACK_LENGTH); } + public JobNameBuilder jobSequence(int jobSequence) { + this.jobSequence = jobSequence; + return this; + } + public void visit(DoNode node) { visit(node, rootStack); } @@ -84,10 +90,11 @@ class JobNameBuilder { } public String build() { - return String.format("%s: %s (%d/%d)", + return String.format("%s: %s ID=%d (%d/%d)", pipelineName, shortenRootStackName(JOINER.join(rootStack), maxStackNameLength), jobID, + jobSequence, numOfJobs); } http://git-wip-us.apache.org/repos/asf/crunch/blob/fd9778c9/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java index e7a1e17..41da5a6 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java @@ -223,11 +223,12 @@ class JobPrototype { } job.setInputFormatClass(CrunchInputFormat.class); } - job.setJobName(createJobName(conf, pipeline.getName(), inputNodes, reduceNode, numOfJobs)); + JobNameBuilder jobNameBuilder = createJobNameBuilder(conf, pipeline.getName(), inputNodes, reduceNode, numOfJobs); return new CrunchControlledJob( jobID, job, + jobNameBuilder, new CrunchJobHooks.PrepareHook(job), new CrunchJobHooks.CompletionHook(job, outputPath, outputHandler.getMultiPaths(), group == null)); } @@ -242,14 +243,18 @@ class JobPrototype { DistCache.write(conf, path, rtNodes); } - private String createJobName( - Configuration conf, String pipelineName, List<DoNode> mapNodes, DoNode reduceNode, int numOfJobs) { + private JobNameBuilder createJobNameBuilder( + Configuration conf, + String pipelineName, + List<DoNode> mapNodes, + DoNode reduceNode, + int numOfJobs) { JobNameBuilder builder = new JobNameBuilder(conf, pipelineName, jobID, numOfJobs); builder.visit(mapNodes); if (reduceNode != null) { builder.visit(reduceNode); } - return builder.build(); + return builder; } private DoNode walkPath(Iterator<PCollectionImpl<?>> iter, DoNode working) { http://git-wip-us.apache.org/repos/asf/crunch/blob/fd9778c9/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java b/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java index 562e99d..2c1f1be 100644 --- a/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java @@ -18,6 +18,7 @@ package org.apache.crunch.hadoop.mapreduce.lib.jobcontrol; import org.apache.crunch.impl.mr.MRJob; +import org.apache.crunch.impl.mr.plan.JobNameBuilder; import org.apache.crunch.impl.mr.run.RuntimeParameters; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Job; @@ -65,6 +66,7 @@ public class CrunchJobControlTest { CrunchControlledJob job = new CrunchControlledJob( jobID, mrJob, + new JobNameBuilder(mrJob.getConfiguration(), "test", 1, 1), mock(CrunchControlledJob.Hook.class), mock(CrunchControlledJob.Hook.class)); return spy(job); http://git-wip-us.apache.org/repos/asf/crunch/blob/fd9778c9/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java index eef318e..ee7b398 100644 --- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java @@ -37,9 +37,9 @@ public class JobNameBuilderTest { DoNode doNode = createDoNode(nodeName); JobNameBuilder jobNameBuilder = new JobNameBuilder(CONF, pipelineName, 1, 1); jobNameBuilder.visit(Lists.newArrayList(doNode)); - String jobName = jobNameBuilder.build(); + String jobName = jobNameBuilder.jobSequence(1).build(); - assertEquals(String.format("%s: %s (1/1)", pipelineName, nodeName), jobName); + assertEquals(String.format("%s: %s ID=1 (1/1)", pipelineName, nodeName), jobName); } @Test @@ -49,7 +49,7 @@ public class JobNameBuilderTest { DoNode doNode = createDoNode(nodeName); JobNameBuilder jobNameBuilder = new JobNameBuilder(CONF, pipelineName, 1, 1); jobNameBuilder.visit(Lists.newArrayList(doNode)); - String jobName = jobNameBuilder.build(); + String jobName = jobNameBuilder.jobSequence(1).build(); assertFalse(jobName.contains(nodeName)); // Tests that the very long node name was shorten }
