Repository: crunch
Updated Branches:
  refs/heads/master 1f5323bd1 -> 7f85ee581
CRUNCH-536: Refactor CrunchControlledJob.Hook interface and make it client-accessible Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/7f85ee58 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/7f85ee58 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/7f85ee58 Branch: refs/heads/master Commit: 7f85ee5816a19eca0e87ce503ea0b03ea294433c Parents: 1f5323b Author: Josh Wills <[email protected]> Authored: Mon Jul 6 11:49:46 2015 -0700 Committer: Josh Wills <[email protected]> Committed: Mon Jul 6 11:49:46 2015 -0700 ---------------------------------------------------------------------- .../it/java/org/apache/crunch/CleanTextIT.java | 22 ++++++++++- .../lib/jobcontrol/CrunchControlledJob.java | 6 +-- .../org/apache/crunch/impl/mr/MRPipeline.java | 25 ++++++++++++ .../crunch/impl/mr/exec/CrunchJobHooks.java | 41 ++++++++++++-------- .../crunch/impl/mr/plan/JobPrototype.java | 25 ++++++++++-- 5 files changed, 94 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/7f85ee58/crunch-core/src/it/java/org/apache/crunch/CleanTextIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/CleanTextIT.java b/crunch-core/src/it/java/org/apache/crunch/CleanTextIT.java index 2f4004e..9d6f682 100644 --- a/crunch-core/src/it/java/org/apache/crunch/CleanTextIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/CleanTextIT.java @@ -20,9 +20,12 @@ package org.apache.crunch; import static org.junit.Assert.assertEquals; import java.io.File; +import java.io.IOException; import java.nio.charset.Charset; import java.util.List; +import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob; +import org.apache.crunch.impl.mr.MRJob; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.io.To; import 
org.apache.crunch.test.TemporaryPath; @@ -63,7 +66,11 @@ public class CleanTextIT { @Test public void testMapSideOutputs() throws Exception { - Pipeline pipeline = new MRPipeline(CleanTextIT.class, tmpDir.getDefaultConfiguration()); + MRPipeline pipeline = new MRPipeline(CleanTextIT.class, tmpDir.getDefaultConfiguration()); + JobHook prepareOne = new JobHook(); + JobHook prepareTwo = new JobHook(); + JobHook completed = new JobHook(); + pipeline.addPrepareHook(prepareOne).addPrepareHook(prepareTwo).addCompletionHook(completed); String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt"); PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath); @@ -78,5 +85,18 @@ public class CleanTextIT { File cleanFile = new File(cso, "part-m-00000"); List<String> lines = Files.readLines(cleanFile, Charset.defaultCharset()); assertEquals(LINES_IN_SHAKES, lines.size()); + assertEquals(1, prepareOne.called); + assertEquals(1, prepareTwo.called); + assertEquals(1, completed.called); + } + + static class JobHook implements CrunchControlledJob.Hook { + + int called = 0; + + @Override + public void run(MRJob job) throws IOException { + called++; + } } } http://git-wip-us.apache.org/repos/asf/crunch/blob/7f85ee58/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java index dceb217..e96b9bf 100644 --- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java +++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java @@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory; public class CrunchControlledJob implements MRJob { public static interface Hook { 
- public void run() throws IOException; + public void run(MRJob job) throws IOException; } private static final Logger LOG = LoggerFactory.getLogger(CrunchControlledJob.class); @@ -285,7 +285,7 @@ public class CrunchControlledJob implements MRJob { } } if (isCompleted()) { - completionHook.run(); + completionHook.run(this); this.postHookEndTimeMsec = System.currentTimeMillis(); } } @@ -335,7 +335,7 @@ public class CrunchControlledJob implements MRJob { protected synchronized void submit() { try { this.preHookStartTimeMsec = System.currentTimeMillis(); - prepareHook.run(); + prepareHook.run(this); this.jobStartTimeMsec = System.currentTimeMillis(); job.submit(); this.state = State.RUNNING; http://git-wip-us.apache.org/repos/asf/crunch/blob/7f85ee58/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java index 6cd2809..440bec7 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java @@ -23,10 +23,12 @@ import java.net.URISyntaxException; import java.net.URLEncoder; import java.text.SimpleDateFormat; import java.util.Date; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import com.google.common.base.Charsets; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.crunch.CachingOptions; @@ -34,6 +36,7 @@ import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.PCollection; import org.apache.crunch.PipelineExecution; import org.apache.crunch.PipelineResult; +import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob; import org.apache.crunch.impl.dist.DistributedPipeline; import org.apache.crunch.impl.dist.collect.PCollectionImpl; import 
org.apache.crunch.impl.mr.collect.MRCollectionFactory; @@ -57,6 +60,8 @@ public class MRPipeline extends DistributedPipeline { private static final Logger LOG = LoggerFactory.getLogger(MRPipeline.class); private final Class<?> jarClass; + private final List<CrunchControlledJob.Hook> prepareHooks; + private final List<CrunchControlledJob.Hook> completionHooks; /** * Instantiate with a default Configuration and name. @@ -98,6 +103,26 @@ public class MRPipeline extends DistributedPipeline { public MRPipeline(Class<?> jarClass, String name, Configuration conf) { super(name, conf, new MRCollectionFactory()); this.jarClass = jarClass; + this.prepareHooks = Lists.newArrayList(); + this.completionHooks = Lists.newArrayList(); + } + + public MRPipeline addPrepareHook(CrunchControlledJob.Hook hook) { + this.prepareHooks.add(hook); + return this; + } + + public List<CrunchControlledJob.Hook> getPrepareHooks() { + return prepareHooks; + } + + public MRPipeline addCompletionHook(CrunchControlledJob.Hook hook) { + this.completionHooks.add(hook); + return this; + } + + public List<CrunchControlledJob.Hook> getCompletionHooks() { + return completionHooks; } public MRExecutor plan() { http://git-wip-us.apache.org/repos/asf/crunch/blob/7f85ee58/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java index 6a15a0d..8af94c6 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java @@ -18,9 +18,11 @@ package org.apache.crunch.impl.mr.exec; import java.io.IOException; +import java.util.List; import java.util.Map; import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob; +import 
org.apache.crunch.impl.mr.MRJob; import org.apache.crunch.impl.mr.run.RuntimeParameters; import org.apache.crunch.io.PathTarget; import org.apache.hadoop.conf.Configuration; @@ -33,19 +35,29 @@ public final class CrunchJobHooks { private CrunchJobHooks() {} - /** Creates missing input directories before job is submitted. */ - public static final class PrepareHook implements CrunchControlledJob.Hook { - private final Job job; + public static final class CompositeHook implements CrunchControlledJob.Hook { + + private List<CrunchControlledJob.Hook> hooks; + + public CompositeHook(List<CrunchControlledJob.Hook> hooks) { + this.hooks = hooks; + } - public PrepareHook(Job job) { - this.job = job; + @Override + public void run(MRJob job) throws IOException { + for (CrunchControlledJob.Hook hook : hooks) { + hook.run(job); + } } + } + /** Creates missing input directories before job is submitted. */ + public static final class PrepareHook implements CrunchControlledJob.Hook { @Override - public void run() throws IOException { - Configuration conf = job.getConfiguration(); + public void run(MRJob job) throws IOException { + Configuration conf = job.getJob().getConfiguration(); if (conf.getBoolean(RuntimeParameters.CREATE_DIR, false)) { - Path[] inputPaths = FileInputFormat.getInputPaths(job); + Path[] inputPaths = FileInputFormat.getInputPaths(job.getJob()); for (Path inputPath : inputPaths) { FileSystem fs = inputPath.getFileSystem(conf); if (!fs.exists(inputPath)) { @@ -61,25 +73,20 @@ public final class CrunchJobHooks { /** Moving output files produced by the MapReduce job to specified directories. 
*/ public static final class CompletionHook implements CrunchControlledJob.Hook { - private final Job job; private final Path workingPath; private final Map<Integer, PathTarget> multiPaths; - private final boolean mapOnlyJob; - public CompletionHook(Job job, Path workingPath, Map<Integer, PathTarget> multiPaths, - boolean mapOnlyJob) { - this.job = job; + public CompletionHook(Path workingPath, Map<Integer, PathTarget> multiPaths) { this.workingPath = workingPath; this.multiPaths = multiPaths; - this.mapOnlyJob = mapOnlyJob; } @Override - public void run() throws IOException { - handleMultiPaths(); + public void run(MRJob job) throws IOException { + handleMultiPaths(job.getJob()); } - private synchronized void handleMultiPaths() throws IOException { + private synchronized void handleMultiPaths(Job job) throws IOException { try { if (job.isSuccessful()) { if (!multiPaths.isEmpty()) { http://git-wip-us.apache.org/repos/asf/crunch/blob/7f85ee58/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java index 2863e00..d23de3b 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java @@ -27,6 +27,7 @@ import org.apache.crunch.Pipeline; import org.apache.crunch.Target; import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob; import org.apache.crunch.impl.dist.collect.PCollectionImpl; +import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.impl.mr.collect.DoTable; import org.apache.crunch.impl.dist.collect.MRCollection; import org.apache.crunch.impl.mr.collect.PGroupedTableImpl; @@ -130,7 +131,7 @@ class JobPrototype { } public CrunchControlledJob getCrunchJob( - Class<?> 
jarClass, Configuration conf, Pipeline pipeline, int numOfJobs) throws IOException { + Class<?> jarClass, Configuration conf, MRPipeline pipeline, int numOfJobs) throws IOException { if (job == null) { job = build(jarClass, conf, pipeline, numOfJobs); for (JobPrototype proto : dependencies) { @@ -141,7 +142,7 @@ class JobPrototype { } private CrunchControlledJob build( - Class<?> jarClass, Configuration conf, Pipeline pipeline, int numOfJobs) throws IOException { + Class<?> jarClass, Configuration conf, MRPipeline pipeline, int numOfJobs) throws IOException { Job job = new Job(conf); conf = job.getConfiguration(); conf.set(PlanningParameters.CRUNCH_WORKING_DIRECTORY, workingPath.toString()); @@ -229,13 +230,29 @@ class JobPrototype { } JobNameBuilder jobNameBuilder = createJobNameBuilder(conf, pipeline.getName(), inputNodes, reduceNode, numOfJobs); + CrunchControlledJob.Hook prepareHook = getHook(new CrunchJobHooks.PrepareHook(), pipeline.getPrepareHooks()); + CrunchControlledJob.Hook completionHook = getHook( + new CrunchJobHooks.CompletionHook(outputPath, outputHandler.getMultiPaths()), + pipeline.getCompletionHooks()); return new CrunchControlledJob( jobID, job, jobNameBuilder, allTargets, - new CrunchJobHooks.PrepareHook(job), - new CrunchJobHooks.CompletionHook(job, outputPath, outputHandler.getMultiPaths(), group == null)); + prepareHook, + completionHook); + } + + private static CrunchControlledJob.Hook getHook( + CrunchControlledJob.Hook base, + List<CrunchControlledJob.Hook> optional) { + if (optional.isEmpty()) { + return base; + } + List<CrunchControlledJob.Hook> hooks = Lists.newArrayList(); + hooks.add(base); + hooks.addAll(optional); + return new CrunchJobHooks.CompositeHook(hooks); } private void serialize(List<DoNode> nodes, Configuration conf, Path workingPath, NodeContext context)
