Updated Branches: refs/heads/master 1381165fb -> 12dea675b
CRUNCH-294: Cost-based planning with materialize as breakpoint. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/12dea675 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/12dea675 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/12dea675 Branch: refs/heads/master Commit: 12dea675bf50ee767a86870dfcda744818ecb332 Parents: 1381165 Author: Josh Wills <[email protected]> Authored: Wed Nov 20 13:19:02 2013 -0800 Committer: Josh Wills <[email protected]> Committed: Wed Nov 20 16:22:58 2013 -0800 ---------------------------------------------------------------------- .../it/java/org/apache/crunch/BreakpointIT.java | 129 +++++++++++++++++++ .../src/main/java/org/apache/crunch/DoFn.java | 2 +- .../org/apache/crunch/impl/mr/MRPipeline.java | 2 +- .../crunch/impl/mr/collect/PCollectionImpl.java | 22 +++- .../crunch/impl/mr/collect/UnionCollection.java | 9 ++ .../crunch/impl/mr/collect/UnionTable.java | 8 ++ .../org/apache/crunch/impl/mr/plan/Edge.java | 119 ++++++++++++----- .../apache/crunch/impl/mr/plan/MSCRPlanner.java | 20 ++- 8 files changed, 258 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/12dea675/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java b/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java new file mode 100644 index 0000000..790f049 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.From; +import org.apache.crunch.io.To; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.writable.Writables; +import org.junit.Rule; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class BreakpointIT { + + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + @Test + public void testNoBreakpoint() throws Exception { + run(new MRPipeline(BreakpointIT.class, tmpDir.getDefaultConfiguration()), + tmpDir.copyResourceFileName("shakes.txt"), + tmpDir.getFileName("out1"), + tmpDir.getFileName("out2"), + false); + } + + @Test + public void testBreakpoint() throws Exception { + run(new MRPipeline(BreakpointIT.class, tmpDir.getDefaultConfiguration()), + tmpDir.copyResourceFileName("shakes.txt"), + tmpDir.getFileName("out1"), + tmpDir.getFileName("out2"), + true); + } + + public static void run(Pipeline pipeline, String input, String out1, String out2, boolean breakpoint) + throws Exception { + + // Read a line from a file to get a PCollection. + PCollection<String> pCol1 = pipeline.read(From.textFile(input)); + + // Create a PTable from PCollection + PTable<String, Integer> pTable1 = pCol1.parallelDo(new DoFn<String, Pair<String, Integer>>() { + @Override + public void process(final String s, final Emitter<Pair<String, Integer>> emitter) { + for (int i = 0; i < 10; i++) { + emitter.emit(new Pair<String, Integer>(s, i)); + } + } + }, Writables.tableOf(Writables.strings(), Writables.ints())); + + // Do a groupByKey + PGroupedTable<String, Integer> pGrpTable1 = pTable1.groupByKey(); + + // Select from PGroupedTable + PTable<String, Integer> selectFromPTable1 = pGrpTable1.parallelDo( + new DoFn<Pair<String, Iterable<Integer>>, Pair<String, Integer>>() { + @Override + public void process(final Pair<String, Iterable<Integer>> input, + final Emitter<Pair<String, Integer>> emitter) { + emitter.emit(new Pair<String, Integer>(input.first(), input.second().iterator().next())); + } + }, Writables.tableOf(Writables.strings(), Writables.ints())); + + // Process selectFromPTable1 once + final PTable<String, String> pTable2 = selectFromPTable1.parallelDo(new DoFn<Pair<String, Integer>, Pair<String, String>>() { + @Override + public void process(final Pair<String, Integer> input, final Emitter<Pair<String, String>> emitter) { + final Integer newInt = input.second() + 5; + increment("job", "table2"); + emitter.emit(new Pair<String, String>(newInt.toString(), input.first())); + } + }, Writables.tableOf(Writables.strings(), Writables.strings())); + + // Process selectFromPTable1 once more + PTable<String, String> pTable3 = selectFromPTable1.parallelDo(new DoFn<Pair<String, Integer>, Pair<String, String>>() { + @Override + public void process(final Pair<String, Integer> input, final Emitter<Pair<String, String>> emitter) { + final Integer newInt = input.second() + 10; + increment("job", "table3"); + emitter.emit(new Pair<String, String>(newInt.toString(), input.first())); + } + }, Writables.tableOf(Writables.strings(), Writables.strings())); + + // Union pTable2 and pTable3 and set a breakpoint + PTable<String, String> pTable4 = pTable2.union(pTable3); + if (breakpoint) { + pTable4.materialize(); + } + + // Write keys + pTable4.keys().write(To.textFile(out1)); + + // Group values + final PGroupedTable<String, String> pGrpTable3 = pTable4.groupByKey(); + + // Write values + pGrpTable3.ungroup().write(To.textFile(out2)); + + PipelineExecution pe = pipeline.runAsync(); + // Count the number of map processing steps in this pipeline + int mapsCount = 0; + for (String line : pe.getPlanDotFile().split("\n")) { + if (line.contains(" subgraph ") && line.contains("-map\" {")) { + mapsCount++; + } + } + assertEquals(breakpoint ? 1 : 2, mapsCount); + pe.waitUntilDone(); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/12dea675/crunch-core/src/main/java/org/apache/crunch/DoFn.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/DoFn.java b/crunch-core/src/main/java/org/apache/crunch/DoFn.java index 6ae89a4..a052d09 100644 --- a/crunch-core/src/main/java/org/apache/crunch/DoFn.java +++ b/crunch-core/src/main/java/org/apache/crunch/DoFn.java @@ -116,7 +116,7 @@ public abstract class DoFn<S, T> implements Serializable { * resulting {@code PCollection} should override this method. */ public float scaleFactor() { - return 1.2f; + return 0.99f; } /** http://git-wip-us.apache.org/repos/asf/crunch/blob/12dea675/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java index 4fb2876..ff95b91 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java @@ -244,7 +244,7 @@ public class MRPipeline implements Pipeline { @Override public <T> Iterable<T> materialize(PCollection<T> pcollection) { - + ((PCollectionImpl) pcollection).setBreakpoint(); PCollectionImpl<T> pcollectionImpl = toPcollectionImpl(pcollection); ReadableSource<T> readableSrc = getMaterializeSourceTarget(pcollectionImpl); http://git-wip-us.apache.org/repos/asf/crunch/blob/12dea675/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java index b82c883..191b11e 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java @@ -59,7 +59,9 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { private boolean materialized; protected SourceTarget<S> materializedAt; protected final ParallelDoOptions doOptions; - + private long size = -1L; + private boolean breakpoint; + public PCollectionImpl(String name) { this(name, ParallelDoOptions.builder().build()); } @@ -158,6 +160,14 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { return getPipeline().materialize(this); } + public void setBreakpoint() { + this.breakpoint = true; + } + + public boolean isBreakpoint() { + return breakpoint; + } + /** {@inheritDoc} */ @Override public PObject<Collection<S>> asCollection() { @@ -170,6 +180,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { public void materializeAt(SourceTarget<S> sourceTarget) { this.materializedAt = sourceTarget; + this.size = materializedAt.getSize(getPipeline().getConfiguration()); } @Override @@ -299,13 +310,10 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { @Override public long getSize() { - if (materializedAt != null) { - long sz = materializedAt.getSize(getPipeline().getConfiguration()); - if (sz > 0) { - return sz; - } + if (size < 0) { + this.size = getSizeInternal(); } - return getSizeInternal(); + return size; } protected abstract long getSizeInternal(); http://git-wip-us.apache.org/repos/asf/crunch/blob/12dea675/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java index 4a69d96..e6c95bb 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java @@ -20,6 +20,7 @@ package org.apache.crunch.impl.mr.collect; import java.util.List; import com.google.common.collect.Lists; +import org.apache.crunch.PCollection; import org.apache.crunch.ReadableData; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.impl.mr.plan.DoNode; @@ -60,6 +61,14 @@ public class UnionCollection<S> extends PCollectionImpl<S> { } @Override + public void setBreakpoint() { + super.setBreakpoint(); + for (PCollectionImpl<S> parent : parents) { + parent.setBreakpoint(); + } + } + + @Override protected long getSizeInternal() { return size; } http://git-wip-us.apache.org/repos/asf/crunch/blob/12dea675/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java index b6a26d5..b4144e4 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java @@ -90,6 +90,14 @@ public class UnionTable<K, V> extends PTableBase<K, V> { } @Override + public void setBreakpoint() { + super.setBreakpoint(); + for (PCollectionImpl<Pair<K, V>> parent : parents) { + parent.setBreakpoint(); + } + } + + @Override protected void acceptInternal(PCollectionImpl.Visitor visitor) { visitor.visitUnionCollection(new UnionCollection<Pair<K, V>>(parents)); } http://git-wip-us.apache.org/repos/asf/crunch/blob/12dea675/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java index 6eb50eb..8f99a0b 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java @@ -18,17 +18,19 @@ package org.apache.crunch.impl.mr.plan; import java.util.Collection; -import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; +import com.google.common.collect.Maps; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.lang.builder.ReflectionToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; +import org.apache.crunch.SourceTarget; +import org.apache.crunch.Target; import org.apache.crunch.impl.mr.collect.PCollectionImpl; import org.apache.crunch.impl.mr.collect.PGroupedTableImpl; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -62,43 +64,94 @@ class Edge { public Set<NodePath> getNodePaths() { return paths; } - - public PCollectionImpl getSplit() { - List<Iterator<PCollectionImpl<?>>> iters = Lists.newArrayList(); - for (NodePath nodePath : paths) { - Iterator<PCollectionImpl<?>> iter = nodePath.iterator(); - iter.next(); // prime this past the initial NGroupedTableImpl - iters.add(iter); + + private static boolean readWriteOutput(PCollectionImpl<?> pc, Map<PCollectionImpl<?>, Set<Target>> outputs) { + if (outputs.containsKey(pc)) { + for (Target t : outputs.get(pc)) { + if (t instanceof SourceTarget || t.asSourceTarget(pc.getPType()) != null) { + return true; + } + } } + return false; + } - // Find the lowest point w/the lowest cost to be the split point for - // all of the dependent paths. - boolean end = false; - int splitIndex = -1; - while (!end) { - splitIndex++; - PCollectionImpl<?> current = null; - for (Iterator<PCollectionImpl<?>> iter : iters) { - if (iter.hasNext()) { - PCollectionImpl<?> next = iter.next(); - if (next instanceof PGroupedTableImpl) { - end = true; - break; - } else if (current == null) { - current = next; - } else if (current != next) { - end = true; - break; + public Map<NodePath, PCollectionImpl> getSplitPoints(Map<PCollectionImpl<?>, Set<Target>> outputs) { + List<NodePath> np = Lists.newArrayList(paths); + List<PCollectionImpl<?>> smallestOverallPerPath = Lists.newArrayListWithExpectedSize(np.size()); + Map<PCollectionImpl<?>, Set<Integer>> pathCounts = Maps.newHashMap(); + Map<NodePath, PCollectionImpl> splitPoints = Maps.newHashMap(); + for (int i = 0; i < np.size(); i++) { + long bestSize = Long.MAX_VALUE; + boolean breakpoint = false; + PCollectionImpl<?> best = null; + for (PCollectionImpl<?> pc : np.get(i)) { + if (!(pc instanceof PGroupedTableImpl)) { + if (pc.isBreakpoint()) { + if (!breakpoint || pc.getSize() < bestSize) { + best = pc; + bestSize = pc.getSize(); + breakpoint = true; + } + } else if (!breakpoint && pc.getSize() < bestSize) { + best = pc; + bestSize = pc.getSize(); } - } else { - end = true; - break; + Set<Integer> cnts = pathCounts.get(pc); + if (cnts == null) { + cnts = Sets.newHashSet(); + pathCounts.put(pc, cnts); + } + cnts.add(i); + } + } + smallestOverallPerPath.add(best); + if (breakpoint) { + splitPoints.put(np.get(i), best); + } + } + + Set<Integer> missing = Sets.newHashSet(); + for (int i = 0; i < np.size(); i++) { + if (!splitPoints.containsKey(np.get(i))) { + missing.add(i); + } + } + if (missing.isEmpty()) { + return splitPoints; + } else { + // Need to either choose the smallest collection from each missing path, + // or the smallest single collection that is on all paths as the split target. + Set<PCollectionImpl<?>> smallest = Sets.newHashSet(); + long smallestSize = 0; + for (Integer id : missing) { + PCollectionImpl<?> s = smallestOverallPerPath.get(id); + if (!smallest.contains(s)) { + smallest.add(s); + smallestSize += s.getSize(); + } + } + + PCollectionImpl<?> singleBest = null; + long singleSmallestSize = Long.MAX_VALUE; + for (Map.Entry<PCollectionImpl<?>, Set<Integer>> e : pathCounts.entrySet()) { + if (Sets.difference(missing, e.getValue()).isEmpty() && e.getKey().getSize() < singleSmallestSize) { + singleBest = e.getKey(); + singleSmallestSize = singleBest.getSize(); + } + } + + if (smallestSize < singleSmallestSize) { + for (Integer id : missing) { + splitPoints.put(np.get(id), smallestOverallPerPath.get(id)); + } + } else { + for (Integer id : missing) { + splitPoints.put(np.get(id), singleBest); } } } - // TODO: Add costing calcs here. - - return Iterables.getFirst(paths, null).get(splitIndex); + return splitPoints; } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/12dea675/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java index 1e0793c..ac61fec 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java @@ -224,21 +224,19 @@ public class MSCRPlanner { } else { // Execute an Edge split Vertex newGraphTail = graph.getVertexAt(e.getTail().getPCollection()); - PCollectionImpl split = e.getSplit(); - InputCollection<?> inputNode = handleSplitTarget(split); - Vertex splitTail = graph.addVertex(split, true); - Vertex splitHead = graph.addVertex(inputNode, false); - - // Divide up the node paths in the edge between the two GBK nodes so - // that each node is either owned by GBK1 -> newTail or newHead -> GBK2. - for (NodePath path : e.getNodePaths()) { + Map<NodePath, PCollectionImpl> splitPoints = e.getSplitPoints(outputs); + for (Map.Entry<NodePath, PCollectionImpl> s : splitPoints.entrySet()) { + NodePath path = s.getKey(); + PCollectionImpl split = s.getValue(); + InputCollection<?> inputNode = handleSplitTarget(split); + Vertex splitTail = graph.addVertex(split, true); + Vertex splitHead = graph.addVertex(inputNode, false); NodePath headPath = path.splitAt(split, splitHead.getPCollection()); graph.getEdge(vertex, splitTail).addNodePath(headPath); graph.getEdge(splitHead, newGraphTail).addNodePath(path); + // Note the dependency between the vertices in the graph. + graph.markDependency(splitHead, splitTail); } - - // Note the dependency between the vertices in the graph. - graph.markDependency(splitHead, splitTail); } } }
