Repository: crunch Updated Branches: refs/heads/master 366da2ee0 -> 60b28b12b
CRUNCH-538: Java lambdas for Crunch business logic. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/60b28b12 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/60b28b12 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/60b28b12 Branch: refs/heads/master Commit: 60b28b12b26348a2e17ae5f8023e8f4256cd03fc Parents: 366da2e Author: Josh Wills <[email protected]> Authored: Wed Jul 1 15:20:50 2015 -0700 Committer: Josh Wills <[email protected]> Committed: Thu Aug 27 15:05:42 2015 -0700 ---------------------------------------------------------------------- .../org/apache/crunch/MultiStagePlanningIT.java | 2 +- .../it/java/org/apache/crunch/PageRankIT.java | 2 +- .../it/java/org/apache/crunch/WordCountIT.java | 13 +- .../src/main/java/org/apache/crunch/IDoFn.java | 49 ++++++ .../main/java/org/apache/crunch/IFilterFn.java | 27 ++++ .../main/java/org/apache/crunch/IFlatMapFn.java | 28 ++++ .../src/main/java/org/apache/crunch/IMapFn.java | 27 ++++ .../java/org/apache/crunch/PCollection.java | 96 ++++++++++++ .../src/main/java/org/apache/crunch/PTable.java | 32 +++- .../java/org/apache/crunch/fn/IFnHelpers.java | 149 +++++++++++++++++++ .../impl/dist/collect/PCollectionImpl.java | 89 +++++++++++ .../crunch/impl/dist/collect/PTableBase.java | 29 +++- .../crunch/impl/mem/collect/MemCollection.java | 89 ++++++++++- .../impl/mem/collect/MemGroupedTable.java | 11 -- .../crunch/impl/mem/collect/MemTable.java | 30 +++- 15 files changed, 641 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java b/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java index a7b7d48..38211a7 100644 --- a/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java @@ -60,7 +60,7 @@ public class MultiStagePlanningIT implements Serializable { PTable<String, String> addressesTable = pipeline.readTextFile(addressesFile) .parallelDo("Split addresses", new StringToPairMapFn(), tableOf(strings(), strings())) - .filter(new FilterFn<Pair<String, String>>() { + .filter(new IFilterFn<Pair<String, String>>() { @Override public boolean accept(Pair<String, String> input) { // This is odd but it is the simpler way of simulating this would take longer than http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java b/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java index 701f78a..b30465d 100644 --- a/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java @@ -130,7 +130,7 @@ public class PageRankIT { }, ptf.tableOf(ptf.strings(), ptf.floats())); return input.cogroup(outbound).mapValues( - new MapFn<Pair<Collection<PageRankData>, Collection<Float>>, PageRankData>() { + new IMapFn<Pair<Collection<PageRankData>, Collection<Float>>, PageRankData>() { @Override public PageRankData map(Pair<Collection<PageRankData>, Collection<Float>> input) { PageRankData prd = Iterables.getOnlyElement(input.first()); http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java b/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java index cee6a90..c4e1d58 100644 --- a/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; import java.nio.charset.Charset; +import java.util.Arrays; import java.util.List; import org.apache.crunch.fn.Aggregators; @@ -51,15 +52,15 @@ public class WordCountIT { } public static PTable<String, Long> wordCount(PCollection<String> words, PTypeFamily typeFamily) { - return Aggregate.count(words.parallelDo(new DoFn<String, String>() { - + return Aggregate.count(words.parallelDo(new IDoFn<String, String>() { @Override - public void process(String line, Emitter<String> emitter) { - for (String word : line.split("\\s+")) { - emitter.emit(word); + public void process(Context<String, String> context) { + List<String> words = Arrays.asList(context.element().split("\\s+")); + for (String word : words) { if ("and".equals(word)) { - increment(WordCountStats.ANDS); + context.increment(WordCountStats.ANDS); } + context.emit(word); } } }, typeFamily.strings())); http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/main/java/org/apache/crunch/IDoFn.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/IDoFn.java b/crunch-core/src/main/java/org/apache/crunch/IDoFn.java new file mode 100644 index 0000000..b393f43 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/IDoFn.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; + +import java.io.Serializable; + +/** + * A Java lambdas friendly version of the {@link DoFn} class. + */ +public interface IDoFn<S, T> extends Serializable { + + void process(Context<S, T> context); + + public interface Context<S, T> { + S element(); + + void emit(T t); + + TaskInputOutputContext getContext(); + + Configuration getConfiguration(); + + void increment(String groupName, String counterName); + + void increment(String groupName, String counterName, long value); + + void increment(Enum<?> counterName); + + void increment(Enum<?> counterName, long value); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/main/java/org/apache/crunch/IFilterFn.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/IFilterFn.java b/crunch-core/src/main/java/org/apache/crunch/IFilterFn.java new file mode 100644 index 0000000..bb8a03d --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/IFilterFn.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import java.io.Serializable; + +/** + * A Java lambdas friendly version of the {@link org.apache.crunch.FilterFn} class. + */ +public interface IFilterFn<S> extends Serializable { + boolean accept(S input); +} http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/main/java/org/apache/crunch/IFlatMapFn.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/IFlatMapFn.java b/crunch-core/src/main/java/org/apache/crunch/IFlatMapFn.java new file mode 100644 index 0000000..a2b85c4 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/IFlatMapFn.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import java.io.Serializable; + +/** + * A Java lambdas friendly interface for writing business logic against {@code PCollection}s + * that take in a single input record and return 0 to N output records via an {@code Iterable}. + */ +public interface IFlatMapFn<S, T> extends Serializable { + Iterable<T> process(S input); +} http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/main/java/org/apache/crunch/IMapFn.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/IMapFn.java b/crunch-core/src/main/java/org/apache/crunch/IMapFn.java new file mode 100644 index 0000000..3c06d9e --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/IMapFn.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import java.io.Serializable; + +/** + * A Java lambdas friendly version of the {@link org.apache.crunch.MapFn} class. + */ +public interface IMapFn<S, T> extends Serializable { + public T map(S input); +} http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/main/java/org/apache/crunch/PCollection.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/PCollection.java b/crunch-core/src/main/java/org/apache/crunch/PCollection.java index a1d507a..5d072e6 100644 --- a/crunch-core/src/main/java/org/apache/crunch/PCollection.java +++ b/crunch-core/src/main/java/org/apache/crunch/PCollection.java @@ -133,6 +133,96 @@ public interface PCollection<S> { ParallelDoOptions options); /** + * Similar to other instances of {@code parallelDo}, but designed for Java lambdas. + */ + <T> PCollection<T> parallelDo(IDoFn<S, T> fn, PType<T> type); + + /** + * Similar to other instances of {@code parallelDo}, but designed for Java lambdas. + */ + <K, V> PTable<K, V> parallelDo(IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type); + + /** + * Similar to other instances of {@code parallelDo}, but designed for Java lambdas. + */ + <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type); + + /** + * Similar to other instances of {@code parallelDo}, but designed for Java lambdas. + */ + <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type); + + /** + * Similar to other instances of {@code parallelDo}, but designed for Java lambdas. + */ + <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type, ParallelDoOptions options); + + /** + * Similar to other instances of {@code parallelDo}, but designed for Java lambdas. + */ + <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type, ParallelDoOptions options); + + /** + * For each element of this {@code PCollection}, generate 0 to N output values using the + * given {@code IFlatMapFn}. Designed for Java lambdas. + */ + <T> PCollection<T> flatMap(IFlatMapFn<S, T> fn, PType<T> type); + + /** + * For each element of this {@code PCollection}, generate 0 to N output values using the + * given {@code IFlatMapFn}. Designed for Java lambdas. + */ + <K, V> PTable<K, V> flatMap(IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type); + + /** + * For each element of this {@code PCollection}, generate 0 to N output values using the + * given {@code IFlatMapFn}. Designed for Java lambdas. + */ + <T> PCollection<T> flatMap(String name, IFlatMapFn<S, T> fn, PType<T> type); + + /** + * For each element of this {@code PCollection}, generate 0 to N output values using the + * given {@code IFlatMapFn}. Designed for Java lambdas. + */ + <K, V> PTable<K, V> flatMap(String name, IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type); + + /** + * For each element of this {@code PCollection}, generate one output value using the + * given {@code IMapFn}. Designed for Java lambdas. + */ + <T> PCollection<T> map(IMapFn<S, T> fn, PType<T> type); + + /** + * For each element of this {@code PCollection}, generate one output value using the + * given {@code IMapFn}. Designed for Java lambdas. + */ + <K, V> PTable<K, V> map(IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type); + + /** + * For each element of this {@code PCollection}, generate one output value using the + * given {@code IMapFn}. Designed for Java lambdas. + */ + <T> PCollection<T> map(String name, IMapFn<S, T> fn, PType<T> type); + + /** + * For each element of this {@code PCollection}, generate one output value using the + * given {@code IMapFn}. Designed for Java lambdas. + */ + <K, V> PTable<K, V> map(String name, IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type); + + /** + * Filter elements of this {@code PCollection} using the given {@code IFilterFn}. + * Designed for Java lambdas. + */ + PCollection<S> filter(IFilterFn<S> fn); + + /** + * Filter elements of this {@code PCollection} using the given {@code IFilterFn}. + * Designed for Java lambdas. + */ + PCollection<S> filter(String name, IFilterFn<S> fn); + + /** * Write the contents of this {@code PCollection} to the given {@code Target}, * using the storage format specified by the target. * @@ -259,6 +349,12 @@ public interface PCollection<S> { <K> PTable<K, S> by(MapFn<S, K> extractKeyFn, PType<K> keyType); /** + * Apply the given {@code IMapFn} to each element of this instance in order to + * create a {@code PTable}. Designed for use with Java 8 lambdas. + */ + <K> PTable<K, S> by(IMapFn<S, K> extractKeyFn, PType<K> keyType); + + /** * Apply the given map function to each element of this instance in order to * create a {@code PTable}. * http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/main/java/org/apache/crunch/PTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/PTable.java b/crunch-core/src/main/java/org/apache/crunch/PTable.java index 09fe9db..5609c3f 100644 --- a/crunch-core/src/main/java/org/apache/crunch/PTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/PTable.java @@ -107,16 +107,28 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> { /** * Returns a {@code PTable} that has the same keys as this instance, but + * uses the given function to map the values. Designed for Java lambdas. + */ + <U> PTable<K, U> mapValues(IMapFn<V, U> mapFn, PType<U> ptype); + + /** + * Returns a {@code PTable} that has the same keys as this instance, but * uses the given function to map the values. */ <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype); - + /** * Returns a {@code PTable} that has the same values as this instance, but * uses the given function to map the keys. */ <K2> PTable<K2, V> mapKeys(MapFn<K, K2> mapFn, PType<K2> ptype); - + + /** + * Returns a {@code PTable} that has the same values as this instance, but + * uses the given function to map the keys. Designed for Java lambdas. + */ + <K2> PTable<K2, V> mapKeys(IMapFn<K, K2> mapFn, PType<K2> ptype); + /** * Returns a {@code PTable} that has the same values as this instance, but * uses the given function to map the keys. @@ -134,7 +146,13 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> { * {@code PTable}. */ PTable<K, V> filter(FilterFn<Pair<K, V>> filterFn); - + + /** + * Apply the given filter function to this instance and return the resulting + * {@code PTable}. Designed for Java lambdas. + */ + PTable<K, V> filter(IFilterFn<Pair<K, V>> fn); + /** * Apply the given filter function to this instance and return the resulting * {@code PTable}. @@ -145,7 +163,13 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> { * The {@code FilterFn} to apply */ PTable<K, V> filter(String name, FilterFn<Pair<K, V>> filterFn); - + + /** + * Apply the given filter function to this instance and return the resulting + * {@code PTable}. Designed for Java lambdas. + */ + PTable<K, V> filter(String name, IFilterFn<Pair<K, V>> fn); + /** * Returns a PTable made up of the pairs in this PTable with the largest value * field. http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/main/java/org/apache/crunch/fn/IFnHelpers.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/IFnHelpers.java b/crunch-core/src/main/java/org/apache/crunch/fn/IFnHelpers.java new file mode 100644 index 0000000..8560fab --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/fn/IFnHelpers.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.fn; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.FilterFn; +import org.apache.crunch.IDoFn; +import org.apache.crunch.IFilterFn; +import org.apache.crunch.IFlatMapFn; +import org.apache.crunch.IMapFn; +import org.apache.crunch.MapFn; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; + +public class IFnHelpers { + + public static <S, T> DoFn<S, T> wrap(final org.apache.crunch.IDoFn<S, T> fn) { + return new IDoFnWrapper(fn); + } + + public static <S, T> DoFn<S, T> wrapFlatMap(final IFlatMapFn<S, T> fn) { + return new DoFn<S, T>() { + @Override + public void process(S input, Emitter<T> emitter) { + for (T t : fn.process(input)) { + emitter.emit(t); + } + } + }; + } + + public static <S, T> MapFn<S, T> wrapMap(final IMapFn<S, T> fn) { + return new MapFn<S, T>() { + @Override + public T map(S input) { + return fn.map(input); + } + }; + } + + public static <S> FilterFn<S> wrapFilter(final IFilterFn<S> fn) { + return new FilterFn<S>() { + @Override + public boolean accept(S input) { + return fn.accept(input); + } + }; + } + + static class IDoFnWrapper<S, T> extends DoFn<S, T> { + + private final org.apache.crunch.IDoFn<S, T> fn; + private transient ContextImpl<S, T> ctxt; + + public IDoFnWrapper(org.apache.crunch.IDoFn<S, T> fn) { + this.fn = fn; + } + + @Override + public void initialize() { + super.initialize(); + if (getContext() == null) { + this.ctxt = new ContextImpl<S, T>(getConfiguration()); + } else { + this.ctxt = new ContextImpl<S, T>(getContext()); + } + } + + @Override + public void process(S input, Emitter<T> emitter) { + fn.process(ctxt.update(input, emitter)); + } + } + + static class ContextImpl<S, T> implements IDoFn.Context<S, T> { + private S element; + private Emitter<T> emitter; + private TaskInputOutputContext context; + private Configuration conf; + + public ContextImpl(TaskInputOutputContext context) { + this.context = context; + this.conf = context.getConfiguration(); + } + + public ContextImpl(Configuration conf) { + this.context = null; + this.conf = conf; + } + + public ContextImpl update(S element, Emitter<T> emitter) { + this.element = element; + this.emitter = emitter; + return this; + } + + public S element() { + return element; + } + + public void emit(T t) { + emitter.emit(t); + } + + public TaskInputOutputContext getContext() { + return context; + } + + public Configuration getConfiguration() { + return conf; + } + + public void increment(String groupName, String counterName) { + increment(groupName, counterName, 1); + } + + public void increment(String groupName, String counterName, long value) { + if (context != null) { + context.getCounter(groupName, counterName).increment(value); + } + } + + public void increment(Enum<?> counterName) { + increment(counterName, 1); + } + + public void increment(Enum<?> counterName, long value) { + if (context != null) { + context.getCounter(counterName).increment(value); + } + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java index 315baf1..2a5e1f5 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java @@ -25,6 +25,10 @@ import org.apache.crunch.CachingOptions; import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.DoFn; import org.apache.crunch.FilterFn; +import org.apache.crunch.IFilterFn; +import org.apache.crunch.IFlatMapFn; +import org.apache.crunch.IDoFn; +import org.apache.crunch.IMapFn; import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.PObject; @@ -36,6 +40,7 @@ import org.apache.crunch.ReadableData; import org.apache.crunch.SourceTarget; import org.apache.crunch.Target; import org.apache.crunch.fn.ExtractKeyFn; +import org.apache.crunch.fn.IFnHelpers; import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.impl.dist.DistributedPipeline; import org.apache.crunch.io.ReadableSource; @@ -166,6 +171,85 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { return pipeline.getFactory().createDoTable(name, getChainingCollection(), fn, type, options); } + @Override + public <T> PCollection<T> parallelDo(IDoFn<S, T> fn, PType<T> type) { + return parallelDo(IFnHelpers.wrap(fn), type); + } + + @Override + public <K, V> PTable<K, V> parallelDo(IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type) { + return parallelDo(IFnHelpers.wrap(fn), type); + } + + @Override + public <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type) { + return parallelDo(name, IFnHelpers.wrap(fn), type); + } + + @Override + public <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type) { + return parallelDo(name, IFnHelpers.wrap(fn), type); + } + + @Override + public <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type, ParallelDoOptions options) { + return parallelDo(name, IFnHelpers.wrap(fn), type, options); + } + + @Override + public <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type, ParallelDoOptions options) { + return parallelDo(name, IFnHelpers.wrap(fn), type, options); + } + + @Override + public <T> PCollection<T> flatMap(IFlatMapFn<S, T> fn, PType<T> type) { + return parallelDo(IFnHelpers.wrapFlatMap(fn), type); + } + + @Override + public <K, V> PTable<K, V> flatMap(IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) { + return parallelDo(IFnHelpers.wrapFlatMap(fn), type); + } + + @Override + public <T> PCollection<T> flatMap(String name, IFlatMapFn<S, T> fn, PType<T> type) { + return parallelDo(name, IFnHelpers.wrapFlatMap(fn), type); + } + + @Override + public <K, V> PTable<K, V> flatMap(String name, IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) { + return parallelDo(name, IFnHelpers.wrapFlatMap(fn), type); + } + + @Override + public <T> PCollection<T> map(IMapFn<S, T> fn, PType<T> type) { + return parallelDo(IFnHelpers.wrapMap(fn), type); + } + + @Override + public <K, V> PTable<K, V> map(IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) { + return parallelDo(IFnHelpers.wrapMap(fn), type); + } + + @Override + public <T> PCollection<T> map(String name, IMapFn<S, T> fn, PType<T> type) { + return parallelDo(name, IFnHelpers.wrapMap(fn), type); + } + + @Override + public <K, V> PTable<K, V> map(String name, IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) { + return parallelDo(name, IFnHelpers.wrapMap(fn), type); + } + + @Override + public PCollection<S> filter(IFilterFn<S> fn) { + return filter(IFnHelpers.wrapFilter(fn)); + } + + @Override + public PCollection<S> filter(String name, IFilterFn<S> fn) { + return filter(name, IFnHelpers.wrapFilter(fn)); + } public PCollection<S> write(Target target) { if (materializedAt != null) { @@ -271,6 +355,11 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { } @Override + public <K> PTable<K, S> by(IMapFn<S, K> mapFn, PType<K> keyType) { + return parallelDo(new ExtractKeyFn<K, S>(IFnHelpers.wrapMap(mapFn)), getTypeFamily().tableOf(keyType, getPType())); + } + + @Override public <K> PTable<K, S> by(String name, MapFn<S, K> mapFn, PType<K> keyType) { return parallelDo(name, new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType())); } http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java index e81773e..6bc3a41 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java @@ -21,6 +21,8 @@ import com.google.common.collect.Lists; import org.apache.crunch.CachingOptions; import org.apache.crunch.FilterFn; import org.apache.crunch.GroupingOptions; +import org.apache.crunch.IFilterFn; +import org.apache.crunch.IMapFn; import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.PObject; @@ -29,6 +31,7 @@ import org.apache.crunch.Pair; import org.apache.crunch.ParallelDoOptions; import org.apache.crunch.TableSource; import org.apache.crunch.Target; +import org.apache.crunch.fn.IFnHelpers; import org.apache.crunch.impl.dist.DistributedPipeline; import org.apache.crunch.lib.Aggregate; import org.apache.crunch.lib.Cogroup; @@ -130,12 +133,27 @@ public abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> imple public PTable<K, V> filter(String name, FilterFn<Pair<K, V>> filterFn) { return parallelDo(name, filterFn, getPTableType()); } - + + @Override + public PTable<K, V> filter(IFilterFn<Pair<K, V>> filterFn) { + return parallelDo(IFnHelpers.wrapFilter(filterFn), getPTableType()); + } + + @Override + public PTable<K, V> filter(String name, IFilterFn<Pair<K, V>> filterFn) { + return parallelDo(name, IFnHelpers.wrapFilter(filterFn), getPTableType()); + } + @Override public <U> PTable<K, U> mapValues(MapFn<V, U> mapFn, PType<U> ptype) { return PTables.mapValues(this, mapFn, ptype); } - + + @Override + public <U> PTable<K, U> mapValues(IMapFn<V, U> mapFn, PType<U> ptype) { + return PTables.mapValues(this, IFnHelpers.wrapMap(mapFn), ptype); + } + @Override public <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype) { return PTables.mapValues(name, this, mapFn, ptype); @@ -145,7 +163,12 @@ public abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> imple public <K2> PTable<K2, V> mapKeys(MapFn<K, K2> mapFn, PType<K2> ptype) { return PTables.mapKeys(this, mapFn, ptype); } - + + @Override + public <K2> PTable<K2, V> mapKeys(IMapFn<K, K2> mapFn, PType<K2> ptype) { + return PTables.mapKeys(this, IFnHelpers.wrapMap(mapFn), ptype); + } + @Override public <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype) { return PTables.mapKeys(name, this, mapFn, ptype); http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java index 55b7821..05bff3f 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java @@ -31,6 +31,10 @@ import org.apache.crunch.Aggregator; import org.apache.crunch.CachingOptions; import org.apache.crunch.DoFn; import org.apache.crunch.FilterFn; +import org.apache.crunch.IFilterFn; +import org.apache.crunch.IFlatMapFn; +import org.apache.crunch.IDoFn; +import org.apache.crunch.IMapFn; import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.PObject; @@ -42,6 +46,7 @@ import org.apache.crunch.ReadableData; import org.apache.crunch.PipelineCallable; import org.apache.crunch.Target; import org.apache.crunch.fn.ExtractKeyFn; +import org.apache.crunch.fn.IFnHelpers; import org.apache.crunch.impl.mem.MemPipeline; import org.apache.crunch.impl.mem.emit.InMemoryEmitter; import org.apache.crunch.lib.Aggregate; @@ -59,7 +64,6 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; public class MemCollection<S> implements PCollection<S> { @@ -159,6 +163,84 @@ public class MemCollection<S> implements PCollection<S> { } @Override + public <T> PCollection<T> parallelDo(IDoFn<S, T> fn, PType<T> type) { + return parallelDo(null, fn, type); + } + + @Override + public <K, V> PTable<K, V> parallelDo(IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type) { + return parallelDo(null, fn, type); + } + + @Override + public <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type) { + return parallelDo(name, fn, type, null); + } + + @Override + public <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type) { + return parallelDo(name, fn, type, null); + } + + @Override + public <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type, ParallelDoOptions options) { + return parallelDo(name, IFnHelpers.wrap(fn), type, options); + } + + @Override + public <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type, ParallelDoOptions options) { + return parallelDo(name, IFnHelpers.wrap(fn), type, options); + } + + @Override + public <T> PCollection<T> flatMap(IFlatMapFn<S, T> fn, PType<T> type) { + return parallelDo(IFnHelpers.wrapFlatMap(fn), type); + } + + @Override + public <K, V> PTable<K, V> flatMap(IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) { + return parallelDo(IFnHelpers.wrapFlatMap(fn), type); + } + + @Override + public <T> PCollection<T> flatMap(String name, IFlatMapFn<S, T> fn, PType<T> type) { + return parallelDo(name, IFnHelpers.wrapFlatMap(fn), type); + } + + @Override + public <K, V> PTable<K, V> flatMap(String name, IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) { + return parallelDo(name, IFnHelpers.wrapFlatMap(fn), type); + } + + @Override + public <T> PCollection<T> map(IMapFn<S, T> fn, PType<T> type) { + return parallelDo(IFnHelpers.wrapMap(fn), type); + } + + @Override + public <K, V> PTable<K, V> map(IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) { + return parallelDo(IFnHelpers.wrapMap(fn), type); + } + + @Override + public <T> PCollection<T> map(String name, IMapFn<S, T> fn, PType<T> type) { + return parallelDo(name, IFnHelpers.wrapMap(fn), type); + } + + @Override + public <K, V> PTable<K, V> map(String name, IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) { + return parallelDo(name, IFnHelpers.wrapMap(fn), type); + } + + public PCollection<S> filter(IFilterFn<S> fn) { + return filter(IFnHelpers.wrapFilter(fn)); + } + + public PCollection<S> filter(String name, IFilterFn<S> fn) { + return filter(name, IFnHelpers.wrapFilter(fn)); + } + + @Override public PCollection<S> write(Target target) { getPipeline().write(this, target); return this; @@ -280,6 +362,11 @@ public class MemCollection<S> implements PCollection<S> { } @Override + public <K> PTable<K, S> by(IMapFn<S, K> mapFn, PType<K> keyType) { + return parallelDo(new ExtractKeyFn<K, S>(IFnHelpers.wrapMap(mapFn)), getTypeFamily().tableOf(keyType, getPType())); + } + + @Override public <K> PTable<K, S> by(String name, MapFn<S, K> mapFn, PType<K> keyType) { return parallelDo(name, new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType())); } http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java index 0e4516a..f3db972 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java @@ -17,11 +17,6 @@ */ package org.apache.crunch.impl.mem.collect; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - import org.apache.crunch.Aggregator; import org.apache.crunch.CombineFn; import org.apache.crunch.DoFn; @@ -32,7 +27,6 @@ import org.apache.crunch.PCollection; import org.apache.crunch.PGroupedTable; import org.apache.crunch.PTable; import org.apache.crunch.Pair; -import org.apache.crunch.Pipeline; import org.apache.crunch.Target; import org.apache.crunch.fn.Aggregators; import org.apache.crunch.lib.PTables; @@ -40,11 +34,6 @@ import org.apache.crunch.types.PGroupedTableType; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; -import org.apache.hadoop.io.RawComparator; -import org.apache.hadoop.util.ReflectionUtils; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implements PGroupedTable<K, V> { http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java index 3f3bd77..03b5a70 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java @@ -18,13 +18,14 @@ package org.apache.crunch.impl.mem.collect; import java.util.Collection; -import java.util.List; import java.util.Map; import com.google.common.collect.ImmutableList; import org.apache.crunch.CachingOptions; import org.apache.crunch.FilterFn; import org.apache.crunch.GroupingOptions; +import org.apache.crunch.IFilterFn; +import org.apache.crunch.IMapFn; import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.PGroupedTable; @@ -32,6 +33,7 @@ import org.apache.crunch.PObject; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.Target; +import org.apache.crunch.fn.IFnHelpers; import org.apache.crunch.lib.Aggregate; import org.apache.crunch.lib.Cogroup; import org.apache.crunch.lib.Join; @@ -41,8 +43,6 @@ import org.apache.crunch.materialize.pobject.MapPObject; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; -import com.google.common.collect.Lists; - public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable<K, V> { private PTableType<K, V> ptype; @@ -138,10 +138,25 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable< } @Override + public PTable<K, V> filter(IFilterFn<Pair<K, V>> filterFn) { + return parallelDo(IFnHelpers.wrapFilter(filterFn), getPTableType()); + } + + @Override + public PTable<K, V> filter(String name, IFilterFn<Pair<K, V>> filterFn) { + return parallelDo(name, IFnHelpers.wrapFilter(filterFn), getPTableType()); + } + + @Override public <U> PTable<K, U> mapValues(MapFn<V, U> mapFn, PType<U> ptype) { return PTables.mapValues(this, mapFn, ptype); } - + + @Override + public <U> PTable<K, U> mapValues(IMapFn<V, U> mapFn, PType<U> ptype) { + return PTables.mapValues(this, IFnHelpers.wrapMap(mapFn), ptype); + } + @Override public <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype) { return PTables.mapValues(name, this, mapFn, ptype); @@ -151,7 +166,12 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable< public <K2> PTable<K2, V> mapKeys(MapFn<K, K2> mapFn, PType<K2> ptype) { return PTables.mapKeys(this, mapFn, ptype); } - + + @Override + public <K2> PTable<K2, V> mapKeys(IMapFn<K, K2> mapFn, PType<K2> ptype) { + return PTables.mapKeys(this, IFnHelpers.wrapMap(mapFn), ptype); + } + @Override public <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype) { return PTables.mapKeys(name, this, mapFn, ptype);
