Updated Branches:
  refs/heads/master cad1b053b -> 739a4703a

CRUNCH-34: Refactor the MSCRPlanner.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/739a4703
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/739a4703
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/739a4703

Branch: refs/heads/master
Commit: 739a4703a00dca526c7d7880649fa6329c246856
Parents: cad1b05
Author: Josh Wills <[email protected]>
Authored: Thu Sep 13 09:15:58 2012 -0700
Committer: Josh Wills <[email protected]>
Committed: Thu Sep 13 09:15:58 2012 -0700

----------------------------------------------------------------------
 .../crunch/impl/mr/collect/UnionCollectionIT.java  |    2 -
 .../org/apache/crunch/impl/mr/plan/DoNode.java     |   12 +-
 .../java/org/apache/crunch/impl/mr/plan/Edge.java  |  118 +++++
 .../java/org/apache/crunch/impl/mr/plan/Graph.java |  125 +++++
 .../apache/crunch/impl/mr/plan/GraphBuilder.java   |   92 ++++
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java    |  376 +++++---------
 .../org/apache/crunch/impl/mr/plan/NodePath.java   |   20 +-
 .../org/apache/crunch/impl/mr/plan/Vertex.java     |  108 ++++
 pom.xml                                            |    3 +
 9 files changed, 609 insertions(+), 247 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/739a4703/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java b/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
