Java 8 lambda support for Apache Crunch. Remove lambda support from crunch-core and instead implement it in a new module called crunch-lambda. This allows full use of Java 8 features when implementing support for lambda expressions and method references, without requiring crunch-core to depend on Java 8. The "P" types (PCollection, PTable, PGroupedTable) are wrapped in analogous "L" types (LCollection, LTable, LGroupedTable), which can be operated on with an API inspired by both the existing Crunch API and the Java 8 Streams API.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/7d7af4ef Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/7d7af4ef Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/7d7af4ef Branch: refs/heads/master Commit: 7d7af4ef43b122cc03cee721b9106d174d71d435 Parents: f8920d3 Author: David Whiting <[email protected]> Authored: Sat Jan 2 11:28:34 2016 +0100 Committer: David Whiting <[email protected]> Committed: Sun Jan 10 21:29:49 2016 +0100 ---------------------------------------------------------------------- .../org/apache/crunch/MultiStagePlanningIT.java | 2 +- .../it/java/org/apache/crunch/PageRankIT.java | 2 +- .../it/java/org/apache/crunch/WordCountIT.java | 12 +- .../src/main/java/org/apache/crunch/IDoFn.java | 49 ---- .../main/java/org/apache/crunch/IFilterFn.java | 27 -- .../main/java/org/apache/crunch/IFlatMapFn.java | 28 --- .../src/main/java/org/apache/crunch/IMapFn.java | 27 -- .../java/org/apache/crunch/PCollection.java | 96 +------- .../java/org/apache/crunch/PGroupedTable.java | 11 - .../src/main/java/org/apache/crunch/PTable.java | 24 -- .../java/org/apache/crunch/fn/IFnHelpers.java | 149 ----------- .../impl/dist/collect/BaseGroupedTable.java | 9 - .../impl/dist/collect/PCollectionImpl.java | 92 +------ .../crunch/impl/dist/collect/PTableBase.java | 22 -- .../crunch/impl/mem/collect/MemCollection.java | 89 ------- .../impl/mem/collect/MemGroupedTable.java | 7 - .../crunch/impl/mem/collect/MemTable.java | 23 -- crunch-dist/pom.xml | 4 + crunch-lambda/pom.xml | 67 +++++ .../org/apache/crunch/lambda/LAggregator.java | 57 +++++ .../org/apache/crunch/lambda/LCollection.java | 244 +++++++++++++++++++ .../crunch/lambda/LCollectionFactory.java | 44 ++++ .../crunch/lambda/LCollectionFactoryImpl.java | 70 ++++++ .../java/org/apache/crunch/lambda/LDoFn.java | 31 +++ .../org/apache/crunch/lambda/LDoFnContext.java | 52 ++++ .../org/apache/crunch/lambda/LDoFnWrapper.java | 106 
++++++++ .../org/apache/crunch/lambda/LGroupedTable.java | 162 ++++++++++++ .../java/org/apache/crunch/lambda/LTable.java | 188 ++++++++++++++ .../java/org/apache/crunch/lambda/Lambda.java | 59 +++++ .../apache/crunch/lambda/fn/SBiConsumer.java | 28 +++ .../apache/crunch/lambda/fn/SBiFunction.java | 28 +++ .../crunch/lambda/fn/SBinaryOperator.java | 28 +++ .../org/apache/crunch/lambda/fn/SConsumer.java | 28 +++ .../org/apache/crunch/lambda/fn/SFunction.java | 28 +++ .../org/apache/crunch/lambda/fn/SPredicate.java | 28 +++ .../org/apache/crunch/lambda/fn/SSupplier.java | 28 +++ .../apache/crunch/lambda/fn/package-info.java | 22 ++ .../org/apache/crunch/lambda/package-info.java | 30 +++ .../apache/crunch/lambda/LCollectionTest.java | 128 ++++++++++ .../apache/crunch/lambda/LGroupedTableTest.java | 103 ++++++++ .../org/apache/crunch/lambda/LTableTest.java | 94 +++++++ .../org/apache/crunch/lambda/TestCommon.java | 34 +++ .../org/apache/crunch/lambda/TypedRecord.java | 52 ++++ pom.xml | 17 ++ 44 files changed, 1771 insertions(+), 658 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java b/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java index 38211a7..a7b7d48 100644 --- a/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java @@ -60,7 +60,7 @@ public class MultiStagePlanningIT implements Serializable { PTable<String, String> addressesTable = pipeline.readTextFile(addressesFile) .parallelDo("Split addresses", new StringToPairMapFn(), tableOf(strings(), strings())) - .filter(new IFilterFn<Pair<String, String>>() { + .filter(new FilterFn<Pair<String, String>>() { 
@Override public boolean accept(Pair<String, String> input) { // This is odd but it is the simpler way of simulating this would take longer than http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java b/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java index b30465d..701f78a 100644 --- a/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java @@ -130,7 +130,7 @@ public class PageRankIT { }, ptf.tableOf(ptf.strings(), ptf.floats())); return input.cogroup(outbound).mapValues( - new IMapFn<Pair<Collection<PageRankData>, Collection<Float>>, PageRankData>() { + new MapFn<Pair<Collection<PageRankData>, Collection<Float>>, PageRankData>() { @Override public PageRankData map(Pair<Collection<PageRankData>, Collection<Float>> input) { PageRankData prd = Iterables.getOnlyElement(input.first()); http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java b/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java index 4c77c41..e0bd719 100644 --- a/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java @@ -39,6 +39,7 @@ import org.apache.crunch.types.PTypes; import org.apache.crunch.types.avro.AvroTypeFamily; import org.apache.crunch.types.writable.WritableTypeFamily; import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.mapred.ShuffleConsumerPlugin; import org.junit.Rule; import org.junit.Test; @@ -55,17 +56,18 @@ public class WordCountIT { } public static PTable<String, Long> wordCount(PCollection<String> words, 
PTypeFamily typeFamily) { - return Aggregate.count(words.parallelDo(new IDoFn<String, String>() { + return Aggregate.count(words.parallelDo(new DoFn<String, String>() { @Override - public void process(Context<String, String> context) { - List<String> words = Arrays.asList(context.element().split("\\s+")); + public void process(String input, Emitter<String> emitter) { + List<String> words = Arrays.asList(input.split("\\s+")); for (String word : words) { if ("and".equals(word)) { - context.increment(WordCountStats.ANDS); + increment(WordCountStats.ANDS); } - context.emit(word); + emitter.emit(word); } } + }, typeFamily.strings())); } http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/IDoFn.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/IDoFn.java b/crunch-core/src/main/java/org/apache/crunch/IDoFn.java deleted file mode 100644 index b393f43..0000000 --- a/crunch-core/src/main/java/org/apache/crunch/IDoFn.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.crunch; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.TaskInputOutputContext; - -import java.io.Serializable; - -/** - * A Java lambdas friendly version of the {@link DoFn} class. - */ -public interface IDoFn<S, T> extends Serializable { - - void process(Context<S, T> context); - - public interface Context<S, T> { - S element(); - - void emit(T t); - - TaskInputOutputContext getContext(); - - Configuration getConfiguration(); - - void increment(String groupName, String counterName); - - void increment(String groupName, String counterName, long value); - - void increment(Enum<?> counterName); - - void increment(Enum<?> counterName, long value); - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/IFilterFn.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/IFilterFn.java b/crunch-core/src/main/java/org/apache/crunch/IFilterFn.java deleted file mode 100644 index bb8a03d..0000000 --- a/crunch-core/src/main/java/org/apache/crunch/IFilterFn.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch; - -import java.io.Serializable; - -/** - * A Java lambdas friendly version of the {@link org.apache.crunch.FilterFn} class. - */ -public interface IFilterFn<S> extends Serializable { - boolean accept(S input); -} http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/IFlatMapFn.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/IFlatMapFn.java b/crunch-core/src/main/java/org/apache/crunch/IFlatMapFn.java deleted file mode 100644 index a2b85c4..0000000 --- a/crunch-core/src/main/java/org/apache/crunch/IFlatMapFn.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch; - -import java.io.Serializable; - -/** - * A Java lambdas friendly interface for writing business logic against {@code PCollection}s - * that take in a single input record and return 0 to N output records via an {@code Iterable}. 
- */ -public interface IFlatMapFn<S, T> extends Serializable { - Iterable<T> process(S input); -} http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/IMapFn.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/IMapFn.java b/crunch-core/src/main/java/org/apache/crunch/IMapFn.java deleted file mode 100644 index 3c06d9e..0000000 --- a/crunch-core/src/main/java/org/apache/crunch/IMapFn.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch; - -import java.io.Serializable; - -/** - * A Java lambdas friendly version of the {@link org.apache.crunch.MapFn} class. 
- */ -public interface IMapFn<S, T> extends Serializable { - public T map(S input); -} http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/PCollection.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/PCollection.java b/crunch-core/src/main/java/org/apache/crunch/PCollection.java index 5d072e6..8043349 100644 --- a/crunch-core/src/main/java/org/apache/crunch/PCollection.java +++ b/crunch-core/src/main/java/org/apache/crunch/PCollection.java @@ -132,95 +132,6 @@ public interface PCollection<S> { <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type, ParallelDoOptions options); - /** - * Similar to other instances of {@code parallelDo}, but designed for Java lambdas. - */ - <T> PCollection<T> parallelDo(IDoFn<S, T> fn, PType<T> type); - - /** - * Similar to other instances of {@code parallelDo}, but designed for Java lambdas. - */ - <K, V> PTable<K, V> parallelDo(IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type); - - /** - * Similar to other instances of {@code parallelDo}, but designed for Java lambdas. - */ - <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type); - - /** - * Similar to other instances of {@code parallelDo}, but designed for Java lambdas. - */ - <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type); - - /** - * Similar to other instances of {@code parallelDo}, but designed for Java lambdas. - */ - <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type, ParallelDoOptions options); - - /** - * Similar to other instances of {@code parallelDo}, but designed for Java lambdas. 
- */ - <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type, ParallelDoOptions options); - - /** - * For each element of this {@code PCollection}, generate 0 to N output values using the - * given {@code IFlatMapFn}. Designed for Java lambdas. - */ - <T> PCollection<T> flatMap(IFlatMapFn<S, T> fn, PType<T> type); - - /** - * For each element of this {@code PCollection}, generate 0 to N output values using the - * given {@code IFlatMapFn}. Designed for Java lambdas. - */ - <K, V> PTable<K, V> flatMap(IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type); - - /** - * For each element of this {@code PCollection}, generate 0 to N output values using the - * given {@code IFlatMapFn}. Designed for Java lambdas. - */ - <T> PCollection<T> flatMap(String name, IFlatMapFn<S, T> fn, PType<T> type); - - /** - * For each element of this {@code PCollection}, generate 0 to N output values using the - * given {@code IFlatMapFn}. Designed for Java lambdas. - */ - <K, V> PTable<K, V> flatMap(String name, IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type); - - /** - * For each element of this {@code PCollection}, generate one output value using the - * given {@code IMapFn}. Designed for Java lambdas. - */ - <T> PCollection<T> map(IMapFn<S, T> fn, PType<T> type); - - /** - * For each element of this {@code PCollection}, generate one output value using the - * given {@code IMapFn}. Designed for Java lambdas. - */ - <K, V> PTable<K, V> map(IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type); - - /** - * For each element of this {@code PCollection}, generate one output value using the - * given {@code IMapFn}. Designed for Java lambdas. - */ - <T> PCollection<T> map(String name, IMapFn<S, T> fn, PType<T> type); - - /** - * For each element of this {@code PCollection}, generate one output value using the - * given {@code IMapFn}. Designed for Java lambdas. 
- */ - <K, V> PTable<K, V> map(String name, IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type); - - /** - * Filter elements of this {@code PCollection} using the given {@code IFilterFn}. - * Designed for Java lambdas. - */ - PCollection<S> filter(IFilterFn<S> fn); - - /** - * Filter elements of this {@code PCollection} using the given {@code IFilterFn}. - * Designed for Java lambdas. - */ - PCollection<S> filter(String name, IFilterFn<S> fn); /** * Write the contents of this {@code PCollection} to the given {@code Target}, @@ -349,12 +260,6 @@ public interface PCollection<S> { <K> PTable<K, S> by(MapFn<S, K> extractKeyFn, PType<K> keyType); /** - * Apply the given {@code IMapFn} to each element of this instance in order to - * create a {@code PTable}. Designed for use with Java 8 lambdas. - */ - <K> PTable<K, S> by(IMapFn<S, K> extractKeyFn, PType<K> keyType); - - /** * Apply the given map function to each element of this instance in order to * create a {@code PTable}. * @@ -385,4 +290,5 @@ public interface PCollection<S> { * Returns a {@code PCollection} that contains the result of aggregating all values in this instance. */ PCollection<S> aggregate(Aggregator<S> aggregator); + } http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java index 6ac86de..756855c 100644 --- a/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java @@ -79,17 +79,6 @@ public interface PGroupedTable<K, V> extends PCollection<Pair<K, Iterable<V>>> { */ <U> PTable<K, U> mapValues(MapFn<Iterable<V>, U> mapFn, PType<U> ptype); - - /** - * Maps the {@code Iterable<V>} elements of each record to a new type. 
Just like - * any {@code parallelDo} operation on a {@code PGroupedTable}, this may only be - * called once. Designed for Java lambdas - * @param mapFn The mapping function (can be lambda/method ref) - * @param ptype The serialization infromation for the returned data - * @return A new {@code PTable} instance - */ - <U> PTable<K, U> mapValues(IMapFn<Iterable<V>, U> mapFn, PType<U> ptype); - /** * Maps the {@code Iterable<V>} elements of each record to a new type. Just like * any {@code parallelDo} operation on a {@code PGroupedTable}, this may only be http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/PTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/PTable.java b/crunch-core/src/main/java/org/apache/crunch/PTable.java index 5609c3f..74cade8 100644 --- a/crunch-core/src/main/java/org/apache/crunch/PTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/PTable.java @@ -107,12 +107,6 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> { /** * Returns a {@code PTable} that has the same keys as this instance, but - * uses the given function to map the values. Designed for Java lambdas. - */ - <U> PTable<K, U> mapValues(IMapFn<V, U> mapFn, PType<U> ptype); - - /** - * Returns a {@code PTable} that has the same keys as this instance, but * uses the given function to map the values. */ <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype); @@ -125,12 +119,6 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> { /** * Returns a {@code PTable} that has the same values as this instance, but - * uses the given function to map the keys. Designed for Java lambdas. - */ - <K2> PTable<K2, V> mapKeys(IMapFn<K, K2> mapFn, PType<K2> ptype); - - /** - * Returns a {@code PTable} that has the same values as this instance, but * uses the given function to map the keys. 
*/ <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype); @@ -149,12 +137,6 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> { /** * Apply the given filter function to this instance and return the resulting - * {@code PTable}. Designed for Java lambdas. - */ - PTable<K, V> filter(IFilterFn<Pair<K, V>> fn); - - /** - * Apply the given filter function to this instance and return the resulting * {@code PTable}. * * @param name @@ -165,12 +147,6 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> { PTable<K, V> filter(String name, FilterFn<Pair<K, V>> filterFn); /** - * Apply the given filter function to this instance and return the resulting - * {@code PTable}. Designed for Java lambdas. - */ - PTable<K, V> filter(String name, IFilterFn<Pair<K, V>> fn); - - /** * Returns a PTable made up of the pairs in this PTable with the largest value * field. * http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/fn/IFnHelpers.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/IFnHelpers.java b/crunch-core/src/main/java/org/apache/crunch/fn/IFnHelpers.java deleted file mode 100644 index 8560fab..0000000 --- a/crunch-core/src/main/java/org/apache/crunch/fn/IFnHelpers.java +++ /dev/null @@ -1,149 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.fn; - -import org.apache.crunch.DoFn; -import org.apache.crunch.Emitter; -import org.apache.crunch.FilterFn; -import org.apache.crunch.IDoFn; -import org.apache.crunch.IFilterFn; -import org.apache.crunch.IFlatMapFn; -import org.apache.crunch.IMapFn; -import org.apache.crunch.MapFn; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.TaskInputOutputContext; - -public class IFnHelpers { - - public static <S, T> DoFn<S, T> wrap(final org.apache.crunch.IDoFn<S, T> fn) { - return new IDoFnWrapper(fn); - } - - public static <S, T> DoFn<S, T> wrapFlatMap(final IFlatMapFn<S, T> fn) { - return new DoFn<S, T>() { - @Override - public void process(S input, Emitter<T> emitter) { - for (T t : fn.process(input)) { - emitter.emit(t); - } - } - }; - } - - public static <S, T> MapFn<S, T> wrapMap(final IMapFn<S, T> fn) { - return new MapFn<S, T>() { - @Override - public T map(S input) { - return fn.map(input); - } - }; - } - - public static <S> FilterFn<S> wrapFilter(final IFilterFn<S> fn) { - return new FilterFn<S>() { - @Override - public boolean accept(S input) { - return fn.accept(input); - } - }; - } - - static class IDoFnWrapper<S, T> extends DoFn<S, T> { - - private final org.apache.crunch.IDoFn<S, T> fn; - private transient ContextImpl<S, T> ctxt; - - public IDoFnWrapper(org.apache.crunch.IDoFn<S, T> fn) { - this.fn = fn; - } - - @Override - public void initialize() { - super.initialize(); - if (getContext() == null) { - this.ctxt = new ContextImpl<S, T>(getConfiguration()); - } else { - this.ctxt = 
new ContextImpl<S, T>(getContext()); - } - } - - @Override - public void process(S input, Emitter<T> emitter) { - fn.process(ctxt.update(input, emitter)); - } - } - - static class ContextImpl<S, T> implements IDoFn.Context<S, T> { - private S element; - private Emitter<T> emitter; - private TaskInputOutputContext context; - private Configuration conf; - - public ContextImpl(TaskInputOutputContext context) { - this.context = context; - this.conf = context.getConfiguration(); - } - - public ContextImpl(Configuration conf) { - this.context = null; - this.conf = conf; - } - - public ContextImpl update(S element, Emitter<T> emitter) { - this.element = element; - this.emitter = emitter; - return this; - } - - public S element() { - return element; - } - - public void emit(T t) { - emitter.emit(t); - } - - public TaskInputOutputContext getContext() { - return context; - } - - public Configuration getConfiguration() { - return conf; - } - - public void increment(String groupName, String counterName) { - increment(groupName, counterName, 1); - } - - public void increment(String groupName, String counterName, long value) { - if (context != null) { - context.getCounter(groupName, counterName).increment(value); - } - } - - public void increment(Enum<?> counterName) { - increment(counterName, 1); - } - - public void increment(Enum<?> counterName, long value) { - if (context != null) { - context.getCounter(counterName).increment(value); - } - } - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java index eb2d829..7bfacdf 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java +++ 
b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java @@ -25,16 +25,13 @@ import org.apache.crunch.CombineFn; import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; import org.apache.crunch.GroupingOptions; -import org.apache.crunch.IMapFn; import org.apache.crunch.MapFn; import org.apache.crunch.PGroupedTable; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.ReadableData; -import org.apache.crunch.SourceTarget; import org.apache.crunch.Target; import org.apache.crunch.fn.Aggregators; -import org.apache.crunch.fn.IFnHelpers; import org.apache.crunch.lib.PTables; import org.apache.crunch.types.PGroupedTableType; import org.apache.crunch.types.PType; @@ -121,12 +118,6 @@ public class BaseGroupedTable<K, V> extends PCollectionImpl<Pair<K, Iterable<V>> } @Override - public <U> PTable<K, U> mapValues(IMapFn<Iterable<V>, U> mapFn, PType<U> ptype) { - return PTables.mapValues(this, IFnHelpers.wrapMap(mapFn), ptype); - } - - - @Override public <U> PTable<K, U> mapValues(String name, MapFn<Iterable<V>, U> mapFn, PType<U> ptype) { return PTables.mapValues(name, this, mapFn, ptype); } http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java index 2a5e1f5..7650ff5 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java @@ -25,10 +25,6 @@ import org.apache.crunch.CachingOptions; import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.DoFn; import org.apache.crunch.FilterFn; -import org.apache.crunch.IFilterFn; -import 
org.apache.crunch.IFlatMapFn; -import org.apache.crunch.IDoFn; -import org.apache.crunch.IMapFn; import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.PObject; @@ -40,7 +36,6 @@ import org.apache.crunch.ReadableData; import org.apache.crunch.SourceTarget; import org.apache.crunch.Target; import org.apache.crunch.fn.ExtractKeyFn; -import org.apache.crunch.fn.IFnHelpers; import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.impl.dist.DistributedPipeline; import org.apache.crunch.io.ReadableSource; @@ -145,7 +140,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { } @Override - public <T> PCollection<T> parallelDo(String name, DoFn<S, T> fn, PType<T> type) { + public <T> PCollection<T> parallelDo(String name, DoFn<S, T> fn, PType<T> type) { return parallelDo(name, fn, type, ParallelDoOptions.builder().build()); } @@ -171,86 +166,6 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { return pipeline.getFactory().createDoTable(name, getChainingCollection(), fn, type, options); } - @Override - public <T> PCollection<T> parallelDo(IDoFn<S, T> fn, PType<T> type) { - return parallelDo(IFnHelpers.wrap(fn), type); - } - - @Override - public <K, V> PTable<K, V> parallelDo(IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type) { - return parallelDo(IFnHelpers.wrap(fn), type); - } - - @Override - public <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type) { - return parallelDo(name, IFnHelpers.wrap(fn), type); - } - - @Override - public <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type) { - return parallelDo(name, IFnHelpers.wrap(fn), type); - } - - @Override - public <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type, ParallelDoOptions options) { - return parallelDo(name, IFnHelpers.wrap(fn), type, options); - } - - @Override - public <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, 
PTableType<K, V> type, ParallelDoOptions options) { - return parallelDo(name, IFnHelpers.wrap(fn), type, options); - } - - @Override - public <T> PCollection<T> flatMap(IFlatMapFn<S, T> fn, PType<T> type) { - return parallelDo(IFnHelpers.wrapFlatMap(fn), type); - } - - @Override - public <K, V> PTable<K, V> flatMap(IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) { - return parallelDo(IFnHelpers.wrapFlatMap(fn), type); - } - - @Override - public <T> PCollection<T> flatMap(String name, IFlatMapFn<S, T> fn, PType<T> type) { - return parallelDo(name, IFnHelpers.wrapFlatMap(fn), type); - } - - @Override - public <K, V> PTable<K, V> flatMap(String name, IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) { - return parallelDo(name, IFnHelpers.wrapFlatMap(fn), type); - } - - @Override - public <T> PCollection<T> map(IMapFn<S, T> fn, PType<T> type) { - return parallelDo(IFnHelpers.wrapMap(fn), type); - } - - @Override - public <K, V> PTable<K, V> map(IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) { - return parallelDo(IFnHelpers.wrapMap(fn), type); - } - - @Override - public <T> PCollection<T> map(String name, IMapFn<S, T> fn, PType<T> type) { - return parallelDo(name, IFnHelpers.wrapMap(fn), type); - } - - @Override - public <K, V> PTable<K, V> map(String name, IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) { - return parallelDo(name, IFnHelpers.wrapMap(fn), type); - } - - @Override - public PCollection<S> filter(IFilterFn<S> fn) { - return filter(IFnHelpers.wrapFilter(fn)); - } - - @Override - public PCollection<S> filter(String name, IFilterFn<S> fn) { - return filter(name, IFnHelpers.wrapFilter(fn)); - } - public PCollection<S> write(Target target) { if (materializedAt != null) { getPipeline().write( @@ -355,11 +270,6 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { } @Override - public <K> PTable<K, S> by(IMapFn<S, K> mapFn, PType<K> keyType) { - return parallelDo(new ExtractKeyFn<K, S>(IFnHelpers.wrapMap(mapFn)), 
getTypeFamily().tableOf(keyType, getPType())); - } - - @Override public <K> PTable<K, S> by(String name, MapFn<S, K> mapFn, PType<K> keyType) { return parallelDo(name, new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType())); } http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java index 6bc3a41..4ba4d49 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java @@ -21,8 +21,6 @@ import com.google.common.collect.Lists; import org.apache.crunch.CachingOptions; import org.apache.crunch.FilterFn; import org.apache.crunch.GroupingOptions; -import org.apache.crunch.IFilterFn; -import org.apache.crunch.IMapFn; import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.PObject; @@ -31,7 +29,6 @@ import org.apache.crunch.Pair; import org.apache.crunch.ParallelDoOptions; import org.apache.crunch.TableSource; import org.apache.crunch.Target; -import org.apache.crunch.fn.IFnHelpers; import org.apache.crunch.impl.dist.DistributedPipeline; import org.apache.crunch.lib.Aggregate; import org.apache.crunch.lib.Cogroup; @@ -135,24 +132,10 @@ public abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> imple } @Override - public PTable<K, V> filter(IFilterFn<Pair<K, V>> filterFn) { - return parallelDo(IFnHelpers.wrapFilter(filterFn), getPTableType()); - } - - @Override - public PTable<K, V> filter(String name, IFilterFn<Pair<K, V>> filterFn) { - return parallelDo(name, IFnHelpers.wrapFilter(filterFn), getPTableType()); - } - - @Override public <U> PTable<K, U> mapValues(MapFn<V, U> 
mapFn, PType<U> ptype) { return PTables.mapValues(this, mapFn, ptype); } - @Override - public <U> PTable<K, U> mapValues(IMapFn<V, U> mapFn, PType<U> ptype) { - return PTables.mapValues(this, IFnHelpers.wrapMap(mapFn), ptype); - } @Override public <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype) { @@ -165,11 +148,6 @@ public abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> imple } @Override - public <K2> PTable<K2, V> mapKeys(IMapFn<K, K2> mapFn, PType<K2> ptype) { - return PTables.mapKeys(this, IFnHelpers.wrapMap(mapFn), ptype); - } - - @Override public <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype) { return PTables.mapKeys(name, this, mapFn, ptype); } http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java index 89671a3..087a31d 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java @@ -21,7 +21,6 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.ObjectInputStream; -import java.io.ObjectStreamClass; import java.lang.reflect.Method; import java.util.Collection; import java.util.Set; @@ -36,10 +35,6 @@ import org.apache.crunch.Aggregator; import org.apache.crunch.CachingOptions; import org.apache.crunch.DoFn; import org.apache.crunch.FilterFn; -import org.apache.crunch.IFilterFn; -import org.apache.crunch.IFlatMapFn; -import org.apache.crunch.IDoFn; -import org.apache.crunch.IMapFn; import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.PObject; 
@@ -51,7 +46,6 @@ import org.apache.crunch.ReadableData; import org.apache.crunch.PipelineCallable; import org.apache.crunch.Target; import org.apache.crunch.fn.ExtractKeyFn; -import org.apache.crunch.fn.IFnHelpers; import org.apache.crunch.impl.mem.MemPipeline; import org.apache.crunch.impl.mem.emit.InMemoryEmitter; import org.apache.crunch.lib.Aggregate; @@ -205,84 +199,6 @@ public class MemCollection<S> implements PCollection<S> { } @Override - public <T> PCollection<T> parallelDo(IDoFn<S, T> fn, PType<T> type) { - return parallelDo(null, fn, type); - } - - @Override - public <K, V> PTable<K, V> parallelDo(IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type) { - return parallelDo(null, fn, type); - } - - @Override - public <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type) { - return parallelDo(name, fn, type, null); - } - - @Override - public <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type) { - return parallelDo(name, fn, type, null); - } - - @Override - public <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type, ParallelDoOptions options) { - return parallelDo(name, IFnHelpers.wrap(fn), type, options); - } - - @Override - public <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type, ParallelDoOptions options) { - return parallelDo(name, IFnHelpers.wrap(fn), type, options); - } - - @Override - public <T> PCollection<T> flatMap(IFlatMapFn<S, T> fn, PType<T> type) { - return parallelDo(IFnHelpers.wrapFlatMap(fn), type); - } - - @Override - public <K, V> PTable<K, V> flatMap(IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) { - return parallelDo(IFnHelpers.wrapFlatMap(fn), type); - } - - @Override - public <T> PCollection<T> flatMap(String name, IFlatMapFn<S, T> fn, PType<T> type) { - return parallelDo(name, IFnHelpers.wrapFlatMap(fn), type); - } - - @Override - public <K, V> PTable<K, V> flatMap(String name, IFlatMapFn<S, Pair<K, V>> fn, 
PTableType<K, V> type) { - return parallelDo(name, IFnHelpers.wrapFlatMap(fn), type); - } - - @Override - public <T> PCollection<T> map(IMapFn<S, T> fn, PType<T> type) { - return parallelDo(IFnHelpers.wrapMap(fn), type); - } - - @Override - public <K, V> PTable<K, V> map(IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) { - return parallelDo(IFnHelpers.wrapMap(fn), type); - } - - @Override - public <T> PCollection<T> map(String name, IMapFn<S, T> fn, PType<T> type) { - return parallelDo(name, IFnHelpers.wrapMap(fn), type); - } - - @Override - public <K, V> PTable<K, V> map(String name, IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) { - return parallelDo(name, IFnHelpers.wrapMap(fn), type); - } - - public PCollection<S> filter(IFilterFn<S> fn) { - return filter(IFnHelpers.wrapFilter(fn)); - } - - public PCollection<S> filter(String name, IFilterFn<S> fn) { - return filter(name, IFnHelpers.wrapFilter(fn)); - } - - @Override public PCollection<S> write(Target target) { getPipeline().write(this, target); return this; @@ -404,11 +320,6 @@ public class MemCollection<S> implements PCollection<S> { } @Override - public <K> PTable<K, S> by(IMapFn<S, K> mapFn, PType<K> keyType) { - return parallelDo(new ExtractKeyFn<K, S>(IFnHelpers.wrapMap(mapFn)), getTypeFamily().tableOf(keyType, getPType())); - } - - @Override public <K> PTable<K, S> by(String name, MapFn<S, K> mapFn, PType<K> keyType) { return parallelDo(name, new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType())); } http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java index 5451533..e8bf5e6 100644 --- 
a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java @@ -22,7 +22,6 @@ import org.apache.crunch.CombineFn; import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; import org.apache.crunch.GroupingOptions; -import org.apache.crunch.IMapFn; import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.PGroupedTable; @@ -30,7 +29,6 @@ import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.Target; import org.apache.crunch.fn.Aggregators; -import org.apache.crunch.fn.IFnHelpers; import org.apache.crunch.lib.PTables; import org.apache.crunch.types.PGroupedTableType; import org.apache.crunch.types.PTableType; @@ -125,11 +123,6 @@ class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implemen } @Override - public <U> PTable<K, U> mapValues(IMapFn<Iterable<V>, U> mapFn, PType<U> ptype) { - return PTables.mapValues(this, IFnHelpers.wrapMap(mapFn), ptype); - } - - @Override public <U> PTable<K, U> mapValues(String name, MapFn<Iterable<V>, U> mapFn, PType<U> ptype) { return PTables.mapValues(name, this, mapFn, ptype); } http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java index 03b5a70..b90b656 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java @@ -24,8 +24,6 @@ import com.google.common.collect.ImmutableList; import org.apache.crunch.CachingOptions; import org.apache.crunch.FilterFn; import org.apache.crunch.GroupingOptions; -import 
org.apache.crunch.IFilterFn; -import org.apache.crunch.IMapFn; import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.PGroupedTable; @@ -33,7 +31,6 @@ import org.apache.crunch.PObject; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.Target; -import org.apache.crunch.fn.IFnHelpers; import org.apache.crunch.lib.Aggregate; import org.apache.crunch.lib.Cogroup; import org.apache.crunch.lib.Join; @@ -138,26 +135,11 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable< } @Override - public PTable<K, V> filter(IFilterFn<Pair<K, V>> filterFn) { - return parallelDo(IFnHelpers.wrapFilter(filterFn), getPTableType()); - } - - @Override - public PTable<K, V> filter(String name, IFilterFn<Pair<K, V>> filterFn) { - return parallelDo(name, IFnHelpers.wrapFilter(filterFn), getPTableType()); - } - - @Override public <U> PTable<K, U> mapValues(MapFn<V, U> mapFn, PType<U> ptype) { return PTables.mapValues(this, mapFn, ptype); } @Override - public <U> PTable<K, U> mapValues(IMapFn<V, U> mapFn, PType<U> ptype) { - return PTables.mapValues(this, IFnHelpers.wrapMap(mapFn), ptype); - } - - @Override public <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype) { return PTables.mapValues(name, this, mapFn, ptype); } @@ -168,11 +150,6 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable< } @Override - public <K2> PTable<K2, V> mapKeys(IMapFn<K, K2> mapFn, PType<K2> ptype) { - return PTables.mapKeys(this, IFnHelpers.wrapMap(mapFn), ptype); - } - - @Override public <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype) { return PTables.mapKeys(name, this, mapFn, ptype); } http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-dist/pom.xml ---------------------------------------------------------------------- diff --git a/crunch-dist/pom.xml b/crunch-dist/pom.xml index b7cd0e9..48d9b05 100644 --- 
a/crunch-dist/pom.xml +++ b/crunch-dist/pom.xml @@ -61,6 +61,10 @@ under the License. <groupId>org.apache.crunch</groupId> <artifactId>crunch-contrib</artifactId> </dependency> + <dependency> + <groupId>org.apache.crunch</groupId> + <artifactId>crunch-lambda</artifactId> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/pom.xml ---------------------------------------------------------------------- diff --git a/crunch-lambda/pom.xml b/crunch-lambda/pom.xml new file mode 100644 index 0000000..e910517 --- /dev/null +++ b/crunch-lambda/pom.xml @@ -0,0 +1,67 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.crunch</groupId> + <artifactId>crunch-parent</artifactId> + <version>0.14.0-SNAPSHOT</version> + </parent> + <properties> + <java.source.version>1.8</java.source.version> + <java.target.version>1.8</java.target.version> + </properties> + + <artifactId>crunch-lambda</artifactId> + <name>Apache Crunch Lambda</name> + + <dependencies> + <dependency> + <groupId>org.apache.crunch</groupId> + <artifactId>crunch-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.crunch</groupId> + <artifactId>crunch-test</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <scope>provided</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LAggregator.java ---------------------------------------------------------------------- diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LAggregator.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LAggregator.java new file mode 100644 index 0000000..5b8611d --- /dev/null +++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LAggregator.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lambda; + +import org.apache.crunch.fn.Aggregators; +import org.apache.crunch.lambda.fn.SBiFunction; +import org.apache.crunch.lambda.fn.SFunction; +import org.apache.crunch.lambda.fn.SSupplier; + +/** + * Crunch Aggregator expressed as a composition of functional interface implementations + * @param <V> Type of values to be aggregated + * @param <A> Type of object which stores objects as they are being aggregated + */ +public class LAggregator<V, A> extends Aggregators.SimpleAggregator<V> { + + private final SSupplier<A> initialSupplier; + private final SBiFunction<A, V, A> combineFn; + private final SFunction<A, Iterable<V>> outputFn; + private A a; + + public LAggregator(SSupplier<A> initialSupplier, SBiFunction<A, V, A> combineFn, SFunction<A, Iterable<V>> outputFn) { + this.initialSupplier = initialSupplier; + this.combineFn = combineFn; + this.outputFn = outputFn; + } + + @Override + public void reset() { + a = initialSupplier.get(); + } + + @Override + public void update(V v) { + a = combineFn.apply(a, v); + } + + @Override + public Iterable<V> results() { + return outputFn.apply(a); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java 
---------------------------------------------------------------------- diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java new file mode 100644 index 0000000..6a8dd62 --- /dev/null +++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lambda; + +import org.apache.crunch.*; +import org.apache.crunch.fn.Aggregators; +import org.apache.crunch.lambda.fn.SFunction; +import org.apache.crunch.lambda.fn.SPredicate; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.PTypeFamily; + +import java.util.Optional; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +/** + * Java 8 friendly version of the {@link PCollection} interface, allowing distributed operations to be expressed in + * terms of lambda expressions and method references, instead of creating a new class implementation for each operation. 
+ * @param <S> The type of the elements in this collection + */ +public interface LCollection<S> { + /** + * Get the underlying {@link PCollection} for this LCollection + */ + PCollection<S> underlying(); + + /** + * Get the {@link LCollectionFactory} which can be used to create new Ltype instances + */ + LCollectionFactory factory(); + + /** + * Transform this LCollection using a standard Crunch {@link DoFn} + */ + default <T> LCollection<T> parallelDo(DoFn<S, T> fn, PType<T> pType) { + return factory().wrap(underlying().parallelDo(fn, pType)); + } + + /** + * Transform this LCollection to an {@link LTable} using a standard Crunch {@link DoFn} + */ + default <K, V> LTable<K, V> parallelDo(DoFn<S, Pair<K, V>> fn, PTableType<K, V> pType) { + return factory().wrap(underlying().parallelDo(fn, pType)); + } + + /** + * Transform this LCollection using a Lambda-friendly {@link LDoFn}. + */ + default <T> LCollection<T> parallelDo(LDoFn<S, T> fn, PType<T> pType) { + return parallelDo(new LDoFnWrapper<>(fn), pType); + } + + /** + * Transform this LCollection using a Lambda-friendly {@link LDoFn}. + */ + default <K, V> LTable<K, V> parallelDo(LDoFn<S, Pair<K, V>> fn, PTableType<K, V> pType) { + return parallelDo(new LDoFnWrapper<>(fn), pType); + } + + /** + * Map the elements of this collection 1-1 through the supplied function. + */ + default <T> LCollection<T> map(SFunction<S, T> fn, PType<T> pType) { + return parallelDo(ctx -> ctx.emit(fn.apply(ctx.element())), pType); + } + + /** + * Map the elements of this collection 1-1 through the supplied function to yield an {@link LTable} + */ + default <K, V> LTable<K, V> map(SFunction<S, Pair<K, V>> fn, PTableType<K, V> pType) { + return parallelDo(ctx -> ctx.emit(fn.apply(ctx.element())), pType); + } + + /** + * Map each element to zero or more output elements using the provided stream-returning function. 
+ */ + default <T> LCollection<T> flatMap(SFunction<S, Stream<T>> fn, PType<T> pType) { + return parallelDo(ctx -> fn.apply(ctx.element()).forEach(ctx::emit), pType); + } + + /** + * Map each element to zero or more output elements using the provided stream-returning function to yield an + * {@link LTable} + */ + default <K, V> LTable<K, V> flatMap(SFunction<S, Stream<Pair<K, V>>> fn, PTableType<K, V> pType) { + return parallelDo(ctx -> fn.apply(ctx.element()).forEach(ctx::emit), pType); + } + + /** + * Combination of a filter and map operation by using a function with {@link Optional} return type. + */ + default <T> LCollection<T> filterMap(SFunction<S, Optional<T>> fn, PType<T> pType) { + return parallelDo(ctx -> fn.apply(ctx.element()).ifPresent(ctx::emit), pType); + } + + /** + * Combination of a filter and map operation by using a function with {@link Optional} return type. + */ + default <K, V> LTable<K, V> filterMap(SFunction<S, Optional<Pair<K, V>>> fn, PTableType<K, V> pType) { + return parallelDo(ctx -> fn.apply(ctx.element()).ifPresent(ctx::emit), pType); + } + + /** + * Filter the collection using the supplied predicate. 
+ */ + default LCollection<S> filter(SPredicate<S> predicate) { + return parallelDo(ctx -> { if (predicate.test(ctx.element())) ctx.emit(ctx.element());}, pType()); + } + + /** + * Union this LCollection with another LCollection of the same type + */ + default LCollection<S> union(LCollection<S> other) { + return factory().wrap(underlying().union(other.underlying())); + } + + /** + * Union this LCollection with a {@link PCollection} of the same type + */ + default LCollection<S> union(PCollection<S> other) { + return factory().wrap(underlying().union(other)); + } + + /** + * Increment a counter for every element in the collection + */ + default LCollection<S> increment(Enum<?> counter) { + return parallelDo(ctx -> ctx.increment(counter), pType()); + } + + /** + * Increment a counter for every element in the collection + */ + default LCollection<S> increment(String counterGroup, String counterName) { + return parallelDo(ctx -> ctx.increment(counterGroup, counterName), pType()); + } + + /** + * Increment a counter for every element satisfying the conditional predicate supplied. + */ + default LCollection<S> incrementIf(Enum<?> counter, SPredicate<S> condition) { + return parallelDo(ctx -> { + if (condition.test(ctx.element())) ctx.increment(counter); + }, pType()); + } + + /** + * Increment a counter for every element satisfying the conditional predicate supplied. 
+ */ + default LCollection<S> incrementIf(String counterGroup, String counterName, SPredicate<S> condition) { + return parallelDo(ctx -> { + if (condition.test(ctx.element())) ctx.increment(counterGroup, counterName); + }, pType()); + } + + /** + * Cache the underlying {@link PCollection} + */ + default LCollection<S> cache() { + underlying().cache(); + return this; + } + + /** + * Cache the underlying {@link PCollection} + */ + default LCollection<S> cache(CachingOptions options) { + underlying().cache(options); + return this; + } + + /** + * Key this LCollection by a key extracted from the element to yield a {@link LTable} mapping the key to the whole + * element. + */ + default <K> LTable<K, S> by(SFunction<S, K> extractFn, PType<K> pType) { + return parallelDo( + ctx -> ctx.emit(Pair.of(extractFn.apply(ctx.element()), ctx.element())), + ptf().tableOf(pType, pType())); + } + + /** + * Count distict values in this LCollection, yielding an {@link LTable} mapping each value to the number + * of occurrences in the collection. + */ + default LTable<S, Long> count() { + return map(a -> Pair.of(a, 1L), ptf().tableOf(pType(), ptf().longs())) + .groupByKey() + .combineValues(Aggregators.SUM_LONGS()); + } + + /** + * Obtain the contents of this LCollection as a {@link Stream} that can be processed locally. Note, this may trigger + * your job to execute in a distributed environment if the pipeline has not yet been run. + */ + default Stream<S> materialize() { + return StreamSupport.stream(underlying().materialize().spliterator(), false); + } + + /** + * Get the {@link PTypeFamily} representing how elements of this collection may be serialized. + */ + default PTypeFamily ptf() { + return underlying().getPType().getFamily(); + } + + /** + * Get the {@link PType} representing how elements of this collection may be serialized. 
+ */ + default PType<S> pType() { return underlying().getPType(); } + + /** + * Write this collection to the specified {@link Target} + */ + default LCollection<S> write(Target target) { + underlying().write(target); + return this; + } + + /** + * Write this collection to the specified {@link Target} with the given {@link org.apache.crunch.Target.WriteMode} + */ + default LCollection<S> write(Target target, Target.WriteMode writeMode) { + underlying().write(target, writeMode); + return this; + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactory.java ---------------------------------------------------------------------- diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactory.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactory.java new file mode 100644 index 0000000..4b208d2 --- /dev/null +++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactory.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.lambda; + +import org.apache.crunch.PCollection; +import org.apache.crunch.PGroupedTable; +import org.apache.crunch.PTable; + +/** + * Factory for creating {@link LCollection}, {@link LTable} and {@link LGroupedTable} objects from their corresponding + * {@link PCollection}, {@link PTable} and {@link PGroupedTable} types. You probably don't want to use or implement this + * interface directly. You should start with the {@link Lambda} class instead. + */ +public interface LCollectionFactory { + /** + * Wrap a PCollection into an LCollection + */ + <S> LCollection<S> wrap(PCollection<S> collection); + + /** + * Wrap a PTable into an LTable + */ + <K, V> LTable<K, V> wrap(PTable<K, V> collection); + + /** + * Wrap a PGroupedTable into an LGroupedTable + */ + <K, V> LGroupedTable<K, V> wrap(PGroupedTable<K, V> collection); +} http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactoryImpl.java ---------------------------------------------------------------------- diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactoryImpl.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactoryImpl.java new file mode 100644 index 0000000..4bedfa7 --- /dev/null +++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactoryImpl.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lambda; + +import org.apache.crunch.PCollection; +import org.apache.crunch.PGroupedTable; +import org.apache.crunch.PTable; + +class LCollectionFactoryImpl implements LCollectionFactory { + + @Override + public <S> LCollection<S> wrap(final PCollection<S> collection) { + return new LCollection<S>() { + @Override + public PCollection<S> underlying() { + return collection; + } + + @Override + public LCollectionFactory factory() { + return LCollectionFactoryImpl.this; + } + }; + } + + @Override + public <K, V> LTable<K, V> wrap(final PTable<K, V> collection) { + return new LTable<K, V>() { + @Override + public PTable<K, V> underlying() { + return collection; + } + + @Override + public LCollectionFactory factory() { + return LCollectionFactoryImpl.this; + } + }; + } + + @Override + public <K, V> LGroupedTable<K, V> wrap(final PGroupedTable<K, V> collection) { + return new LGroupedTable<K, V>() { + @Override + public PGroupedTable<K, V> underlying() { + return collection; + } + + @Override + public LCollectionFactory factory() { + return LCollectionFactoryImpl.this; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFn.java ---------------------------------------------------------------------- diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFn.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFn.java new file mode 100644 index 0000000..1be8085 --- /dev/null +++ 
b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFn.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lambda; + +import org.apache.crunch.DoFn; + +import java.io.Serializable; + +/** + * A Java lambdas friendly version of the {@link DoFn} class. + */ +public interface LDoFn<S, T> extends Serializable { + + void process(LDoFnContext<S, T> context); + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnContext.java ---------------------------------------------------------------------- diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnContext.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnContext.java new file mode 100644 index 0000000..3743a2f --- /dev/null +++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnContext.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * The view an {@link LDoFn} gets of the element it is processing. It gives access to the input
+ * element, a way to emit output, the Hadoop configuration and task context, and counters.
+ *
+ * @param <S> element type consumed by the LDoFn
+ * @param <T> element type emitted by the LDoFn
+ */
+public interface LDoFnContext<S, T> {
+
+    /**
+     * @return the input element currently being processed
+     */
+    S element();
+
+    /**
+     * Sends one value to the output of the LDoFn.
+     *
+     * @param t value to emit
+     */
+    void emit(T t);
+
+    /**
+     * Escape hatch for special cases that need the raw Hadoop task context.
+     *
+     * @return the underlying {@link TaskInputOutputContext}
+     */
+    TaskInputOutputContext getContext();
+
+    /**
+     * @return the current Hadoop {@link Configuration}
+     */
+    Configuration getConfiguration();
+
+    /** Adds 1 to the counter named by {@code groupName} / {@code counterName}. */
+    void increment(String groupName, String counterName);
+
+    /** Adds {@code value} to the counter named by {@code groupName} / {@code counterName}. */
+    void increment(String groupName, String counterName, long value);
+
+    /** Adds 1 to the counter identified by the enum constant {@code counterName}. */
+    void increment(Enum<?> counterName);
+
+    /** Adds {@code value} to the counter identified by the enum constant {@code counterName}. */
+    void increment(Enum<?> counterName, long value);
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnWrapper.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnWrapper.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnWrapper.java
new file mode
100644
index 0000000..76087d5
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnWrapper.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * Adapts an {@link LDoFn} to Crunch's {@link DoFn} API. A single {@link Context} is built in
+ * {@link #initialize()} and rebound to each input element in {@link #process}, so no new context
+ * object is allocated per element.
+ *
+ * @param <S> input element type
+ * @param <T> output element type
+ */
+class LDoFnWrapper<S, T> extends DoFn<S, T> {
+
+    private final LDoFn<S, T> fn;
+    // transient: not serialized with this DoFn; (re)built by initialize() before processing starts.
+    private transient Context<S, T> ctxt;
+
+    public LDoFnWrapper(LDoFn<S, T> fn) {
+        this.fn = fn;
+    }
+
+    /**
+     * Builds the reusable context. If a Hadoop task context is available, the context is backed by it.
+     * Otherwise it is backed only by {@link #getConfiguration()}, and counter increments are dropped.
+     */
+    @Override
+    public void initialize() {
+        super.initialize();
+        if (getContext() == null) {
+            this.ctxt = new Context<>(getConfiguration());
+        } else {
+            this.ctxt = new Context<>(getContext());
+        }
+    }
+
+    /** Rebinds the shared context to {@code input}/{@code emitter} and invokes the wrapped function. */
+    @Override
+    public void process(S input, Emitter<T> emitter) {
+        fn.process(ctxt.update(input, emitter));
+    }
+
+    /**
+     * Mutable, reusable {@link LDoFnContext} implementation. {@link #update} points it at the current
+     * element and emitter before each call to the wrapped function.
+     */
+    static class Context<S, T> implements LDoFnContext<S, T> {
+        private final TaskInputOutputContext context;
+        private final Configuration conf;
+        private S element;
+        private Emitter<T> emitter;
+
+        /** Creates a context backed by a Hadoop task context; counters are forwarded to it. */
+        public Context(TaskInputOutputContext context) {
+            this.context = context;
+            this.conf = context.getConfiguration();
+        }
+
+        /** Creates a context with no task context; {@link #getContext()} returns null and counters are dropped. */
+        public Context(Configuration conf) {
+            this.context = null;
+            this.conf = conf;
+        }
+
+        /** Points this context at a new element and emitter, returning {@code this} for chaining. */
+        public Context<S, T> update(S element, Emitter<T> emitter) {
+            this.element = element;
+            this.emitter = emitter;
+            return this;
+        }
+
+        @Override
+        public S element() {
+            return element;
+        }
+
+        @Override
+        public void emit(T t) {
+            emitter.emit(t);
+        }
+
+        @Override
+        public TaskInputOutputContext getContext() {
+            return context;
+        }
+
+        @Override
+        public Configuration getConfiguration() {
+            return conf;
+        }
+
+        @Override
+        public void increment(String groupName, String counterName) {
+            increment(groupName, counterName, 1);
+        }
+
+        @Override
+        public void increment(String groupName, String counterName, long value) {
+            // Without a task context there is nowhere to record the counter, so it is silently dropped.
+            if (context != null) {
+                context.getCounter(groupName, counterName).increment(value);
+            }
+        }
+
+        @Override
+        public void increment(Enum<?> counterName) {
+            increment(counterName, 1);
+        }
+
+        @Override
+        public void increment(Enum<?> counterName, long value) {
+            // Same fallback as the String-named overload: no task context means no counter update.
+            if (context != null) {
+                context.getCounter(counterName).increment(value);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LGroupedTable.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LGroupedTable.java
new file mode 100644
index 0000000..10209e0
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LGroupedTable.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.Aggregator;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.lambda.fn.SBiConsumer;
+import org.apache.crunch.lambda.fn.SBiFunction;
+import org.apache.crunch.lambda.fn.SBinaryOperator;
+import org.apache.crunch.lambda.fn.SFunction;
+import org.apache.crunch.lambda.fn.SSupplier;
+import org.apache.crunch.types.PType;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * Java 8 friendly version of the {@link PGroupedTable} interface. It lets distributed operations be
+ * expressed as lambda expressions and method references, instead of a new class implementation for
+ * each operation.
+ *
+ * @param <K> key type for this table
+ * @param <V> value type for this table
+ */
+public interface LGroupedTable<K, V> extends LCollection<Pair<K, Iterable<V>>> {
+    /**
+     * Get the underlying {@link PGroupedTable} for this LGroupedTable
+     */
+    @Override
+    PGroupedTable<K, V> underlying();
+
+    /**
+     * Combine the value part of the table using the provided Crunch {@link Aggregator}. This will be
+     * optimised into both a combine and a reduce in the MapReduce implementation, with similar
+     * optimisations available for other implementations.
+     *
+     * @param aggregator aggregator applied to the values of each key
+     * @return a table with the combined values for each key
+     */
+    default LTable<K, V> combineValues(Aggregator<V> aggregator) {
+        return factory().wrap(underlying().combineValues(aggregator));
+    }
+
+    /**
+     * Combine the value part of the table using the given functions.
+     *
+     * <ul>
+     *   <li>The supplier creates a new aggregate.</li>
+     *   <li>The combineFn adds a value into the aggregate.</li>
+     *   <li>The output function turns the aggregate into an iterable of the original value type.</li>
+     * </ul>
+     *
+     * <p>For example, summation can be expressed as follows:</p>
+     *
+     * <pre>{@code myGroupedTable.combineValues(() -> 0, (sum, value) -> sum + value, Collections::singleton) }</pre>
+     *
+     * <p>The functions are packaged into an {@link LAggregator}. This will be optimised into both a
+     * combine and a reduce in the MapReduce implementation, with similar optimisations available for
+     * other implementations.</p>
+     *
+     * @param initialSupplier creates an empty aggregate
+     * @param combineFn folds one value into the aggregate and returns the new aggregate
+     * @param outputFn converts the final aggregate into the output values
+     * @param <A> aggregate type
+     * @return a table with the combined values for each key
+     */
+    default <A> LTable<K, V> combineValues(
+            SSupplier<A> initialSupplier,
+            SBiFunction<A, V, A> combineFn,
+            SFunction<A, Iterable<V>> outputFn) {
+        return combineValues(new LAggregator<>(initialSupplier, combineFn, outputFn));
+    }
+
+    /**
+     * Map the values in this LGroupedTable using a custom function. The function receives a sequential
+     * stream over the values of one key, and that stream can be consumed only once.
+     *
+     * <p>Note that in serialization systems which heavily reuse objects (such as Avro), you may
+     * in fact get given the same object multiple times with different data as you consume the stream,
+     * meaning it may be necessary to detach values.</p>
+     *
+     * @param fn function from the stream of values for a key to the single output value for that key
+     * @param pType PType of the output values
+     * @param <T> output value type
+     * @return a table with one output value per key
+     */
+    default <T> LTable<K, T> mapValues(SFunction<Stream<V>, T> fn, PType<T> pType) {
+        return parallelDo(
+                ctx -> ctx.emit(Pair.of(
+                        ctx.element().first(),
+                        fn.apply(StreamSupport.stream(ctx.element().second().spliterator(), false)))
+                ), ptf().tableOf(keyType(), pType));
+    }
+
+    /**
+     * Collect the values into an aggregate type. This differs from combineValues in two ways:
+     * <ul>
+     *   <li>It outputs the aggregate type rather than the value type.</li>
+     *   <li>It is designed to happen in one step, rather than being optimised into multiple levels.</li>
+     * </ul>
+     * This makes it much more suitable for assembling collections than for computing simple numeric
+     * aggregates.
+     *
+     * <p>The supplier provides an "empty" object, then the consumer is called with each value. For
+     * example, to collect all values into a {@link Collection}, one can do this:</p>
+     * <pre>{@code
+     * lgt.collectValues(ArrayList::new, Collection::add, lgt.ptf().collections(lgt.valueType()))
+     * }</pre>
+     *
+     * <p>This is in fact the default implementation for the collectAllValues() method.</p>
+     *
+     * <p>Note that in serialization systems which heavily reuse objects (such as Avro), you may
+     * in fact get given the same object multiple times with different data as you iterate over the
+     * values, meaning it may be necessary to detach values.</p>
+     *
+     * @param emptySupplier creates a fresh, empty aggregate for each key
+     * @param addFn adds one value into the aggregate
+     * @param pType PType of the aggregate
+     * @param <C> aggregate type
+     * @return a table with one aggregate per key
+     */
+    default <C> LTable<K, C> collectValues(SSupplier<C> emptySupplier, SBiConsumer<C, V> addFn, PType<C> pType) {
+        return parallelDo(ctx -> {
+            C coll = emptySupplier.get();
+            ctx.element().second().forEach(v -> addFn.accept(coll, v));
+            ctx.emit(Pair.of(ctx.element().first(), coll));
+        }, ptf().tableOf(keyType(), pType));
+    }
+
+    /**
+     * Collect all values for each key into a {@link Collection} (an {@link ArrayList}, keeping
+     * duplicates).
+     */
+    default LTable<K, Collection<V>> collectAllValues() {
+        return collectValues(ArrayList::new, Collection::add, ptf().collections(valueType()));
+    }
+
+    /**
+     * Collect all unique values for each key into a {@link Collection}, backed by a {@link HashSet}.
+     * The value type must have correctly-defined equals() and hashCode() methods.
+     */
+    default LTable<K, Collection<V>> collectUniqueValues() {
+        return collectValues(HashSet::new, Collection::add, ptf().collections(valueType()));
+    }
+
+    /**
+     * Reduce the values for each key using an associative binary operator. Examples:
+     * <ul>
+     *   <li>{@code reduceValues((a, b) -> a + b)} for summation;</li>
+     *   <li>{@code reduceValues((a, b) -> a + ", " + b)} for comma-separated string concatenation;</li>
+     *   <li>{@code reduceValues((a, b) -> a > b ? a : b)} for the maximum value.</li>
+     * </ul>
+     *
+     * <p>The accumulator starts out as {@code null} and the first value simply replaces it, so the
+     * operator is never called with a {@code null} accumulator.</p>
+     *
+     * @param operator associative operator used to merge two values
+     * @return a table with one reduced value per key
+     */
+    default LTable<K, V> reduceValues(SBinaryOperator<V> operator) {
+        return combineValues(() -> (V) null, (a, b) -> a == null ? b : operator.apply(a, b), Collections::singleton);
+    }
+
+    /**
+     * Ungroup this LGroupedTable back into an {@link LTable}. This will still trigger a "reduce"
+     * operation, so it is usually only used in special cases. One example is producing a
+     * globally-ordered list by feeding everything through a single reducer.
+     */
+    default LTable<K, V> ungroup() {
+        return factory().wrap(underlying().ungroup());
+    }
+
+    /**
+     * Get a {@link PType} which can be used to serialize the key part of this grouped table
+     */
+    default PType<K> keyType() {
+        return underlying().getGroupedTableType().getTableType().getKeyType();
+    }
+
+    /**
+     * Get a {@link PType} which can be used to serialize the value part of this grouped table
+     */
+    default PType<V> valueType() {
+        return underlying().getGroupedTableType().getTableType().getValueType();
+    }
+
+}
