Repository: crunch Updated Branches: refs/heads/apache-crunch-0.8 5a51b1396 -> 7695acc0b
CRUNCH-420: Change breakpointing logic around PCollections that are materialized before GBK operations. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/7695acc0 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/7695acc0 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/7695acc0 Branch: refs/heads/apache-crunch-0.8 Commit: 7695acc0b20e3fe61e576098e67a546c19d5b6ae Parents: 5a51b13 Author: Josh Wills <[email protected]> Authored: Sun Jun 15 23:22:20 2014 -0700 Committer: Josh Wills <[email protected]> Committed: Thu Jun 19 18:45:59 2014 -0700 ---------------------------------------------------------------------- .../java/org/apache/crunch/Breakpoint2IT.java | 106 +++++++++++++++++++ .../it/java/org/apache/crunch/BreakpointIT.java | 8 +- .../org/apache/crunch/impl/mr/plan/Edge.java | 11 +- .../apache/crunch/impl/mr/plan/MSCRPlanner.java | 33 ++++-- 4 files changed, 141 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/7695acc0/crunch-core/src/it/java/org/apache/crunch/Breakpoint2IT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/Breakpoint2IT.java b/crunch-core/src/it/java/org/apache/crunch/Breakpoint2IT.java new file mode 100644 index 0000000..4b76c8b --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/Breakpoint2IT.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import static org.junit.Assert.assertEquals; + +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.impl.mr.MRPipelineExecution; +import org.apache.crunch.impl.mr.exec.MRExecutor; +import org.apache.crunch.io.From; +import org.apache.crunch.io.To; +import org.apache.crunch.lib.Join; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.writable.Writables; +import org.junit.Rule; +import org.junit.Test; + +public class Breakpoint2IT { + private static final class PTableTransform extends DoFn<String, Pair<String, Integer>> { + @Override + public void process(final String s, final Emitter<Pair<String, Integer>> emitter) { + for (int i = 0; i < 10; i++) { + emitter.emit(new Pair<String, Integer>(s, i)); + } + } + } + + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + @Test + public void testNoBreakpoint() throws Exception { + run(new MRPipeline(Breakpoint2IT.class, tmpDir.getDefaultConfiguration()), + tmpDir.copyResourceFileName("letters.txt"), + tmpDir.copyResourceFileName("urls.txt"), + tmpDir.copyResourceFileName("docs.txt"), + tmpDir.getFileName("out1"), + tmpDir.getFileName("out2"), + false); + } + + @Test + public void testBreakpoint() throws Exception { + run(new MRPipeline(Breakpoint2IT.class, tmpDir.getDefaultConfiguration()), + tmpDir.copyResourceFileName("letters.txt"), + tmpDir.copyResourceFileName("urls.txt"), + tmpDir.copyResourceFileName("docs.txt"), + tmpDir.getFileName("out1"), + 
tmpDir.getFileName("out2"), + true); + } + + public static void run(MRPipeline pipeline, String input1, String input2, String input3, + String out1, String out2, boolean breakpoint) throws Exception { + // Read a line from a file to get a PCollection. + PCollection<String> pCol1 = pipeline.read(From.textFile(input1)); + PCollection<String> pCol2 = pipeline.read(From.textFile(input2)); + PCollection<String> pCol3 = pipeline.read(From.textFile(input3)); + + // Create PTables from the PCollections + PTable<String, Integer> pTable1 = pCol1.parallelDo("Transform pCol1 to PTable", new PTableTransform(), + Writables.tableOf(Writables.strings(), Writables.ints())); + if (breakpoint) { + pTable1.materialize(); + } + + PTable<String, Integer> pTable2 = pCol2.parallelDo("Transform pCol2 to PTable", new PTableTransform(), + Writables.tableOf(Writables.strings(), Writables.ints())); + PTable<String, Integer> pTable3 = pCol3.parallelDo("Transform pCol3 to PTable", new PTableTransform(), + Writables.tableOf(Writables.strings(), Writables.ints())); + + // Perform joins to pTable1 + PTable<String, Pair<Integer, Integer>> join1 = Join.leftJoin(pTable1, pTable2); + PTable<String, Pair<Integer, Integer>> join2 = Join.rightJoin(pTable1, pTable3); + + // Write joins + join1.keys().write(To.textFile(out1)); + join2.keys().write(To.textFile(out2)); + + MRPipelineExecution exec = pipeline.runAsync(); + int fnCount = 0; + for (String line : exec.getPlanDotFile().split("\n")) { + if (line.contains("label=\"Transform pCol1 to PTable\"")) { + fnCount++; + } + } + assertEquals(breakpoint ? 
1 : 2, fnCount); + exec.waitUntilDone(); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/7695acc0/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java b/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java index 790f049..8a21fa6 100644 --- a/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java @@ -18,6 +18,7 @@ package org.apache.crunch; import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.impl.mr.exec.MRExecutor; import org.apache.crunch.io.From; import org.apache.crunch.io.To; import org.apache.crunch.test.TemporaryPath; @@ -51,7 +52,7 @@ public class BreakpointIT { true); } - public static void run(Pipeline pipeline, String input, String out1, String out2, boolean breakpoint) + public static void run(MRPipeline pipeline, String input, String out1, String out2, boolean breakpoint) throws Exception { // Read a line from a file to get a PCollection. @@ -115,15 +116,14 @@ public class BreakpointIT { // Write values pGrpTable3.ungroup().write(To.textFile(out2)); - PipelineExecution pe = pipeline.runAsync(); + MRExecutor exec = pipeline.plan(); // Count the number of map processing steps in this pipeline int mapsCount = 0; - for (String line : pe.getPlanDotFile().split("\n")) { + for (String line : exec.getPlanDotFile().split("\n")) { if (line.contains(" subgraph ") && line.contains("-map\" {")) { mapsCount++; } } assertEquals(breakpoint ? 
1 : 2, mapsCount); - pe.waitUntilDone(); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/7695acc0/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java index 67c624d..4006930 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.lang.builder.ReflectionToStringBuilder; @@ -64,7 +65,7 @@ class Edge { return paths; } - public Map<NodePath, PCollectionImpl> getSplitPoints(Map<PCollectionImpl<?>, Set<Target>> outputs) { + public Map<NodePath, PCollectionImpl> getSplitPoints(boolean breakpointsOnly) { List<NodePath> np = Lists.newArrayList(paths); List<PCollectionImpl<?>> smallestOverallPerPath = Lists.newArrayListWithExpectedSize(np.size()); Map<PCollectionImpl<?>, Set<Integer>> pathCounts = Maps.newHashMap(); @@ -74,7 +75,7 @@ class Edge { boolean breakpoint = false; PCollectionImpl<?> best = null; for (PCollectionImpl<?> pc : np.get(i)) { - if (!(pc instanceof BaseGroupedTable)) { + if (!(pc instanceof BaseGroupedTable) && (!breakpointsOnly || pc.isBreakpoint())) { if (pc.isBreakpoint()) { if (!breakpoint || pc.getSize() < bestSize) { best = pc; @@ -105,7 +106,11 @@ class Edge { missing.add(i); } } - if (missing.isEmpty()) { + + if (breakpointsOnly && missing.size() > 0) { + // We can't create new splits in this mode + return ImmutableMap.of(); + } else if (missing.isEmpty()) { return splitPoints; } else { // Need to either choose the smallest collection from each missing path, 
http://git-wip-us.apache.org/repos/asf/crunch/blob/7695acc0/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java index 72c431b..c9a6136 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java @@ -124,14 +124,13 @@ public class MSCRPlanner { // job prototype a particular GBK is assigned to. Multimap<Vertex, JobPrototype> newAssignments = HashMultimap.create(); for (List<Vertex> component : components) { - newAssignments.putAll(constructJobPrototypes(component, components.size())); + newAssignments.putAll(constructJobPrototypes(component)); } // Add in the job dependency information here. for (Map.Entry<Vertex, JobPrototype> e : newAssignments.entries()) { JobPrototype current = e.getValue(); - List<Vertex> parents = graph.getParents(e.getKey()); - for (Vertex parent : parents) { + for (Vertex parent : graph.getParents(e.getKey())) { for (JobPrototype parentJobProto : newAssignments.get(parent)) { current.addDependency(parentJobProto); } @@ -206,10 +205,8 @@ public class MSCRPlanner { } for (Edge e : baseGraph.getAllEdges()) { - // Add back all of the edges where neither vertex is a GBK and we do not - // have an output feeding into a GBK. - if (!(e.getHead().isGBK() && e.getTail().isGBK()) && - !(e.getHead().isOutput() && e.getTail().isGBK())) { + // Add back all of the edges where neither vertex is a GBK. 
+ if (!e.getHead().isGBK() && !e.getTail().isGBK()) { Vertex head = graph.getVertexAt(e.getHead().getPCollection()); Vertex tail = graph.getVertexAt(e.getTail().getPCollection()); graph.getEdge(head, tail).addAllNodePaths(e.getNodePaths()); @@ -239,7 +236,23 @@ public class MSCRPlanner { graph.markDependency(splitHead, splitTail); } else if (!e.getHead().isGBK()) { Vertex newHead = graph.getVertexAt(e.getHead().getPCollection()); - graph.getEdge(newHead, vertex).addAllNodePaths(e.getNodePaths()); + Map<NodePath, PCollectionImpl> splitPoints = e.getSplitPoints(true /* breakpoints only */); + if (splitPoints.isEmpty()) { + graph.getEdge(newHead, vertex).addAllNodePaths(e.getNodePaths()); + } else { + for (Map.Entry<NodePath, PCollectionImpl> s : splitPoints.entrySet()) { + NodePath path = s.getKey(); + PCollectionImpl split = s.getValue(); + InputCollection<?> inputNode = handleSplitTarget(split); + Vertex splitTail = graph.addVertex(split, true); + Vertex splitHead = graph.addVertex(inputNode, false); + NodePath headPath = path.splitAt(split, splitHead.getPCollection()); + graph.getEdge(newHead, splitTail).addNodePath(headPath); + graph.getEdge(splitHead, vertex).addNodePath(path); + // Note the dependency between the vertices in the graph. 
+ graph.markDependency(splitHead, splitTail); + } + } } } for (Edge e : baseVertex.getOutgoingEdges()) { @@ -249,7 +262,7 @@ public class MSCRPlanner { } else { // Execute an Edge split Vertex newGraphTail = graph.getVertexAt(e.getTail().getPCollection()); - Map<NodePath, PCollectionImpl> splitPoints = e.getSplitPoints(outputs); + Map<NodePath, PCollectionImpl> splitPoints = e.getSplitPoints(false /* breakpoints only */); for (Map.Entry<NodePath, PCollectionImpl> s : splitPoints.entrySet()) { NodePath path = s.getKey(); PCollectionImpl split = s.getValue(); @@ -270,7 +283,7 @@ public class MSCRPlanner { return graph; } - private Multimap<Vertex, JobPrototype> constructJobPrototypes(List<Vertex> component, int numOfJobs) { + private Multimap<Vertex, JobPrototype> constructJobPrototypes(List<Vertex> component) { Multimap<Vertex, JobPrototype> assignment = HashMultimap.create(); List<Vertex> gbks = Lists.newArrayList(); for (Vertex v : component) {