index 6e70ff6..f9f73b2 100644
--- a/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
+++ b/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
@@ -105,11 +105,9 @@ public class UnionCollectionIT {
   }
 
   private void checkMaterialized(Iterable<String> materialized) {
-
     List<String> materializedValues = Lists.newArrayList(materialized.iterator());
     Collections.sort(materializedValues);
     LOG.info("Materialized union: " + materializedValues);
-
     assertEquals(EXPECTED, materializedValues);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/739a4703/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
index f63700e..236496b 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
@@ -104,8 +104,16 @@ public class DoNode {
   }
 
   public DoNode addChild(DoNode node) {
-    if (!children.contains(node)) {
-      this.children.add(node);
+    // TODO: This is sort of terrible, refactor the code to make this make more sense.
+    boolean exists = false;
+    for (DoNode child : children) {
+      if (node == child) {
+        exists = true;
+        break;
+      }
+    }
+    if (!exists) {
+      children.add(node);
+    }
     return this;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/739a4703/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
new file mode 100644
index 0000000..5aceb8b
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ *
+ */
+public class Edge {
+  private final Vertex head;
+  private final Vertex tail;
+  private final Set<NodePath> paths;
+
+  public Edge(Vertex head, Vertex tail) {
+    this.head = head;
+    this.tail = tail;
+    this.paths = Sets.newHashSet();
+  }
+
+  public Vertex getHead() {
+    return head;
+  }
+
+  public Vertex getTail() {
+    return tail;
+  }
+
+  public void addNodePath(NodePath path) {
+    this.paths.add(path);
+  }
+
+  public void addAllNodePaths(Collection<NodePath> paths) {
+    this.paths.addAll(paths);
+  }
+
+  public Set<NodePath> getNodePaths() {
+    return paths;
+  }
+
+  public PCollectionImpl getSplit() {
+    List<Iterator<PCollectionImpl<?>>> iters = Lists.newArrayList();
+    for (NodePath nodePath : paths) {
+      Iterator<PCollectionImpl<?>> iter = nodePath.iterator();
+      iter.next(); // prime this past the initial NGroupedTableImpl
+      iters.add(iter);
+    }
+
+    // Find the lowest point w/the lowest cost to be the split point for
+    // all of the dependent paths.
+    boolean end = false;
+    int splitIndex = -1;
+    while (!end) {
+      splitIndex++;
+      PCollectionImpl<?> current = null;
+      for (Iterator<PCollectionImpl<?>> iter : iters) {
+        if (iter.hasNext()) {
+          PCollectionImpl<?> next = iter.next();
+          if (next instanceof PGroupedTableImpl) {
+            end = true;
+            break;
+          } else if (current == null) {
+            current = next;
+          } else if (current != next) {
+            end = true;
+            break;
+          }
+        } else {
+          end = true;
+          break;
+        }
+      }
+    }
+    // TODO: Add costing calcs here.
+
+    return Iterables.getFirst(paths, null).get(splitIndex);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || !(other instanceof Edge)) {
+      return false;
+    }
+    Edge e = (Edge) other;
+    return head.equals(e.head) && tail.equals(e.tail);
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(head).append(tail).toHashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/739a4703/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java
new file mode 100644
index 0000000..93ba2bf
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java
@@ -0,0 +1,125 @@
+/**
+ * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ *
+ */
+public class Graph implements Iterable<Vertex> {
+
+  private final Map<PCollectionImpl, Vertex> vertices;
+  private final Map<Pair<Vertex, Vertex>, Edge> edges;
+  private final Map<Vertex, List<Vertex>> dependencies;
+
+  public Graph() {
+    this.vertices = Maps.newHashMap();
+    this.edges = Maps.newHashMap();
+    this.dependencies = Maps.newHashMap();
+  }
+
+  public Vertex getVertexAt(PCollectionImpl impl) {
+    return vertices.get(impl);
+  }
+
+  public Vertex addVertex(PCollectionImpl impl) {
+    if (vertices.containsKey(impl)) {
+      return vertices.get(impl);
+    }
+    Vertex v = new Vertex(impl);
+    vertices.put(impl, v);
+    return v;
+  }
+
+  public Edge getEdge(Vertex head, Vertex tail) {
+    Pair<Vertex, Vertex> p = Pair.of(head, tail);
+    if (edges.containsKey(p)) {
+      return edges.get(p);
+    }
+
+    Edge e = new Edge(head, tail);
+    edges.put(p, e);
+    tail.addIncoming(e);
+    head.addOutgoing(e);
+    return e;
+  }
+
+  @Override
+  public Iterator<Vertex> iterator() {
+    return Sets.newHashSet(vertices.values()).iterator();
+  }
+
+  public Set<Edge> getAllEdges() {
+    return Sets.newHashSet(edges.values());
+  }
+
+  public void markDependency(Vertex child, Vertex parent) {
+    List<Vertex> parents = dependencies.get(child);
+    if (parents == null) {
+      parents = Lists.newArrayList();
+      dependencies.put(child, parents);
+    }
+    parents.add(parent);
+  }
+
+  public List<Vertex> getParents(Vertex child) {
+    if (dependencies.containsKey(child)) {
+      return dependencies.get(child);
+    }
+    return ImmutableList.of();
+  }
+
+  public List<List<Vertex>> connectedComponents() {
+    List<List<Vertex>> components = Lists.newArrayList();
+    Set<Vertex> unassigned = Sets.newHashSet(vertices.values());
+    while (!unassigned.isEmpty()) {
+      Vertex base = unassigned.iterator().next();
+      List<Vertex> component = Lists.newArrayList();
+      component.add(base);
+      unassigned.remove(base);
+      Set<Vertex> working = Sets.newHashSet(base.getAllNeighbors());
+      while (!working.isEmpty()) {
+        Vertex n = working.iterator().next();
+        working.remove(n);
+        if (unassigned.contains(n)) {
+          component.add(n);
+          unassigned.remove(n);
+          for (Vertex n2 : n.getAllNeighbors()) {
+            if (unassigned.contains(n2)) {
+              working.add(n2);
+            }
+          }
+        }
+      }
+      components.add(component);
+    }
+
+    return components;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/739a4703/crunch/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java
new file mode 100644
index 0000000..7705896
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import org.apache.crunch.impl.mr.collect.DoCollectionImpl;
+import org.apache.crunch.impl.mr.collect.DoTableImpl;
+import org.apache.crunch.impl.mr.collect.InputCollection;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+import org.apache.crunch.impl.mr.collect.UnionCollection;
+
+/**
+ *
+ */
+public class GraphBuilder implements PCollectionImpl.Visitor {
+
+  private Graph graph = new Graph();
+  private Vertex workingVertex;
+  private NodePath workingPath;
+
+  public Graph getGraph() {
+    return graph;
+  }
+
+  public void visitOutput(PCollectionImpl<?> output) {
+    workingVertex = graph.addVertex(output);
+    workingPath = new NodePath();
+    output.accept(this);
+  }
+
+  @Override
+  public void visitInputCollection(InputCollection<?> collection) {
+    Vertex v = graph.addVertex(collection);
+    graph.getEdge(v, workingVertex).addNodePath(workingPath.close(collection));
+  }
+
+  @Override
+  public void visitUnionCollection(UnionCollection<?> collection) {
+    Vertex baseVertex = workingVertex;
+    NodePath basePath = workingPath;
+    for (PCollectionImpl<?> parent : collection.getParents()) {
+      workingPath = new NodePath(basePath);
+      workingVertex = baseVertex;
+      processParent(parent);
+    }
+  }
+
+  @Override
+  public void visitDoFnCollection(DoCollectionImpl<?> collection) {
+    workingPath.push(collection);
+    processParent(collection.getOnlyParent());
+  }
+
+  @Override
+  public void visitDoTable(DoTableImpl<?, ?> collection) {
+    workingPath.push(collection);
+    processParent(collection.getOnlyParent());
+  }
+
+  @Override
+  public void visitGroupedTable(PGroupedTableImpl<?, ?> collection) {
+    Vertex v = graph.addVertex(collection);
+    graph.getEdge(v, workingVertex).addNodePath(workingPath.close(collection));
+    workingVertex = v;
+    workingPath = new NodePath(collection);
+    processParent(collection.getOnlyParent());
+  }
+
+  private void processParent(PCollectionImpl<?> parent) {
+    Vertex v = graph.getVertexAt(parent);
+    if (v == null) {
+      parent.accept(this);
+    } else {
+      graph.getEdge(v, workingVertex).addNodePath(workingPath.close(parent));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/739a4703/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index 975d5a0..f959f14 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -73,168 +73,162 @@ public class MSCRPlanner {
   }
 
   public MRExecutor plan(Class<?> jarClass, Configuration conf) throws IOException {
-    // Constructs all of the node paths, which either start w/an input
-    // or a GBK and terminate in an output collection of any type.
-    NodeVisitor visitor = new NodeVisitor();
+    // Walk the current plan tree and build a graph in which the vertices are
+    // sources, targets, and GBK operations.
+    GraphBuilder graphBuilder = new GraphBuilder();
     for (PCollectionImpl<?> output : outputs.keySet()) {
-      visitor.visitOutput(output);
+      graphBuilder.visitOutput(output);
    }
-
-    // Pull out the node paths.
-    Map<PCollectionImpl<?>, Set<NodePath>> nodePaths = visitor.getNodePaths();
-
-    // Keeps track of the dependencies from collections -> jobs and then
-    // between different jobs.
-    Map<PCollectionImpl<?>, JobPrototype> assignments = Maps.newHashMap();
-    Map<PCollectionImpl<?>, Set<JobPrototype>> jobDependencies = new HashMap<PCollectionImpl<?>, Set<JobPrototype>>();
-
-    // Find the set of GBKs that DO NOT depend on any other GBK.
-    Set<PGroupedTableImpl<?, ?>> workingGroupings = null;
-    while (!(workingGroupings = getWorkingGroupings(nodePaths)).isEmpty()) {
-
-      for (PGroupedTableImpl<?, ?> grouping : workingGroupings) {
-        Set<NodePath> mapInputPaths = nodePaths.get(grouping);
-        JobPrototype proto = JobPrototype.createMapReduceJob(grouping, mapInputPaths, pipeline.createTempPath());
-        assignments.put(grouping, proto);
-        if (jobDependencies.containsKey(grouping)) {
-          for (JobPrototype dependency : jobDependencies.get(grouping)) {
-            proto.addDependency(dependency);
-          }
-        }
-      }
-
-      Map<PGroupedTableImpl<?, ?>, Set<NodePath>> dependencyPaths = getDependencyPaths(workingGroupings, nodePaths);
-      for (Map.Entry<PGroupedTableImpl<?, ?>, Set<NodePath>> entry : dependencyPaths.entrySet()) {
-        PGroupedTableImpl<?, ?> grouping = entry.getKey();
-        Set<NodePath> currentNodePaths = entry.getValue();
-
-        JobPrototype proto = assignments.get(grouping);
-        Set<NodePath> gbkPaths = Sets.newHashSet();
-        for (NodePath nodePath : currentNodePaths) {
-          PCollectionImpl<?> tail = nodePath.tail();
-          if (tail instanceof PGroupedTableImpl) {
-            gbkPaths.add(nodePath);
-            if (!jobDependencies.containsKey(tail)) {
-              jobDependencies.put(tail, Sets.<JobPrototype> newHashSet());
-            }
-            jobDependencies.get(tail).add(proto);
-          }
-        }
-
-        if (!gbkPaths.isEmpty()) {
-          handleGroupingDependencies(gbkPaths, currentNodePaths);
-        }
-
-        // At this point, all of the dependencies for the working groups will be
-        // file outputs, and so we can add them all to the JobPrototype-- we now
-        // have
-        // a complete job.
-        HashMultimap<Target, NodePath> reduceOutputs = HashMultimap.create();
-        for (NodePath nodePath : currentNodePaths) {
-          assignments.put(nodePath.tail(), proto);
-          for (Target target : outputs.get(nodePath.tail())) {
-            reduceOutputs.put(target, nodePath);
-          }
-        }
-        proto.addReducePaths(reduceOutputs);
-
-        // We've processed this GBK-- remove it from the set of nodePaths we
-        // need to process in the next step.
-        nodePaths.remove(grouping);
-      }
+    Graph baseGraph = graphBuilder.getGraph();
+
+    // Create a new graph that splits up up dependent GBK nodes.
+    Graph graph = prepareFinalGraph(baseGraph);
+
+    // Break the graph up into connected components.
+    List<List<Vertex>> components = graph.connectedComponents();
+
+    // For each component, we will create one or more job prototypes,
+    // depending on its profile.
+    // For dependency handling, we only need to care about which
+    // job prototype a particular GBK is assigned to.
+    Map<Vertex, JobPrototype> assignments = Maps.newHashMap();
+    for (List<Vertex> component : components) {
+      assignments.putAll(constructJobPrototypes(component));
     }
-
-    // Process any map-only jobs that are remaining.
-    if (!nodePaths.isEmpty()) {
-      for (Map.Entry<PCollectionImpl<?>, Set<NodePath>> entry : nodePaths.entrySet()) {
-        PCollectionImpl<?> collect = entry.getKey();
-        if (!assignments.containsKey(collect)) {
-          HashMultimap<Target, NodePath> mapOutputs = HashMultimap.create();
-          for (NodePath nodePath : entry.getValue()) {
-            for (Target target : outputs.get(nodePath.tail())) {
-              mapOutputs.put(target, nodePath);
-            }
-          }
-          JobPrototype proto = JobPrototype.createMapOnlyJob(mapOutputs, pipeline.createTempPath());
-
-          if (jobDependencies.containsKey(collect)) {
-            for (JobPrototype dependency : jobDependencies.get(collect)) {
-              proto.addDependency(dependency);
-            }
-          }
-          assignments.put(collect, proto);
-        }
+
+    // Add in the job dependency information here.
+    for (Map.Entry<Vertex, JobPrototype> e : assignments.entrySet()) {
+      JobPrototype current = e.getValue();
+      List<Vertex> parents = graph.getParents(e.getKey());
+      for (Vertex parent : parents) {
+        current.addDependency(assignments.get(parent));
       }
     }
-
+
+    // Finally, construct the jobs from the prototypes and return.
     MRExecutor exec = new MRExecutor(jarClass);
     for (JobPrototype proto : Sets.newHashSet(assignments.values())) {
       exec.addJob(proto.getCrunchJob(jarClass, conf, pipeline));
     }
     return exec;
   }
-
-  private Map<PGroupedTableImpl<?, ?>, Set<NodePath>> getDependencyPaths(Set<PGroupedTableImpl<?, ?>> workingGroupings,
-      Map<PCollectionImpl<?>, Set<NodePath>> nodePaths) {
-    Map<PGroupedTableImpl<?, ?>, Set<NodePath>> dependencyPaths = Maps.newHashMap();
-    for (PGroupedTableImpl<?, ?> grouping : workingGroupings) {
-      dependencyPaths.put(grouping, Sets.<NodePath> newHashSet());
+
+  private Graph prepareFinalGraph(Graph baseGraph) {
+    Graph graph = new Graph();
+
+    for (Vertex baseVertex : baseGraph) {
+      // Add all of the vertices in the base graph, but no edges (yet).
+      graph.addVertex(baseVertex.getPCollection());
    }
-
-    // Find the targets that depend on one of the elements of the current
-    // working group.
-    for (PCollectionImpl<?> target : nodePaths.keySet()) {
-      if (!workingGroupings.contains(target)) {
-        for (NodePath nodePath : nodePaths.get(target)) {
-          if (workingGroupings.contains(nodePath.head())) {
-            dependencyPaths.get(nodePath.head()).add(nodePath);
+
+    for (Edge e : baseGraph.getAllEdges()) {
+      // Add back all of the edges where neither vertex is a GBK.
+      if (!e.getHead().isGBK() && !e.getTail().isGBK()) {
+        Vertex head = graph.getVertexAt(e.getHead().getPCollection());
+        Vertex tail = graph.getVertexAt(e.getTail().getPCollection());
+        graph.getEdge(head, tail).addAllNodePaths(e.getNodePaths());
+      }
+    }
+
+    for (Vertex baseVertex : baseGraph) {
+      if (baseVertex.isGBK()) {
+        Vertex vertex = graph.getVertexAt(baseVertex.getPCollection());
+        for (Edge e : baseVertex.getIncomingEdges()) {
+          if (!e.getHead().isGBK()) {
+            Vertex newHead = graph.getVertexAt(e.getHead().getPCollection());
+            graph.getEdge(newHead, vertex).addAllNodePaths(e.getNodePaths());
+          }
+        }
+        for (Edge e : baseVertex.getOutgoingEdges()) {
+          if (!e.getTail().isGBK()) {
+            Vertex newTail = graph.getVertexAt(e.getTail().getPCollection());
+            graph.getEdge(vertex, newTail).addAllNodePaths(e.getNodePaths());
+          } else {
+
+            // Execute an Edge split
+            Vertex newGraphTail = graph.getVertexAt(e.getTail().getPCollection());
+            PCollectionImpl split = e.getSplit();
+            InputCollection<?> inputNode = handleSplitTarget(split);
+            Vertex splitTail = graph.addVertex(split);
+            Vertex splitHead = graph.addVertex(inputNode);
+
+            // Divide up the node paths in the edge between the two GBK nodes so
+            // that each node is either owned by GBK1 -> newTail or newHead -> GBK2.
+            for (NodePath path : e.getNodePaths()) {
+              NodePath headPath = path.splitAt(split, splitHead.getPCollection());
+              graph.getEdge(vertex, splitTail).addNodePath(headPath);
+              graph.getEdge(splitHead, newGraphTail).addNodePath(path);
+            }
+
+            // Note the dependency between the vertices in the graph.
+            graph.markDependency(splitHead, splitTail);
           }
         }
      }
    }
-    return dependencyPaths;
+
+    return graph;
   }
-
-  private int getSplitIndex(Set<NodePath> currentNodePaths) {
-    List<Iterator<PCollectionImpl<?>>> iters = Lists.newArrayList();
-    for (NodePath nodePath : currentNodePaths) {
-      Iterator<PCollectionImpl<?>> iter = nodePath.iterator();
-      iter.next(); // prime this past the initial NGroupedTableImpl
-      iters.add(iter);
+
+  private Map<Vertex, JobPrototype> constructJobPrototypes(List<Vertex> component) {
+    Map<Vertex, JobPrototype> assignment = Maps.newHashMap();
+    List<Vertex> gbks = Lists.newArrayList();
+    for (Vertex v : component) {
+      if (v.isGBK()) {
+        gbks.add(v);
+      }
    }
-    // Find the lowest point w/the lowest cost to be the split point for
-    // all of the dependent paths.
-    boolean end = false;
-    int splitIndex = -1;
-    while (!end) {
-      splitIndex++;
-      PCollectionImpl<?> current = null;
-      for (Iterator<PCollectionImpl<?>> iter : iters) {
-        if (iter.hasNext()) {
-          PCollectionImpl<?> next = iter.next();
-          if (next instanceof PGroupedTableImpl) {
-            end = true;
-            break;
-          } else if (current == null) {
-            current = next;
-          } else if (current != next) {
-            end = true;
-            break;
+    if (gbks.isEmpty()) {
+      HashMultimap<Target, NodePath> outputPaths = HashMultimap.create();
+      for (Vertex v : component) {
+        if (v.isInput()) {
+          for (Edge e : v.getOutgoingEdges()) {
+            for (NodePath nodePath : e.getNodePaths()) {
+              PCollectionImpl target = nodePath.tail();
+              for (Target t : outputs.get(target)) {
+                outputPaths.put(t, nodePath);
+              }
+            }
+          }
          }
-        } else {
-          end = true;
-          break;
-        }
       }
+      if (outputPaths.isEmpty()) {
+        throw new IllegalStateException("No outputs?");
+      }
+      JobPrototype prototype = JobPrototype.createMapOnlyJob(
+          outputPaths, pipeline.createTempPath());
+      for (Vertex v : component) {
+        assignment.put(v, prototype);
+      }
+    } else {
+      for (Vertex g : gbks) {
+        Set<NodePath> inputs = Sets.newHashSet();
+        for (Edge e : g.getIncomingEdges()) {
+          inputs.addAll(e.getNodePaths());
+        }
+        JobPrototype prototype = JobPrototype.createMapReduceJob(
+            (PGroupedTableImpl) g.getPCollection(), inputs, pipeline.createTempPath());
+        assignment.put(g, prototype);
+        for (Edge e : g.getIncomingEdges()) {
+          assignment.put(e.getHead(), prototype);
+        }
+        HashMultimap<Target, NodePath> outputPaths = HashMultimap.create();
+        for (Edge e : g.getOutgoingEdges()) {
+          Vertex output = e.getTail();
+          for (Target t : outputs.get(output.getPCollection())) {
+            outputPaths.putAll(t, e.getNodePaths());
+          }
+          assignment.put(output, prototype);
+        }
+        prototype.addReducePaths(outputPaths);
+      }
    }
-    // TODO: Add costing calcs here.
-    return splitIndex;
+
+    return assignment;
   }
-
-  private void handleGroupingDependencies(Set<NodePath> gbkPaths, Set<NodePath> currentNodePaths) throws IOException {
-    int splitIndex = getSplitIndex(currentNodePaths);
-    PCollectionImpl<?> splitTarget = currentNodePaths.iterator().next().get(splitIndex);
+
+  private InputCollection<?> handleSplitTarget(PCollectionImpl<?> splitTarget) {
     if (!outputs.containsKey(splitTarget)) {
       outputs.put(splitTarget, Sets.<Target> newHashSet());
     }
@@ -261,108 +255,6 @@ public class MSCRPlanner {
     outputs.get(splitTarget).add(srcTarget);
     splitTarget.materializeAt(srcTarget);
 
-    PCollectionImpl<?> inputNode = (PCollectionImpl<?>) pipeline.read(srcTarget);
-    Set<NodePath> nextNodePaths = Sets.newHashSet();
-    for (NodePath nodePath : currentNodePaths) {
-      if (gbkPaths.contains(nodePath)) {
-        nextNodePaths.add(nodePath.splitAt(splitIndex, inputNode));
-      } else {
-        nextNodePaths.add(nodePath);
-      }
-    }
-    currentNodePaths.clear();
-    currentNodePaths.addAll(nextNodePaths);
-  }
-
-  private Set<PGroupedTableImpl<?, ?>> getWorkingGroupings(Map<PCollectionImpl<?>, Set<NodePath>> nodePaths) {
-    Set<PGroupedTableImpl<?, ?>> gbks = Sets.newHashSet();
-    for (PCollectionImpl<?> target : nodePaths.keySet()) {
-      if (target instanceof PGroupedTableImpl) {
-        boolean hasGBKDependency = false;
-        for (NodePath nodePath : nodePaths.get(target)) {
-          if (nodePath.head() instanceof PGroupedTableImpl) {
-            hasGBKDependency = true;
-            break;
-          }
-        }
-        if (!hasGBKDependency) {
-          gbks.add((PGroupedTableImpl<?, ?>) target);
-        }
-      }
-    }
-    return gbks;
-  }
-
-  private static class NodeVisitor implements PCollectionImpl.Visitor {
-
-    private final Map<PCollectionImpl<?>, Set<NodePath>> nodePaths;
-    private final Map<PCollectionImpl<?>, Source<?>> inputs;
-    private PCollectionImpl<?> workingNode;
-    private NodePath workingPath;
-
-    public NodeVisitor() {
-      this.nodePaths = new HashMap<PCollectionImpl<?>, Set<NodePath>>();
-      this.inputs = new HashMap<PCollectionImpl<?>, Source<?>>();
-    }
-
-    public Map<PCollectionImpl<?>, Set<NodePath>> getNodePaths() {
-      return nodePaths;
-    }
-
-    public void visitOutput(PCollectionImpl<?> output) {
-      nodePaths.put(output, Sets.<NodePath> newHashSet());
-      workingNode = output;
-      workingPath = new NodePath();
-      output.accept(this);
-    }
-
-    @Override
-    public void visitInputCollection(InputCollection<?> collection) {
-      workingPath.close(collection);
-      inputs.put(collection, collection.getSource());
-      nodePaths.get(workingNode).add(workingPath);
-    }
-
-    @Override
-    public void visitUnionCollection(UnionCollection<?> collection) {
-      PCollectionImpl<?> baseNode = workingNode;
-      NodePath basePath = workingPath;
-      for (PCollectionImpl<?> parent : collection.getParents()) {
-        workingPath = new NodePath(basePath);
-        workingNode = baseNode;
-        processParent(parent);
-      }
-    }
-
-    @Override
-    public void visitDoFnCollection(DoCollectionImpl<?> collection) {
-      workingPath.push(collection);
-      processParent(collection.getOnlyParent());
-    }
-
-    @Override
-    public void visitDoTable(DoTableImpl<?, ?> collection) {
-      workingPath.push(collection);
-      processParent(collection.getOnlyParent());
-    }
-
-    @Override
-    public void visitGroupedTable(PGroupedTableImpl<?, ?> collection) {
-      workingPath.close(collection);
-      nodePaths.get(workingNode).add(workingPath);
-      workingNode = collection;
-      nodePaths.put(workingNode, Sets.<NodePath> newHashSet());
-      workingPath = new NodePath(collection);
-      processParent(collection.getOnlyParent());
-    }
-
-    private void processParent(PCollectionImpl<?> parent) {
-      if (!nodePaths.containsKey(parent)) {
-        parent.accept(this);
-      } else {
-        workingPath.close(parent);
-        nodePaths.get(workingNode).add(workingPath);
-      }
-    }
-  }
+    return (InputCollection<?>) pipeline.read(srcTarget);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/739a4703/crunch/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
index c7a67bd..a090d93 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
@@ -44,8 +44,9 @@ class NodePath implements Iterable<PCollectionImpl<?>> {
     this.path.push((PCollectionImpl<?>) stage);
   }
 
-  public void close(PCollectionImpl<?> head) {
+  public NodePath close(PCollectionImpl<?> head) {
     this.path.push(head);
+    return this;
   }
 
   public Iterator<PCollectionImpl<?>> iterator() {
@@ -103,4 +104,21 @@ class NodePath implements Iterable<PCollectionImpl<?>> {
     path = nextPath;
     return top;
   }
+
+  public NodePath splitAt(PCollectionImpl split, PCollectionImpl<?> newHead) {
+    NodePath top = new NodePath();
+    int splitIndex = 0;
+    for (PCollectionImpl p : path) {
+      top.path.add(p);
+      if (p == split) {
+        break;
+      }
+      splitIndex++;
+    }
+    LinkedList<PCollectionImpl<?>> nextPath = Lists.newLinkedList();
+    nextPath.add(newHead);
+    nextPath.addAll(path.subList(splitIndex + 1, path.size()));
+    path = nextPath;
+    return top;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/739a4703/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
new file mode 100644
index 0000000..db49e83
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.crunch.Source;
+import org.apache.crunch.impl.mr.collect.InputCollection;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ *
+ */
+public class Vertex {
+  private final PCollectionImpl impl;
+  private Set<Edge> incoming;
+  private Set<Edge> outgoing;
+
+  public Vertex(PCollectionImpl impl) {
+    this.impl = impl;
+    this.incoming = Sets.newHashSet();
+    this.outgoing = Sets.newHashSet();
+  }
+
+  public PCollectionImpl getPCollection() {
+    return impl;
+  }
+
+  public boolean isInput() {
+    return impl instanceof InputCollection;
+  }
+
+  public boolean isGBK() {
+    return impl instanceof PGroupedTableImpl;
+  }
+
+  public Source getSource() {
+    if (isInput()) {
+      return ((InputCollection) impl).getSource();
+    }
+    return null;
+  }
+
+  public void addIncoming(Edge edge) {
+    this.incoming.add(edge);
+  }
+
+  public void addOutgoing(Edge edge) {
+    this.outgoing.add(edge);
+  }
+
+  public List<Vertex> getAllNeighbors() {
+    List<Vertex> n = Lists.newArrayList();
+    for (Edge e : incoming) {
+      n.add(e.getHead());
+    }
+    for (Edge e : outgoing) {
+      n.add(e.getTail());
+    }
+    return n;
+  }
+
+  public Set<Edge> getAllEdges() {
+    return Sets.union(incoming, outgoing);
+  }
+
+  public Set<Edge> getIncomingEdges() {
+    return incoming;
+  }
+
+  public Set<Edge> getOutgoingEdges() {
+    return outgoing;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null || !(obj instanceof Vertex)) {
+      return false;
+    }
+    Vertex other = (Vertex) obj;
+    return impl.equals(other.impl);
+  }
+
+  @Override
+  public int hashCode() {
+    return 17 + 37 * impl.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/739a4703/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 792bef0..edc1e0e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -635,6 +635,9 @@ under the License.
         <version>2.12</version>
         <configuration>
           <argLine>-Xmx512m</argLine>
+          <includes>
+            <include>**/*IT.java</include>
+          </includes>
         </configuration>
       </plugin>
       <plugin>
