Updated Branches: refs/heads/master 2bf556177 -> 2bc04f98c
CRUNCH-128: Add an explicit dependency between the output of one MapReduce job and the start of another one for cases like mapside joins and total orderings, where we need a file to exist on the filesystem before another process takes advantage of it. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/2bc04f98 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/2bc04f98 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/2bc04f98 Branch: refs/heads/master Commit: 2bc04f98c1b874f4039ff405b5fd50098bd67447 Parents: 2bf5561 Author: Josh Wills <[email protected]> Authored: Sun Dec 9 19:25:53 2012 -0800 Committer: Josh Wills <[email protected]> Committed: Mon Jan 7 19:06:35 2013 -0800 ---------------------------------------------------------------------- .../org/apache/crunch/lib/join/MapsideJoinIT.java | 56 +++++-- .../main/java/org/apache/crunch/PCollection.java | 34 ++++ .../java/org/apache/crunch/ParallelDoOptions.java | 62 ++++++++ .../org/apache/crunch/impl/mem/MemPipeline.java | 1 + .../crunch/impl/mem/collect/MemCollection.java | 13 ++ .../crunch/impl/mr/collect/DoCollectionImpl.java | 11 ++- .../apache/crunch/impl/mr/collect/DoTableImpl.java | 8 +- .../crunch/impl/mr/collect/PCollectionImpl.java | 35 ++++- .../apache/crunch/impl/mr/collect/PTableBase.java | 5 + .../apache/crunch/impl/mr/plan/MSCRPlanner.java | 121 ++++++++++----- .../org/apache/crunch/io/ReadableSourceTarget.java | 1 + .../org/apache/crunch/lib/join/MapsideJoin.java | 72 ++++++---- .../crunch/materialize/MaterializableIterable.java | 9 + 13 files changed, 341 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java ---------------------------------------------------------------------- diff --git 
a/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java index 9147baf..7d5d94d 100644 --- a/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java +++ b/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java @@ -29,7 +29,9 @@ import org.apache.crunch.MapFn; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.Pipeline; +import org.apache.crunch.PipelineResult; import org.apache.crunch.fn.FilterFns; +import org.apache.crunch.fn.MapValuesFn; import org.apache.crunch.impl.mem.MemPipeline; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.test.TemporaryPath; @@ -64,21 +66,33 @@ public class MapsideJoinIT { } private static class LineSplitter extends MapFn<String, Pair<Integer, String>> { - @Override public Pair<Integer, String> map(String input) { String[] fields = input.split("\\|"); return Pair.of(Integer.parseInt(fields[0]), fields[1]); } - } + private static class CapOrdersFn extends MapValuesFn<Integer, String, String> { + @Override + public String map(String v) { + return v.toUpperCase(); + } + } + + private static class ConcatValuesFn extends MapValuesFn<Integer, Pair<String, String>, String> { + @Override + public String map(Pair<String, String> v) { + return v.toString(); + } + } + @Rule public TemporaryPath tmpDir = TemporaryPaths.create(); - @Test(expected = CrunchRuntimeException.class) - public void testNonMapReducePipeline() { - runMapsideJoin(MemPipeline.getInstance()); + @Test + public void testMapSideJoin_MemPipeline() { + runMapsideJoin(MemPipeline.getInstance(), true); } @Test @@ -95,27 +109,39 @@ public class MapsideJoinIT { List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined.materialize()); assertTrue(materializedJoin.isEmpty()); - } @Test public void testMapsideJoin() throws IOException { - runMapsideJoin(new MRPipeline(MapsideJoinIT.class, 
tmpDir.getDefaultConfiguration())); + runMapsideJoin(new MRPipeline(MapsideJoinIT.class, tmpDir.getDefaultConfiguration()), false); } - private void runMapsideJoin(Pipeline pipeline) { + private void runMapsideJoin(Pipeline pipeline, boolean inMemory) { PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt"); PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt"); + + PTable<Integer, String> custOrders = MapsideJoin.join(customerTable, orderTable) + .parallelDo("concat", new ConcatValuesFn(), Writables.tableOf(Writables.ints(), Writables.strings())); - PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable, orderTable); + PTable<Integer, String> ORDER_TABLE = orderTable.parallelDo(new CapOrdersFn(), orderTable.getPTableType()); + + PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(custOrders, ORDER_TABLE); List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList(); - expectedJoinResult.add(Pair.of(111, Pair.of("John Doe", "Corn flakes"))); - expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet paper"))); - expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet plunger"))); - expectedJoinResult.add(Pair.of(333, Pair.of("Someone Else", "Toilet brush"))); - - List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(joined.materialize()); + expectedJoinResult.add(Pair.of(111, Pair.of("[John Doe,Corn flakes]", "CORN FLAKES"))); + expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PAPER"))); + expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PLUNGER"))); + expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PAPER"))); + expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PLUNGER"))); + expectedJoinResult.add(Pair.of(333, Pair.of("[Someone Else,Toilet brush]", "TOILET BRUSH"))); + Iterable<Pair<Integer, Pair<String, 
String>>> iter = joined.materialize(); + + PipelineResult res = pipeline.run(); + if (!inMemory) { + assertEquals(2, res.getStageResults().size()); + } + + List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter); Collections.sort(joinedResultList); assertEquals(expectedJoinResult, joinedResultList); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/PCollection.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/PCollection.java b/crunch/src/main/java/org/apache/crunch/PCollection.java index 00c300f..798c262 100644 --- a/crunch/src/main/java/org/apache/crunch/PCollection.java +++ b/crunch/src/main/java/org/apache/crunch/PCollection.java @@ -65,6 +65,23 @@ public interface PCollection<S> { * @return a new {@code PCollection} */ <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type); + + /** + * Applies the given doFn to the elements of this {@code PCollection} and + * returns a new {@code PCollection} that is the output of this processing. + * + * @param name + * An identifier for this processing step, useful for debugging + * @param doFn + * The {@code DoFn} to apply + * @param type + * The {@link PType} of the resulting {@code PCollection} + * @param options + * Optional information that is needed for certain pipeline operations + * @return a new {@code PCollection} + */ + <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type, + ParallelDoOptions options); /** * Similar to the other {@code parallelDo} instance, but returns a @@ -91,6 +108,23 @@ public interface PCollection<S> { * @return a new {@code PTable} */ <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type); + + /** + * Similar to the other {@code parallelDo} instance, but returns a + * {@code PTable} instance instead of a {@code PCollection}. 
+ * + * @param name + * An identifier for this processing step + * @param doFn + * The {@code DoFn} to apply + * @param type + * The {@link PTableType} of the resulting {@code PTable} + * @param options + * Optional information that is needed for certain pipeline operations + * @return a new {@code PTable} + */ + <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type, + ParallelDoOptions options); /** * Write the contents of this {@code PCollection} to the given {@code Target}, http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/ParallelDoOptions.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/ParallelDoOptions.java b/crunch/src/main/java/org/apache/crunch/ParallelDoOptions.java new file mode 100644 index 0000000..2407b3a --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/ParallelDoOptions.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch; + +import java.util.Collections; +import java.util.Set; + +import com.google.common.collect.Sets; + +/** + * Container class that includes optional information about a {@code parallelDo} operation + * applied to a {@code PCollection}. Primarily used within the Crunch framework + * itself for certain types of advanced processing operations, such as in-memory joins + * that require reading a file from the filesystem into a {@code DoFn}. + */ +public class ParallelDoOptions { + private final Set<SourceTarget<?>> sourceTargets; + + private ParallelDoOptions(Set<SourceTarget<?>> sourceTargets) { + this.sourceTargets = sourceTargets; + } + + public Set<SourceTarget<?>> getSourceTargets() { + return sourceTargets; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private Set<SourceTarget<?>> sourceTargets; + + public Builder() { + this.sourceTargets = Sets.newHashSet(); + } + + public Builder sourceTargets(SourceTarget<?>... 
sourceTargets) { + Collections.addAll(this.sourceTargets, sourceTargets); + return this; + } + + public ParallelDoOptions build() { + return new ParallelDoOptions(sourceTargets); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java index 77c41ce..3e28a0c 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java @@ -28,6 +28,7 @@ import org.apache.crunch.Pair; import org.apache.crunch.Pipeline; import org.apache.crunch.PipelineResult; import org.apache.crunch.Source; +import org.apache.crunch.SourceTarget; import org.apache.crunch.TableSource; import org.apache.crunch.Target; import org.apache.crunch.impl.mem.collect.MemCollection; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java index 35f64ce..ffc38ae 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java @@ -31,6 +31,7 @@ import org.apache.crunch.PCollection; import org.apache.crunch.PObject; import org.apache.crunch.PTable; import org.apache.crunch.Pair; +import org.apache.crunch.ParallelDoOptions; import org.apache.crunch.Pipeline; import org.apache.crunch.Target; import org.apache.crunch.fn.ExtractKeyFn; @@ -95,6 +96,12 @@ public class MemCollection<S> implements PCollection<S> { 
@Override public <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type) { + return parallelDo(name, doFn, type, ParallelDoOptions.builder().build()); + } + + @Override + public <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type, + ParallelDoOptions options) { InMemoryEmitter<T> emitter = new InMemoryEmitter<T>(); doFn.initialize(); for (S s : collect) { @@ -111,6 +118,12 @@ public class MemCollection<S> implements PCollection<S> { @Override public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type) { + return parallelDo(name, doFn, type, ParallelDoOptions.builder().build()); + } + + @Override + public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type, + ParallelDoOptions options) { InMemoryEmitter<Pair<K, V>> emitter = new InMemoryEmitter<Pair<K, V>>(); doFn.setContext(getInMemoryContext(getPipeline().getConfiguration())); doFn.initialize(); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java index 1f4fea2..7b8f2ea 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java @@ -18,12 +18,16 @@ package org.apache.crunch.impl.mr.collect; import java.util.List; +import java.util.Set; import org.apache.crunch.DoFn; +import org.apache.crunch.ParallelDoOptions; +import org.apache.crunch.SourceTarget; import org.apache.crunch.impl.mr.plan.DoNode; import org.apache.crunch.types.PType; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; public class 
DoCollectionImpl<S> extends PCollectionImpl<S> { @@ -32,7 +36,12 @@ public class DoCollectionImpl<S> extends PCollectionImpl<S> { private final PType<S> ntype; <T> DoCollectionImpl(String name, PCollectionImpl<T> parent, DoFn<T, S> fn, PType<S> ntype) { - super(name); + this(name, parent, fn, ntype, ParallelDoOptions.builder().build()); + } + + <T> DoCollectionImpl(String name, PCollectionImpl<T> parent, DoFn<T, S> fn, PType<S> ntype, + ParallelDoOptions options) { + super(name, options); this.parent = (PCollectionImpl<Object>) parent; this.fn = (DoFn<Object, S>) fn; this.ntype = ntype; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java index 1d19580..176643b 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java @@ -23,6 +23,7 @@ import org.apache.crunch.CombineFn; import org.apache.crunch.DoFn; import org.apache.crunch.PTable; import org.apache.crunch.Pair; +import org.apache.crunch.ParallelDoOptions; import org.apache.crunch.impl.mr.plan.DoNode; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; @@ -36,7 +37,12 @@ public class DoTableImpl<K, V> extends PTableBase<K, V> implements PTable<K, V> private final PTableType<K, V> type; <S> DoTableImpl(String name, PCollectionImpl<S> parent, DoFn<S, Pair<K, V>> fn, PTableType<K, V> ntype) { - super(name); + this(name, parent, fn, ntype, ParallelDoOptions.builder().build()); + } + + <S> DoTableImpl(String name, PCollectionImpl<S> parent, DoFn<S, Pair<K, V>> fn, PTableType<K, V> ntype, + ParallelDoOptions options) { + super(name, options); this.parent = parent; this.fn = 
fn; this.type = ntype; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java index 79fe72b..8ad6692 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java @@ -20,6 +20,7 @@ package org.apache.crunch.impl.mr.collect; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,6 +31,7 @@ import org.apache.crunch.PCollection; import org.apache.crunch.PObject; import org.apache.crunch.PTable; import org.apache.crunch.Pair; +import org.apache.crunch.ParallelDoOptions; import org.apache.crunch.Pipeline; import org.apache.crunch.SourceTarget; import org.apache.crunch.Target; @@ -43,6 +45,7 @@ import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; public abstract class PCollectionImpl<S> implements PCollection<S> { @@ -51,9 +54,15 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { private final String name; protected MRPipeline pipeline; private SourceTarget<S> materializedAt; - + private final ParallelDoOptions options; + public PCollectionImpl(String name) { + this(name, ParallelDoOptions.builder().build()); + } + + public PCollectionImpl(String name, ParallelDoOptions options) { this.name = name; + this.options = options; } @Override @@ -86,7 +95,13 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { public <T> PCollection<T> parallelDo(String name, 
DoFn<S, T> fn, PType<T> type) { return new DoCollectionImpl<T>(name, getChainingCollection(), fn, type); } - + + @Override + public <T> PCollection<T> parallelDo(String name, DoFn<S, T> fn, PType<T> type, + ParallelDoOptions options) { + return new DoCollectionImpl<T>(name, getChainingCollection(), fn, type, options); + } + @Override public <K, V> PTable<K, V> parallelDo(DoFn<S, Pair<K, V>> fn, PTableType<K, V> type) { MRPipeline pipeline = (MRPipeline) getPipeline(); @@ -98,6 +113,12 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { return new DoTableImpl<K, V>(name, getChainingCollection(), fn, type); } + @Override + public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> fn, PTableType<K, V> type, + ParallelDoOptions options) { + return new DoTableImpl<K, V>(name, getChainingCollection(), fn, type, options); + } + public PCollection<S> write(Target target) { if (materializedAt != null) { getPipeline().write(new InputCollection<S>(materializedAt, (MRPipeline) getPipeline()), target); @@ -194,7 +215,15 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { } return pipeline; } - + + public Set<SourceTarget<?>> getTargetDependencies() { + Set<SourceTarget<?>> targetDeps = options.getSourceTargets(); + for (PCollectionImpl<?> parent : getParents()) { + targetDeps = Sets.union(targetDeps, parent.getTargetDependencies()); + } + return targetDeps; + } + public int getDepth() { int parentMax = 0; for (PCollectionImpl parent : getParents()) { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java index 03c2fdc..69ea8a3 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java +++ 
b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java @@ -27,6 +27,7 @@ import org.apache.crunch.PCollection; import org.apache.crunch.PObject; import org.apache.crunch.PTable; import org.apache.crunch.Pair; +import org.apache.crunch.ParallelDoOptions; import org.apache.crunch.Target; import org.apache.crunch.lib.Aggregate; import org.apache.crunch.lib.Cogroup; @@ -44,6 +45,10 @@ abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> implements P super(name); } + public PTableBase(String name, ParallelDoOptions options) { + super(name, options); + } + public PType<K> getKeyType() { return getPTableType().getKeyType(); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java index 7fe2809..3718ec2 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; @@ -35,14 +36,23 @@ import org.apache.hadoop.conf.Configuration; import com.google.common.collect.HashMultimap; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; public class MSCRPlanner { + private final MRPipeline pipeline; + private final Map<PCollectionImpl<?>, Set<Target>> outputs; + + public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>> outputs) { + this.pipeline = pipeline; + this.outputs = new TreeMap<PCollectionImpl<?>, 
Set<Target>>(DEPTH_COMPARATOR); + this.outputs.putAll(outputs); + } + // Used to ensure that we always build pipelines starting from the deepest - // outputs, which - // helps ensure that we handle intermediate outputs correctly. + // outputs, which helps ensure that we handle intermediate outputs correctly. private static final Comparator<PCollectionImpl<?>> DEPTH_COMPARATOR = new Comparator<PCollectionImpl<?>>() { @Override public int compare(PCollectionImpl<?> left, PCollectionImpl<?> right) { @@ -50,55 +60,88 @@ public class MSCRPlanner { if (cmp == 0) { // Ensure we don't throw away two output collections at the same depth. // Using the collection name would be nicer here, but names aren't - // necessarily unique + // necessarily unique. cmp = new Integer(right.hashCode()).compareTo(left.hashCode()); } return cmp; } }; - private final MRPipeline pipeline; - private final Map<PCollectionImpl<?>, Set<Target>> outputs; - - public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>> outputs) { - this.pipeline = pipeline; - this.outputs = new TreeMap<PCollectionImpl<?>, Set<Target>>(DEPTH_COMPARATOR); - this.outputs.putAll(outputs); - } - public MRExecutor plan(Class<?> jarClass, Configuration conf) throws IOException { - // Walk the current plan tree and build a graph in which the vertices are - // sources, targets, and GBK operations. - GraphBuilder graphBuilder = new GraphBuilder(); - for (PCollectionImpl<?> output : outputs.keySet()) { - graphBuilder.visitOutput(output); + Map<PCollectionImpl<?>, Set<SourceTarget<?>>> targetDeps = Maps.newTreeMap(DEPTH_COMPARATOR); + for (PCollectionImpl<?> pcollect : outputs.keySet()) { + targetDeps.put(pcollect, pcollect.getTargetDependencies()); } - Graph baseGraph = graphBuilder.getGraph(); - - // Create a new graph that splits up up dependent GBK nodes. - Graph graph = prepareFinalGraph(baseGraph); - - // Break the graph up into connected components. 
- List<List<Vertex>> components = graph.connectedComponents(); - - // For each component, we will create one or more job prototypes, - // depending on its profile. - // For dependency handling, we only need to care about which - // job prototype a particular GBK is assigned to. Multimap<Vertex, JobPrototype> assignments = HashMultimap.create(); - for (List<Vertex> component : components) { - assignments.putAll(constructJobPrototypes(component)); + Multimap<PCollectionImpl<?>, Vertex> protoDependency = HashMultimap.create(); + while (!targetDeps.isEmpty()) { + Set<Target> allTargets = Sets.newHashSet(); + for (PCollectionImpl<?> pcollect : targetDeps.keySet()) { + allTargets.addAll(outputs.get(pcollect)); + } + GraphBuilder graphBuilder = new GraphBuilder(); + + // Walk the current plan tree and build a graph in which the vertices are + // sources, targets, and GBK operations. + Set<PCollectionImpl<?>> currentStage = Sets.newHashSet(); + Set<PCollectionImpl<?>> laterStage = Sets.newHashSet(); + for (PCollectionImpl<?> output : targetDeps.keySet()) { + if (Sets.intersection(allTargets, targetDeps.get(output)).isEmpty()) { + graphBuilder.visitOutput(output); + currentStage.add(output); + } else { + laterStage.add(output); + } + } + + Graph baseGraph = graphBuilder.getGraph(); + + // Create a new graph that splits up dependent GBK nodes. + Graph graph = prepareFinalGraph(baseGraph); + + // Break the graph up into connected components. + List<List<Vertex>> components = graph.connectedComponents(); + + // For each component, we will create one or more job prototypes, + // depending on its profile. + // For dependency handling, we only need to care about which + // job prototype a particular GBK is assigned to. + for (List<Vertex> component : components) { + assignments.putAll(constructJobPrototypes(component)); + } + + // Add in the job dependency information here.
+ for (Map.Entry<Vertex, JobPrototype> e : assignments.entries()) { + JobPrototype current = e.getValue(); + List<Vertex> parents = graph.getParents(e.getKey()); + for (Vertex parent : parents) { + for (JobPrototype parentJobProto : assignments.get(parent)) { + current.addDependency(parentJobProto); + } + } + } + + // Add cross-stage dependencies. + for (PCollectionImpl<?> output : currentStage) { + Set<Target> targets = outputs.get(output); + Vertex vertex = graph.getVertexAt(output); + for (PCollectionImpl<?> later : laterStage) { + if (!Sets.intersection(targets, targetDeps.get(later)).isEmpty()) { + protoDependency.put(later, vertex); + } + } + targetDeps.remove(output); + } } - - // Add in the job dependency information here. - for (Map.Entry<Vertex, JobPrototype> e : assignments.entries()) { - JobPrototype current = e.getValue(); - List<Vertex> parents = graph.getParents(e.getKey()); - for (Vertex parent : parents) { - for (JobPrototype parentJobProto : assignments.get(parent)) { - current.addDependency(parentJobProto); + // Cross-job dependencies. 
+ for (Entry<PCollectionImpl<?>, Vertex> pd : protoDependency.entries()) { + Vertex d = new Vertex(pd.getKey()); + Vertex dj = pd.getValue(); + for (JobPrototype parent : assignments.get(dj)) { + for (JobPrototype child : assignments.get(d)) { + child.addDependency(parent); } } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java index 95c90aa..ac979c3 100644 --- a/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java +++ b/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java @@ -18,6 +18,7 @@ package org.apache.crunch.io; import org.apache.crunch.SourceTarget; +import org.apache.hadoop.fs.Path; /** * An interface that indicates that a {@code SourceTarget} instance can be read http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java b/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java index 1acbf2d..8116ea1 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java +++ b/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java @@ -24,16 +24,18 @@ import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; import org.apache.crunch.PTable; import org.apache.crunch.Pair; -import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.ParallelDoOptions; import org.apache.crunch.io.ReadableSourceTarget; -import org.apache.crunch.io.impl.SourcePathTargetImpl; +import org.apache.crunch.materialize.MaterializableIterable; import org.apache.crunch.types.PType; import 
org.apache.crunch.types.PTypeFamily; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; /** @@ -64,37 +66,46 @@ public class MapsideJoin { * @return A table keyed on the join key, containing pairs of joined values */ public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right) { - - if (!(right.getPipeline() instanceof MRPipeline)) { - throw new CrunchRuntimeException("Map-side join is only supported within a MapReduce context"); + PTypeFamily tf = left.getTypeFamily(); + Iterable<Pair<K, V>> iterable = right.materialize(); + + if (iterable instanceof MaterializableIterable) { + MaterializableIterable<Pair<K, V>> mi = (MaterializableIterable<Pair<K, V>>) iterable; + MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(mi.getPath().toString(), right.getPType()); + ParallelDoOptions options = ParallelDoOptions.builder() + .sourceTargets(mi.getSourceTarget()) + .build(); + return left.parallelDo("mapjoin", mapJoinDoFn, + tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())), + options); + } else { // in-memory pipeline + return left.parallelDo(new InMemoryJoinFn<K, U, V>(iterable), + tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType()))); } + } - MRPipeline pipeline = (MRPipeline) right.getPipeline(); - pipeline.materialize(right); - - // TODO Move necessary logic to MRPipeline so that we can theoretically - // optimize his by running the setup of multiple map-side joins concurrently - pipeline.run(); + static class InMemoryJoinFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> { - ReadableSourceTarget<Pair<K, V>> readableSourceTarget = pipeline.getMaterializeSourceTarget(right); - if 
(!(readableSourceTarget instanceof SourcePathTargetImpl)) { - throw new CrunchRuntimeException("Right-side contents can't be read from a path"); + private Multimap<K, V> joinMap; + + public InMemoryJoinFn(Iterable<Pair<K, V>> iterable) { + joinMap = HashMultimap.create(); + for (Pair<K, V> joinPair : iterable) { + joinMap.put(joinPair.first(), joinPair.second()); + } + } + + @Override + public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) { + K key = input.first(); + U value = input.second(); + for (V joinValue : joinMap.get(key)) { + Pair<U, V> valuePair = Pair.of(value, joinValue); + emitter.emit(Pair.of(key, valuePair)); + } } - - // Suppress warnings because we've just checked this cast via instanceof - @SuppressWarnings("unchecked") - SourcePathTargetImpl<Pair<K, V>> sourcePathTarget = (SourcePathTargetImpl<Pair<K, V>>) readableSourceTarget; - - Path path = sourcePathTarget.getPath(); - DistributedCache.addCacheFile(path.toUri(), pipeline.getConfiguration()); - - MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(path.getName(), right.getPType()); - PTypeFamily typeFamily = left.getTypeFamily(); - return left.parallelDo("mapjoin", mapJoinDoFn, - typeFamily.tableOf(left.getKeyType(), typeFamily.pairs(left.getValueType(), right.getValueType()))); - } - + static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> { private String inputPath; @@ -122,6 +133,11 @@ public class MapsideJoin { } @Override + public void configure(Configuration conf) { + DistributedCache.addCacheFile(new Path(inputPath).toUri(), conf); + } + + @Override public void initialize() { super.initialize(); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java 
b/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java index 2d6c573..0ed29e3 100644 --- a/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java +++ b/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java @@ -24,7 +24,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.Pipeline; +import org.apache.crunch.io.PathTarget; import org.apache.crunch.io.ReadableSourceTarget; +import org.apache.hadoop.fs.Path; public class MaterializableIterable<E> implements Iterable<E> { @@ -44,6 +46,13 @@ public class MaterializableIterable<E> implements Iterable<E> { return sourceTarget; } + public Path getPath() { + if (sourceTarget instanceof PathTarget) { + return ((PathTarget) sourceTarget).getPath(); + } + return null; + } + @Override public Iterator<E> iterator() { if (materialized == null) {
