Updated Branches: refs/heads/master 9f53a5122 -> 9c42bab14
CRUNCH-281: A proper fix for the issue originally handled by CRUNCH-237.

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/9c42bab1
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/9c42bab1
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/9c42bab1

Branch: refs/heads/master
Commit: 9c42bab14faae4883ceccae23cfe9872d33213a4
Parents: 9f53a51
Author: Josh Wills <[email protected]>
Authored: Mon Oct 14 22:59:42 2013 -0700
Committer: Josh Wills <[email protected]>
Committed: Thu Oct 17 06:35:46 2013 -0700

----------------------------------------------------------------------
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java | 32 ++++++++++++--------
 1 file changed, 20 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/crunch/blob/9c42bab1/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index f765313..1e0793c 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -78,7 +78,7 @@ public class MSCRPlanner { targetDeps.put(pcollect, pcollect.getTargetDependencies()); } - Multimap<Vertex, JobPrototype> assignments = HashMultimap.create(); + Multimap<Target, JobPrototype> assignments = HashMultimap.create(); while (!targetDeps.isEmpty()) { Set<Target> allTargets = Sets.newHashSet(); for (PCollectionImpl<?> pcollect : targetDeps.keySet()) { @@ -89,13 +89,11 @@ public class MSCRPlanner { // Walk the current plan tree and build a graph in which the vertices are // sources, targets, and GBK operations.
Set<PCollectionImpl<?>> currentStage = Sets.newHashSet(); - Set<PCollectionImpl<?>> laterStage = Sets.newHashSet(); for (PCollectionImpl<?> output : targetDeps.keySet()) { - if (Sets.intersection(allTargets, targetDeps.get(output)).isEmpty()) { + Set<Target> deps = Sets.intersection(allTargets, targetDeps.get(output)); + if (deps.isEmpty()) { graphBuilder.visitOutput(output); currentStage.add(output); - } else { - laterStage.add(output); } } @@ -127,15 +125,25 @@ public class MSCRPlanner { } } - // Make all of the jobs in this stage dependent on existing job - // prototypes. - for (JobPrototype newPrototype : newAssignments.values()) { - for (JobPrototype oldPrototype : assignments.values()) { - newPrototype.addDependency(oldPrototype); + for (Map.Entry<Vertex, JobPrototype> e : newAssignments.entries()) { + if (e.getKey().isOutput()) { + PCollectionImpl<?> pcollect = e.getKey().getPCollection(); + JobPrototype current = e.getValue(); + + // Add in implicit dependencies via SourceTargets that are read into memory + for (Target pt : pcollect.getTargetDependencies()) { + for (JobPrototype parentJobProto : assignments.get(pt)) { + current.addDependency(parentJobProto); + } + } + + // Add this to the set of output assignments + for (Target t : outputs.get(pcollect)) { + assignments.put(t, e.getValue()); + } } } - assignments.putAll(newAssignments); - + // Remove completed outputs and mark materialized output locations // for subsequent job processing. for (PCollectionImpl<?> output : currentStage) {
