Updated Branches: refs/heads/master b54ce84e7 -> 36bde4162
CRUNCH-241: Write side outputs from the map phase of a MapReduce job Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/36bde416 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/36bde416 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/36bde416 Branch: refs/heads/master Commit: 36bde4162ba624c5f9bdb85434c66fa889713060 Parents: b54ce84 Author: Josh Wills <[email protected]> Authored: Thu Jul 18 23:09:21 2013 -0700 Committer: Josh Wills <[email protected]> Committed: Mon Jul 22 15:00:05 2013 -0700 ---------------------------------------------------------------------- .../org/apache/crunch/MultipleOutputIT.java | 36 +++++++++++++++++--- .../impl/mr/emit/MultipleOutputEmitter.java | 2 -- .../crunch/impl/mr/exec/CrunchJobHooks.java | 14 ++++---- .../org/apache/crunch/impl/mr/plan/DoNode.java | 4 +-- .../crunch/impl/mr/plan/JobPrototype.java | 32 ++++++++++++++--- .../apache/crunch/impl/mr/plan/MSCRPlanner.java | 27 ++++++++++++--- .../java/org/apache/crunch/io/PathTarget.java | 3 +- .../crunch/io/avro/trevni/TrevniKeyTarget.java | 2 +- .../apache/crunch/io/impl/FileTargetImpl.java | 5 ++- .../crunch/io/impl/SourcePathTargetImpl.java | 4 +-- 10 files changed, 96 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/36bde416/crunch-core/src/it/java/org/apache/crunch/MultipleOutputIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/MultipleOutputIT.java b/crunch-core/src/it/java/org/apache/crunch/MultipleOutputIT.java index 1a85b6a..96971f8 100644 --- a/crunch-core/src/it/java/org/apache/crunch/MultipleOutputIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/MultipleOutputIT.java @@ -18,15 +18,19 @@ package org.apache.crunch; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.File; +import java.io.FilenameFilter; import java.io.IOException; import java.nio.charset.Charset; import java.util.Arrays; import java.util.List; +import org.apache.crunch.fn.Aggregators; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.io.At; +import org.apache.crunch.io.To; import org.apache.crunch.test.StringWrapper; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; @@ -47,7 +51,6 @@ public class MultipleOutputIT { public static PCollection<String> evenCountLetters(PCollection<String> words, PTypeFamily typeFamily) { return words.parallelDo("even", new FilterFn<String>() { - @Override public boolean accept(String input) { return input.length() % 2 == 0; @@ -57,7 +60,6 @@ public class MultipleOutputIT { public static PCollection<String> oddCountLetters(PCollection<String> words, PTypeFamily typeFamily) { return words.parallelDo("odd", new FilterFn<String>() { - @Override public boolean accept(String input) { return input.length() % 2 != 0; @@ -100,7 +102,8 @@ public class MultipleOutputIT { String inputPath = tmpDir.copyResourceFileName("letters.txt"); String outputPathEven = tmpDir.getFileName("even"); String outputPathOdd = tmpDir.getFileName("odd"); - + String outputPathReduce = tmpDir.getFileName("reduce"); + PCollection<String> words = pipeline.read(At.textFile(inputPath, typeFamily.strings())); PCollection<String> evenCountWords = evenCountLetters(words, typeFamily); @@ -108,14 +111,27 @@ public class MultipleOutputIT { pipeline.writeTextFile(evenCountWords, outputPathEven); pipeline.writeTextFile(oddCountWords, outputPathOdd); + evenCountWords.by(new FirstLetterFn(), typeFamily.strings()) + .groupByKey() + .combineValues(Aggregators.<String>FIRST_N(10)) + .write(To.textFile(outputPathReduce)); + PipelineResult result = pipeline.done(); checkFileContents(outputPathEven, Arrays.asList("bb")); checkFileContents(outputPathOdd, Arrays.asList("a")); - + checkNotEmpty(outputPathReduce); + return result; } + static class FirstLetterFn extends MapFn<String, String> { + @Override + public String map(String input) { + return input.substring(0, 1); + } + } + /** * Mutates the state of an input and then emits the mutated object. */ @@ -167,6 +183,18 @@ public class MultipleOutputIT { } + private void checkNotEmpty(String filePath) throws IOException { + File dir = new File(filePath); + File[] partFiles = dir.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.startsWith("part"); + } + }); + assertTrue(partFiles.length > 0); + assertTrue(Files.readLines(partFiles[0], Charset.defaultCharset()).size() > 0); + } + private void checkFileContents(String filePath, List<String> expected) throws IOException { File outputFile = new File(filePath, "part-m-00000"); List<String> lines = Files.readLines(outputFile, Charset.defaultCharset()); http://git-wip-us.apache.org/repos/asf/crunch/blob/36bde416/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java index 2e58fed..3d806ed 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java @@ -17,8 +17,6 @@ */ package org.apache.crunch.impl.mr.emit; -import java.io.IOException; - import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.Emitter; import org.apache.crunch.io.CrunchOutputs; http://git-wip-us.apache.org/repos/asf/crunch/blob/36bde416/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java index 0780431..6a15a0d 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java @@ -66,7 +66,8 @@ public final class CrunchJobHooks { private final Map<Integer, PathTarget> multiPaths; private final boolean mapOnlyJob; - public CompletionHook(Job job, Path workingPath, Map<Integer, PathTarget> multiPaths, boolean mapOnlyJob) { + public CompletionHook(Job job, Path workingPath, Map<Integer, PathTarget> multiPaths, + boolean mapOnlyJob) { this.job = job; this.workingPath = workingPath; this.multiPaths = multiPaths; @@ -80,12 +81,11 @@ public final class CrunchJobHooks { private synchronized void handleMultiPaths() throws IOException { try { - if (job.isSuccessful() && !multiPaths.isEmpty()) { - // Need to handle moving the data from the output directory of the - // job to the output locations specified in the paths. - FileSystem srcFs = workingPath.getFileSystem(job.getConfiguration()); - for (Map.Entry<Integer, PathTarget> entry : multiPaths.entrySet()) { - entry.getValue().handleOutputs(job.getConfiguration(), workingPath, entry.getKey(), mapOnlyJob); + if (job.isSuccessful()) { + if (!multiPaths.isEmpty()) { + for (Map.Entry<Integer, PathTarget> entry : multiPaths.entrySet()) { + entry.getValue().handleOutputs(job.getConfiguration(), workingPath, entry.getKey()); + } } } } catch(Exception ie) { http://git-wip-us.apache.org/repos/asf/crunch/blob/36bde416/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java index 865369c..2d6d590 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java @@ -107,7 +107,7 @@ public class DoNode { // TODO: This is sort of terrible, refactor the code to make this make more sense. boolean exists = false; for (DoNode child : children) { - if (node == child) { + if (node == child || (node.isOutputNode() && node.equals(child))) { exists = true; break; } @@ -152,7 +152,7 @@ public class DoNode { return true; } DoNode o = (DoNode) other; - return (name.equals(o.name) && fn.equals(o.fn) && source == o.source && outputConverter == o.outputConverter); + return name.equals(o.name) && fn.equals(o.fn) && source == o.source && outputConverter == o.outputConverter; } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/36bde416/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java index f22b5a1..da13611 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java @@ -66,6 +66,7 @@ class JobPrototype { private final Map<PCollectionImpl<?>, DoNode> nodes = Maps.newHashMap(); private final Path workingPath; + private HashMultimap<Target, NodePath> mapSideNodePaths; private HashMultimap<Target, NodePath> targetsToNodePaths; private DoTableImpl<?, ?> combineFnTable; @@ -107,6 +108,13 @@ class JobPrototype { return targetsToNodePaths; } + public void addMapSideOutputs(HashMultimap<Target, NodePath> mapSideNodePaths) { + if (group == null) { + throw new IllegalStateException("Cannot side-outputs to a map-only job"); + } + this.mapSideNodePaths = mapSideNodePaths; + } + public void addReducePaths(HashMultimap<Target, NodePath> outputPaths) { if (group == null) { throw new IllegalStateException("Cannot add a reduce phase to a map-only job"); @@ -135,10 +143,9 @@ class JobPrototype { job.setJarByClass(jarClass); Set<DoNode> outputNodes = Sets.newHashSet(); - Set<Target> targets = targetsToNodePaths.keySet(); Path outputPath = new Path(workingPath, "output"); MSCROutputHandler outputHandler = new MSCROutputHandler(job, outputPath, group == null); - for (Target target : targets) { + for (Target target : targetsToNodePaths.keySet()) { DoNode node = null; for (NodePath nodePath : targetsToNodePaths.get(target)) { if (node == null) { @@ -150,6 +157,22 @@ class JobPrototype { } } + Set<DoNode> mapSideNodes = Sets.newHashSet(); + if (mapSideNodePaths != null) { + for (Target target : mapSideNodePaths.keySet()) { + DoNode node = null; + for (NodePath nodePath : mapSideNodePaths.get(target)) { + if (node == null) { + PCollectionImpl<?> collect = nodePath.tail(); + node = DoNode.createOutputNode(target.toString(), collect.getPType()); + outputHandler.configureNode(node, target); + } + mapSideNodes.add(walkPath(nodePath.descendingIterator(), node)); + } + + } + } + job.setMapperClass(CrunchMapper.class); List<DoNode> inputNodes; DoNode reduceNode = null; @@ -171,7 +194,7 @@ class JobPrototype { group.configureShuffle(job); DoNode mapOutputNode = group.getGroupingNode(); - Set<DoNode> mapNodes = Sets.newHashSet(); + Set<DoNode> mapNodes = Sets.newHashSet(mapSideNodes); for (NodePath nodePath : mapNodePaths) { // Advance these one step, since we've already configured // the grouping node, and the PGroupedTableImpl is the tail @@ -192,8 +215,7 @@ class JobPrototype { inputNode.getSource().configureSource(job, -1); } else { for (int i = 0; i < inputNodes.size(); i++) { - DoNode inputNode = inputNodes.get(i); - inputNode.getSource().configureSource(job, i); + inputNodes.get(i).getSource().configureSource(job, i); } job.setInputFormatClass(CrunchInputFormat.class); } http://git-wip-us.apache.org/repos/asf/crunch/blob/36bde416/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java index 06ede5a..b5b37d7 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; @@ -268,17 +267,36 @@ public class MSCRPlanner { Set<Edge> usedEdges = Sets.newHashSet(); for (Vertex g : gbks) { Set<NodePath> inputs = Sets.newHashSet(); + HashMultimap<Target, NodePath> mapSideOutputPaths = HashMultimap.create(); for (Edge e : g.getIncomingEdges()) { inputs.addAll(e.getNodePaths()); usedEdges.add(e); + if (e.getHead().isInput()) { + for (Edge ep : e.getHead().getOutgoingEdges()) { + if (ep.getTail().isOutput() && !usedEdges.contains(ep)) { // map-side output + for (Target t : outputs.get(ep.getTail().getPCollection())) { + mapSideOutputPaths.putAll(t, ep.getNodePaths()); + } + usedEdges.add(ep); + } + } + } } JobPrototype prototype = JobPrototype.createMapReduceJob( ++lastJobID, (PGroupedTableImpl) g.getPCollection(), inputs, pipeline.createTempPath()); + prototype.addMapSideOutputs(mapSideOutputPaths); assignment.put(g, prototype); for (Edge e : g.getIncomingEdges()) { assignment.put(e.getHead(), prototype); - usedEdges.add(e); + if (e.getHead().isInput()) { + for (Edge ep : e.getHead().getOutgoingEdges()) { + if (ep.getTail().isOutput() && !assignment.containsKey(ep.getTail())) { // map-side output + assignment.put(ep.getTail(), prototype); + } + } + } } + HashMultimap<Target, NodePath> outputPaths = HashMultimap.create(); for (Edge e : g.getOutgoingEdges()) { Vertex output = e.getTail(); @@ -290,13 +308,12 @@ public class MSCRPlanner { } prototype.addReducePaths(outputPaths); } - + // Check for any un-assigned vertices, which should be map-side outputs // that we will need to run in a map-only job. HashMultimap<Target, NodePath> outputPaths = HashMultimap.create(); Set<Vertex> orphans = Sets.newHashSet(); for (Vertex v : component) { - // Check if this vertex has multiple inputs but only a subset of // them have already been assigned boolean vertexHasUnassignedIncomingEdges = false; @@ -334,7 +351,7 @@ public class MSCRPlanner { } } } - + return assignment; } http://git-wip-us.apache.org/repos/asf/crunch/blob/36bde416/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java index 4f7949f..9488c16 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java @@ -44,8 +44,7 @@ public interface PathTarget extends MapReduceTarget { * @param conf The job {@code Configuration} * @param workingPath The temp directory that contains the output of the job * @param index The index of this target for jobs that write multiple output files to a single directory - * @param mapOnlyJob Whether or not this is a map-only job * @throws IOException */ - void handleOutputs(Configuration conf, Path workingPath, int index, boolean mapOnlyJob) throws IOException; + void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException; } http://git-wip-us.apache.org/repos/asf/crunch/blob/36bde416/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java index 2ede024..2fefa59 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java @@ -134,7 +134,7 @@ public class TrevniKeyTarget extends FileTargetImpl { @Override protected Path getDestFile(final Configuration conf, final Path src, final Path dir, final boolean mapOnlyJob) throws IOException { - Path outputFilename = super.getDestFile(conf, src, dir, mapOnlyJob); + Path outputFilename = super.getDestFile(conf, src, dir, true); //make sure the dst file is unique in the case there are multiple part-#.trv files per partition. return new Path(outputFilename.toString()+"-"+src.getName()); } http://git-wip-us.apache.org/repos/asf/crunch/blob/36bde416/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java index 4d58830..07c63df 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java @@ -86,8 +86,7 @@ public class FileTargetImpl implements PathTarget { return true; } - public void handleOutputs(Configuration conf, Path workingPath, int index, - boolean mapOnlyJob) throws IOException { + public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException { FileSystem srcFs = workingPath.getFileSystem(conf); Path src = getSourcePattern(workingPath, index); Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src); @@ -97,7 +96,7 @@ public class FileTargetImpl implements PathTarget { } boolean sameFs = isCompatible(srcFs, path); for (Path s : srcs) { - Path d = getDestFile(conf, s, path, mapOnlyJob); + Path d = getDestFile(conf, s, path, s.getName().contains("-m-")); if (sameFs) { srcFs.rename(s, d); } else { http://git-wip-us.apache.org/repos/asf/crunch/blob/36bde416/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java index a90bb7b..fbc2201 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java @@ -52,8 +52,8 @@ public class SourcePathTargetImpl<T> extends SourceTargetImpl<T> implements Path } @Override - public void handleOutputs(Configuration conf, Path workingPath, int index, boolean mapOnlyJob) + public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException { - ((PathTarget) target).handleOutputs(conf, workingPath, index, mapOnlyJob); + ((PathTarget) target).handleOutputs(conf, workingPath, index); } }
