Repository: crunch Updated Branches: refs/heads/master b559e0e58 -> 9355e74ec
CRUNCH-376: Add aggregate(...) method to PCollection. Contributed by Jason Gauci, and then split aggregate(...) into aggregate(...) and first(). Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/9355e74e Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/9355e74e Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/9355e74e Branch: refs/heads/master Commit: 9355e74ec221bd6e2835ac591e7e30f1654799b5 Parents: b559e0e Author: Josh Wills <[email protected]> Authored: Mon Apr 14 17:32:01 2014 -0700 Committer: Josh Wills <[email protected]> Committed: Tue Apr 15 15:00:09 2014 -0700 ---------------------------------------------------------------------- .../java/org/apache/crunch/PCollection.java | 11 +++ .../impl/dist/collect/PCollectionImpl.java | 11 +++ .../crunch/impl/mem/collect/MemCollection.java | 10 +++ .../java/org/apache/crunch/lib/Aggregate.java | 13 ++++ .../apache/crunch/examples/TotalWordCount.java | 78 ++++++++++++++++++++ 5 files changed, 123 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/9355e74e/crunch-core/src/main/java/org/apache/crunch/PCollection.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/PCollection.java b/crunch-core/src/main/java/org/apache/crunch/PCollection.java index 2d62d00..bf5bacc 100644 --- a/crunch-core/src/main/java/org/apache/crunch/PCollection.java +++ b/crunch-core/src/main/java/org/apache/crunch/PCollection.java @@ -183,6 +183,11 @@ public interface PCollection<S> { PObject<Collection<S>> asCollection(); /** + * @return The first element of this {@code PCollection}. + */ + PObject<S> first(); + + /** * @return A reference to the data in this instance that can be read from a job running * on a cluster. 
* @@ -267,4 +272,10 @@ public interface PCollection<S> { * Returns a {@code PObject} of the minimum element of this instance. */ PObject<S> min(); + + /** + * Returns a {@code PCollection} that contains the result of aggregating all values in this instance. + */ + PCollection<S> aggregate(Aggregator<S> aggregator); + } http://git-wip-us.apache.org/repos/asf/crunch/blob/9355e74e/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java index cb9c60c..9167863 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java @@ -19,6 +19,8 @@ package org.apache.crunch.impl.dist.collect; import com.google.common.collect.Lists; import com.google.common.collect.Sets; + +import org.apache.crunch.Aggregator; import org.apache.crunch.CachingOptions; import org.apache.crunch.DoFn; import org.apache.crunch.FilterFn; @@ -37,6 +39,7 @@ import org.apache.crunch.impl.dist.DistributedPipeline; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.lib.Aggregate; import org.apache.crunch.materialize.pobject.CollectionPObject; +import org.apache.crunch.materialize.pobject.FirstElementPObject; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; @@ -211,6 +214,9 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { return new CollectionPObject<S>(this); } + @Override + public PObject<S> first() { return new FirstElementPObject<S>(this); } + public SourceTarget<S> getMaterializedAt() { return materializedAt; } @@ -259,6 +265,11 @@ public abstract class PCollectionImpl<S> implements 
PCollection<S> { public PObject<S> min() { return Aggregate.min(this); } + + @Override + public PCollection<S> aggregate(Aggregator<S> aggregator) { + return Aggregate.aggregate(this, aggregator); + } @Override public PTypeFamily getTypeFamily() { http://git-wip-us.apache.org/repos/asf/crunch/blob/9355e74e/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java index 81433eb..8e509bc 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java @@ -25,6 +25,7 @@ import javassist.util.proxy.MethodFilter; import javassist.util.proxy.MethodHandler; import javassist.util.proxy.ProxyFactory; +import org.apache.crunch.Aggregator; import org.apache.crunch.CachingOptions; import org.apache.crunch.DoFn; import org.apache.crunch.FilterFn; @@ -42,6 +43,7 @@ import org.apache.crunch.impl.mem.MemPipeline; import org.apache.crunch.impl.mem.emit.InMemoryEmitter; import org.apache.crunch.lib.Aggregate; import org.apache.crunch.materialize.pobject.CollectionPObject; +import org.apache.crunch.materialize.pobject.FirstElementPObject; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; @@ -184,6 +186,9 @@ public class MemCollection<S> implements PCollection<S> { } @Override + public PObject<S> first() { return new FirstElementPObject<S>(this); } + + @Override public ReadableData<S> asReadable(boolean materialize) { return new MemReadableData<S>(collect); } @@ -241,6 +246,11 @@ public class MemCollection<S> implements PCollection<S> { } @Override + public PCollection<S> aggregate(Aggregator<S> aggregator) { + return 
Aggregate.aggregate(this, aggregator); + } + + @Override public PCollection<S> filter(FilterFn<S> filterFn) { return parallelDo(filterFn, getPType()); } http://git-wip-us.apache.org/repos/asf/crunch/blob/9355e74e/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java index d8388b3..3d132d4 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java @@ -23,6 +23,7 @@ import java.util.Comparator; import java.util.List; import java.util.PriorityQueue; +import org.apache.crunch.Aggregator; import org.apache.crunch.CombineFn; import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; @@ -277,4 +278,16 @@ public class Aggregate { } }, tf.collections(collect.getValueType())); } + + public static <S> PCollection<S> aggregate(PCollection<S> collect, Aggregator<S> aggregator) { + PTypeFamily tf = collect.getTypeFamily(); + return collect.parallelDo("Aggregate.aggregator", new MapFn<S, Pair<Boolean, S>>() { + public Pair<Boolean, S> map(S input) { + return Pair.of(false, input); + } + }, tf.tableOf(tf.booleans(), collect.getPType())) + .groupByKey(1) + .combineValues(aggregator) + .values(); + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/9355e74e/crunch-examples/src/main/java/org/apache/crunch/examples/TotalWordCount.java ---------------------------------------------------------------------- diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/TotalWordCount.java b/crunch-examples/src/main/java/org/apache/crunch/examples/TotalWordCount.java new file mode 100644 index 0000000..374cec1 --- /dev/null +++ b/crunch-examples/src/main/java/org/apache/crunch/examples/TotalWordCount.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.examples; + +import java.io.Serializable; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.PCollection; +import org.apache.crunch.PObject; +import org.apache.crunch.PTable; +import org.apache.crunch.Pipeline; +import org.apache.crunch.PipelineResult; +import org.apache.crunch.fn.Aggregators; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +public class TotalWordCount extends Configured implements Tool, Serializable { + public int run(String[] args) throws Exception { + if (args.length != 1) { + System.err.println(); + System.err.println("Usage: " + this.getClass().getName() + " [generic options] input"); + System.err.println(); + GenericOptionsParser.printGenericCommandUsage(System.err); + return 1; + } + // Create an object to coordinate pipeline creation and execution. 
+ Pipeline pipeline = new MRPipeline(TotalWordCount.class, getConf()); + // Reference a given text file as a collection of Strings. + PCollection<String> lines = pipeline.readTextFile(args[0]); + + // Define a function that maps each line in a PCollection of Strings to + // the number of whitespace-separated tokens on that line, yielding a + // PCollection of per-line word counts. + PCollection<Long> numberOfWords = lines.parallelDo(new DoFn<String, Long>() { + public void process(String line, Emitter<Long> emitter) { + emitter.emit((long)line.split("\\s+").length); + } + }, Writables.longs()); // Indicates the serialization format + + // aggregate(...) sums the per-line counts into a PCollection; first() exposes the result as a PObject. + PObject<Long> totalCount = numberOfWords.aggregate(Aggregators.SUM_LONGS()).first(); + + // Execute the pipeline as a MapReduce. + PipelineResult result = pipeline.run(); + + System.out.println("Total number of words: " + totalCount.getValue()); + + pipeline.done(); + + return result.succeeded() ? 0 : 1; + } + + public static void main(String[] args) throws Exception { + int result = ToolRunner.run(new Configuration(), new TotalWordCount(), args); + System.exit(result); + } +}
