Updated Branches: refs/heads/master af3df548a -> ecf5dd01c
CRUNCH-264: Add map-side outputs to the dot.plan file Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ecf5dd01 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ecf5dd01 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ecf5dd01 Branch: refs/heads/master Commit: ecf5dd01c21e3427cbd2de1f1bc8ca7996ba7c9e Parents: af3df54 Author: Josh Wills <[email protected]> Authored: Sat Sep 28 17:48:34 2013 -0700 Committer: Josh Wills <[email protected]> Committed: Sun Sep 29 11:29:11 2013 -0700 ---------------------------------------------------------------------- .../crunch/impl/mr/plan/DotfileWriter.java | 31 ++++++++++++-------- .../crunch/impl/mr/plan/JobPrototype.java | 4 +-- 2 files changed, 21 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/ecf5dd01/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java index 9541b99..2834fb9 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java @@ -133,7 +133,8 @@ public class DotfileWriter { } MRTaskType taskType = groupingEncountered ? MRTaskType.REDUCE : MRTaskType.MAP; - jobNodeDeclarations.put(Pair.of(jobPrototype, taskType), formatPCollectionNodeDeclaration(pcollectionImpl, jobPrototype)); + jobNodeDeclarations.put(Pair.of(jobPrototype, taskType), + formatPCollectionNodeDeclaration(pcollectionImpl, jobPrototype)); } } } @@ -164,6 +165,21 @@ public class DotfileWriter { } } + private void processNodePaths(JobPrototype jobPrototype, HashMultimap<Target, NodePath> nodePaths) { + if (nodePaths != null) { + for (Target target : nodePaths.keySet()) { + globalNodeDeclarations.add(formatTargetNodeDeclaration(target)); + for (NodePath nodePath : nodePaths.get(target)) { + addNodePathDeclarations(jobPrototype, nodePath); + addNodePathChain(nodePath, jobPrototype); + nodePathChains.add(formatNodeCollection( + Lists.newArrayList(formatPCollection(nodePath.descendingIterator().next(), jobPrototype), + String.format("\"%s\"", target.toString())))); + } + } + } + } + /** * Add the contents of a {@link JobPrototype} to the graph describing a * pipeline. @@ -178,18 +194,9 @@ public class DotfileWriter { addNodePathDeclarations(jobPrototype, nodePath); addNodePathChain(nodePath, jobPrototype); } + processNodePaths(jobPrototype, jobPrototype.getMapSideNodePaths()); } - - HashMultimap<Target, NodePath> targetsToNodePaths = jobPrototype.getTargetsToNodePaths(); - for (Target target : targetsToNodePaths.keySet()) { - globalNodeDeclarations.add(formatTargetNodeDeclaration(target)); - for (NodePath nodePath : targetsToNodePaths.get(target)) { - addNodePathDeclarations(jobPrototype, nodePath); - addNodePathChain(nodePath, jobPrototype); - nodePathChains.add(formatNodeCollection(Lists.newArrayList(formatPCollection(nodePath.descendingIterator() - .next(), jobPrototype), String.format("\"%s\"", target.toString())))); - } - } + processNodePaths(jobPrototype, jobPrototype.getTargetsToNodePaths()); } /** http://git-wip-us.apache.org/repos/asf/crunch/blob/ecf5dd01/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java index c9b7111..0699db5 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java @@ -101,8 +101,8 @@ class JobPrototype { return mapNodePaths; } - PGroupedTableImpl<?, ?> getGroupingTable() { - return group; + HashMultimap<Target, NodePath> getMapSideNodePaths() { + return mapSideNodePaths; } HashMultimap<Target, NodePath> getTargetsToNodePaths() {
