Updated Branches: refs/heads/master 70c4edd0b -> 41f01c037
CRUNCH-155: Don't trigger a MapReduce job to materialize an InputCollection. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/41f01c03 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/41f01c03 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/41f01c03 Branch: refs/heads/master Commit: 41f01c037007fb54091a69464d81c3269b269710 Parents: 70c4edd Author: Josh Wills <[email protected]> Authored: Thu Jan 31 21:11:03 2013 -0800 Committer: Josh Wills <[email protected]> Committed: Sun Feb 3 08:49:00 2013 -0800 ---------------------------------------------------------------------- .../src/it/java/org/apache/crunch/UnionGbkIT.java | 117 +++++++++++++++ .../java/org/apache/crunch/impl/mr/MRPipeline.java | 62 ++++++--- .../apache/crunch/impl/mr/collect/InputTable.java | 4 + .../crunch/impl/mr/collect/PCollectionImpl.java | 1 + .../org/apache/crunch/io/impl/FileSourceImpl.java | 4 + .../org/apache/crunch/lib/join/MapsideJoin.java | 10 +- .../crunch/materialize/MaterializableIterable.java | 28 +++-- 7 files changed, 193 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/41f01c03/crunch/src/it/java/org/apache/crunch/UnionGbkIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/UnionGbkIT.java b/crunch/src/it/java/org/apache/crunch/UnionGbkIT.java new file mode 100644 index 0000000..3937fe8 --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/UnionGbkIT.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import static org.junit.Assert.assertNotNull; + +import org.apache.crunch.PCollection; +import org.apache.crunch.PGroupedTable; +import org.apache.crunch.PTable; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.avro.Avros; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +public class UnionGbkIT { + + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + MRPipeline pipeline; + + public static class FirstLetterKeyFn extends DoFn<String, Pair<String, String>> { + @Override + public void process(String input, Emitter<Pair<String, String>> emitter) { + if (input.length() > 0) { + emitter.emit(Pair.of(input.substring(0, 1), input)); + } + } + } + + public static class ConcatGroupFn extends DoFn<Pair<String, Iterable<String>>, String> { + @Override + public void process(Pair<String, Iterable<String>> input, Emitter<String> emitter) { + StringBuilder sb = new StringBuilder(); + for (String str : input.second()) { + sb.append(str); + } + emitter.emit(sb.toString()); + } + } + + @Before + public void setUp() { + pipeline = new MRPipeline(UnionGbkIT.class, tmpDir.getDefaultConfiguration()); + } + + @After + public void tearDown() { + pipeline.done(); + } + + @Test + public void tableOfUnionGbk() throws Exception { + PCollection<String> words = pipeline.readTextFile( + tmpDir.copyResourceFileName("shakes.txt")); + PCollection<String> lorum = pipeline.readTextFile( + tmpDir.copyResourceFileName("maugham.txt")); + lorum.materialize(); + + @SuppressWarnings("unchecked") + PCollection<String> union = words.union(lorum); + + PGroupedTable<String, String> groupedByFirstLetter = + union.parallelDo("byFirstLetter", new FirstLetterKeyFn(), + Avros.tableOf(Avros.strings(), Avros.strings())) + .groupByKey(); + PCollection<String> concatted = groupedByFirstLetter + .parallelDo("concat", new ConcatGroupFn(), Avros.strings()); + + assertNotNull(concatted.materialize().iterator()); + } + + @Test + public void unionOfTablesGbk() throws Exception { + PCollection<String> words = pipeline.readTextFile( + tmpDir.copyResourceFileName("shakes.txt")); + PCollection<String> lorum = pipeline.readTextFile( + tmpDir.copyResourceFileName("maugham.txt")); + lorum.materialize(); + + PTable<String, String> wordsByFirstLetter = + words.parallelDo("byFirstLetter", new FirstLetterKeyFn(), + Avros.tableOf(Avros.strings(), Avros.strings())); + PTable<String, String> lorumByFirstLetter = + lorum.parallelDo("byFirstLetter", new FirstLetterKeyFn(), + Avros.tableOf(Avros.strings(), Avros.strings())); + + @SuppressWarnings("unchecked") + PTable<String, String> union = wordsByFirstLetter.union(lorumByFirstLetter); + + PGroupedTable<String, String> groupedByFirstLetter = union.groupByKey(); + + PCollection<String> concatted = groupedByFirstLetter.parallelDo("concat", + new ConcatGroupFn(), Avros.strings()); + + assertNotNull(concatted.materialize().iterator()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/41f01c03/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java index 6ef7491..9c98937 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java @@ -45,6 +45,7 @@ import org.apache.crunch.impl.mr.exec.MRExecutor; import org.apache.crunch.impl.mr.plan.MSCRPlanner; import org.apache.crunch.impl.mr.run.RuntimeParameters; import org.apache.crunch.io.From; +import org.apache.crunch.io.ReadableSource; import org.apache.crunch.io.ReadableSourceTarget; import org.apache.crunch.io.To; import org.apache.crunch.materialize.MaterializableIterable; @@ -156,8 +157,10 @@ public class MRPipeline implements Pipeline { for (PCollectionImpl<?> c : outputTargets.keySet()) { if (outputTargetsToMaterialize.containsKey(c)) { MaterializableIterable iter = outputTargetsToMaterialize.get(c); - iter.materialize(); - c.materializeAt(iter.getSourceTarget()); + if (iter.isSourceTarget()) { + iter.materialize(); + c.materializeAt((SourceTarget) iter.getSource()); + } outputTargetsToMaterialize.remove(c); } else { boolean materialized = false; @@ -225,9 +228,9 @@ public class MRPipeline implements Pipeline { public <T> Iterable<T> materialize(PCollection<T> pcollection) { PCollectionImpl<T> pcollectionImpl = toPcollectionImpl(pcollection); - ReadableSourceTarget<T> srcTarget = getMaterializeSourceTarget(pcollectionImpl); + ReadableSource<T> readableSrc = getMaterializeSourceTarget(pcollectionImpl); - MaterializableIterable<T> c = new MaterializableIterable<T>(this, srcTarget); + MaterializableIterable<T> c = new MaterializableIterable<T>(this, readableSrc); if (!outputTargetsToMaterialize.containsKey(pcollectionImpl)) { outputTargetsToMaterialize.put(pcollectionImpl, c); } @@ -245,35 +248,56 @@ public class MRPipeline implements Pipeline { * @throws IllegalArgumentException If no ReadableSourceTarget can be retrieved for the given * PCollection */ - public <T> ReadableSourceTarget<T> getMaterializeSourceTarget(PCollection<T> pcollection) { + public <T> ReadableSource<T> getMaterializeSourceTarget(PCollection<T> pcollection) { PCollectionImpl<T> impl = toPcollectionImpl(pcollection); + + // First, check to see if this is a readable input collection. + if (impl instanceof InputCollection) { + InputCollection<T> ic = (InputCollection<T>) impl; + if (ic.getSource() instanceof ReadableSource) { + return (ReadableSource) ic.getSource(); + } else { + throw new IllegalArgumentException( + "Cannot materialize non-readable input collection: " + ic); + } + } else if (impl instanceof InputTable) { + InputTable it = (InputTable) impl; + if (it.getSource() instanceof ReadableSource) { + return (ReadableSource) it.getSource(); + } else { + throw new IllegalArgumentException( + "Cannot materialize non-readable input table: " + it); + } + } + + // Next, check to see if this pcollection has already been materialized. SourceTarget<T> matTarget = impl.getMaterializedAt(); if (matTarget != null && matTarget instanceof ReadableSourceTarget) { return (ReadableSourceTarget<T>) matTarget; } - + + // Check to see if we plan on materializing this collection on the + // next run. ReadableSourceTarget<T> srcTarget = null; if (outputTargets.containsKey(pcollection)) { for (Target target : outputTargets.get(impl)) { if (target instanceof ReadableSourceTarget) { - srcTarget = (ReadableSourceTarget<T>) target; - break; + return (ReadableSourceTarget<T>) target; } } } - if (srcTarget == null) { - SourceTarget<T> st = createIntermediateOutput(pcollection.getPType()); - if (!(st instanceof ReadableSourceTarget)) { - throw new IllegalArgumentException("The PType for the given PCollection is not readable" - + " and cannot be materialized"); - } else { - srcTarget = (ReadableSourceTarget<T>) st; - addOutput(impl, srcTarget); - } + // If we're not planning on materializing it already, create a temporary + // output to hold the materialized records and return that. + SourceTarget<T> st = createIntermediateOutput(pcollection.getPType()); + if (!(st instanceof ReadableSourceTarget)) { + throw new IllegalArgumentException("The PType for the given PCollection is not readable" + + " and cannot be materialized"); + } else { + srcTarget = (ReadableSourceTarget<T>) st; + addOutput(impl, srcTarget); + return srcTarget; } - - return srcTarget; } /** http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/41f01c03/crunch/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java index 9f64803..71f11c5 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java @@ -40,6 +40,10 @@ public class InputTable<K, V> extends PTableBase<K, V> { this.asCollection = new InputCollection<Pair<K, V>>(source, pipeline); } + public TableSource<K, V> getSource() { + return source; + } + @Override protected long getSizeInternal() { return asCollection.getSizeInternal(); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/41f01c03/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java index 8ad6692..296043f 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java @@ -38,6 +38,7 @@ import org.apache.crunch.Target; import org.apache.crunch.fn.ExtractKeyFn; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.impl.mr.plan.DoNode; +import org.apache.crunch.io.ReadableSource; import org.apache.crunch.lib.Aggregate; import org.apache.crunch.materialize.pobject.CollectionPObject; import org.apache.crunch.types.PTableType; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/41f01c03/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java index 964c6a0..688c801 100644 --- a/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java +++ b/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java @@ -53,6 +53,10 @@ public class FileSourceImpl<T> implements Source<T> { this.inputBundle = inputBundle; } + public Path getPath() { + return path; + } + @Override public void configureSource(Job job, int inputId) throws IOException { if (inputId == -1) { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/41f01c03/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java b/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java index 9b532c5..fa28155 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java +++ b/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java @@ -25,6 +25,7 @@ import org.apache.crunch.Emitter; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.ParallelDoOptions; +import org.apache.crunch.SourceTarget; import org.apache.crunch.io.ReadableSourceTarget; import org.apache.crunch.materialize.MaterializableIterable; import org.apache.crunch.types.PType; @@ -72,12 +73,13 @@ public class MapsideJoin { if (iterable instanceof MaterializableIterable) { MaterializableIterable<Pair<K, V>> mi = (MaterializableIterable<Pair<K, V>>) iterable; MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(mi.getPath().toString(), right.getPType()); - ParallelDoOptions options = ParallelDoOptions.builder() - .sourceTargets(mi.getSourceTarget()) - .build(); + ParallelDoOptions.Builder optionsBuilder = ParallelDoOptions.builder(); + if (mi.isSourceTarget()) { + optionsBuilder.sourceTargets((SourceTarget) mi.getSource()); + } return left.parallelDo("mapjoin", mapJoinDoFn, tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())), - options); + optionsBuilder.build()); } else { // in-memory pipeline return left.parallelDo(new InMemoryJoinFn<K, U, V>(iterable), tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType()))); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/41f01c03/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java b/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java index 0ed29e3..2dcc64f 100644 --- a/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java +++ b/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java @@ -24,8 +24,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.Pipeline; +import org.apache.crunch.SourceTarget; import org.apache.crunch.io.PathTarget; -import org.apache.crunch.io.ReadableSourceTarget; +import org.apache.crunch.io.ReadableSource; +import org.apache.crunch.io.impl.FileSourceImpl; import org.apache.hadoop.fs.Path; public class MaterializableIterable<E> implements Iterable<E> { @@ -33,22 +35,28 @@ public class MaterializableIterable<E> implements Iterable<E> { private static final Log LOG = LogFactory.getLog(MaterializableIterable.class); private final Pipeline pipeline; - private final ReadableSourceTarget<E> sourceTarget; + private final ReadableSource<E> source; private Iterable<E> materialized; - public MaterializableIterable(Pipeline pipeline, ReadableSourceTarget<E> source) { + public MaterializableIterable(Pipeline pipeline, ReadableSource<E> source) { this.pipeline = pipeline; - this.sourceTarget = source; + this.source = source; this.materialized = null; } - public ReadableSourceTarget<E> getSourceTarget() { - return sourceTarget; + public ReadableSource<E> getSource() { + return source; } + public boolean isSourceTarget() { + return (source instanceof SourceTarget); + } + public Path getPath() { - if (sourceTarget instanceof PathTarget) { - return ((PathTarget) sourceTarget).getPath(); + if (source instanceof FileSourceImpl) { + return ((FileSourceImpl) source).getPath(); + } else if (source instanceof PathTarget) { + return ((PathTarget) source).getPath(); } return null; } @@ -64,9 +72,9 @@ public class MaterializableIterable<E> implements Iterable<E> { public void materialize() { try { - materialized = sourceTarget.read(pipeline.getConfiguration()); + materialized = source.read(pipeline.getConfiguration()); } catch (IOException e) { - LOG.error("Could not materialize: " + sourceTarget, e); + LOG.error("Could not materialize: " + source, e); throw new CrunchRuntimeException(e); } }
