Repository: crunch Updated Branches: refs/heads/master 775de6cd8 -> f8920d355
CRUNCH-584: Add missing mapValues in PGroupedTable for Java 8 lambdas Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f8920d35 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f8920d35 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f8920d35 Branch: refs/heads/master Commit: f8920d355e17cc30a6cff3325d9ac1c3786003bb Parents: 775de6c Author: David Whiting <[email protected]> Authored: Thu Dec 10 17:20:35 2015 +0100 Committer: David Whiting <[email protected]> Committed: Thu Dec 10 17:30:07 2015 +0100 ---------------------------------------------------------------------- .../src/main/java/org/apache/crunch/PGroupedTable.java | 11 +++++++++++ .../crunch/impl/dist/collect/BaseGroupedTable.java | 8 ++++++++ .../apache/crunch/impl/mem/collect/MemGroupedTable.java | 9 ++++++++- 3 files changed, 27 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/f8920d35/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java index 14bdb32..6ac86de 100644 --- a/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java @@ -78,6 +78,17 @@ public interface PGroupedTable<K, V> extends PCollection<Pair<K, Iterable<V>>> { * @return A new {@code PTable} instance */ <U> PTable<K, U> mapValues(MapFn<Iterable<V>, U> mapFn, PType<U> ptype); + + + /** + * Maps the {@code Iterable<V>} elements of each record to a new type. Just like + * any {@code parallelDo} operation on a {@code PGroupedTable}, this may only be + * called once. Designed for Java lambdas + * @param mapFn The mapping function (can be lambda/method ref) + * @param ptype The serialization infromation for the returned data + * @return A new {@code PTable} instance + */ + <U> PTable<K, U> mapValues(IMapFn<Iterable<V>, U> mapFn, PType<U> ptype); /** * Maps the {@code Iterable<V>} elements of each record to a new type. Just like http://git-wip-us.apache.org/repos/asf/crunch/blob/f8920d35/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java index d87c8f5..eb2d829 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java @@ -25,6 +25,7 @@ import org.apache.crunch.CombineFn; import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; import org.apache.crunch.GroupingOptions; +import org.apache.crunch.IMapFn; import org.apache.crunch.MapFn; import org.apache.crunch.PGroupedTable; import org.apache.crunch.PTable; @@ -33,6 +34,7 @@ import org.apache.crunch.ReadableData; import org.apache.crunch.SourceTarget; import org.apache.crunch.Target; import org.apache.crunch.fn.Aggregators; +import org.apache.crunch.fn.IFnHelpers; import org.apache.crunch.lib.PTables; import org.apache.crunch.types.PGroupedTableType; import org.apache.crunch.types.PType; @@ -119,6 +121,12 @@ public class BaseGroupedTable<K, V> extends PCollectionImpl<Pair<K, Iterable<V>> } @Override + public <U> PTable<K, U> mapValues(IMapFn<Iterable<V>, U> mapFn, PType<U> ptype) { + return PTables.mapValues(this, IFnHelpers.wrapMap(mapFn), ptype); + } + + + @Override public <U> PTable<K, U> mapValues(String name, MapFn<Iterable<V>, U> mapFn, PType<U> ptype) { return PTables.mapValues(name, this, mapFn, ptype); } http://git-wip-us.apache.org/repos/asf/crunch/blob/f8920d35/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java index f3db972..5451533 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java @@ -22,6 +22,7 @@ import org.apache.crunch.CombineFn; import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; import org.apache.crunch.GroupingOptions; +import org.apache.crunch.IMapFn; import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.PGroupedTable; @@ -29,6 +30,7 @@ import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.Target; import org.apache.crunch.fn.Aggregators; +import org.apache.crunch.fn.IFnHelpers; import org.apache.crunch.lib.PTables; import org.apache.crunch.types.PGroupedTableType; import org.apache.crunch.types.PTableType; @@ -121,7 +123,12 @@ class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implemen public <U> PTable<K, U> mapValues(MapFn<Iterable<V>, U> mapFn, PType<U> ptype) { return PTables.mapValues(this, mapFn, ptype); } - + + @Override + public <U> PTable<K, U> mapValues(IMapFn<Iterable<V>, U> mapFn, PType<U> ptype) { + return PTables.mapValues(this, IFnHelpers.wrapMap(mapFn), ptype); + } + @Override public <U> PTable<K, U> mapValues(String name, MapFn<Iterable<V>, U> mapFn, PType<U> ptype) { return PTables.mapValues(name, this, mapFn, ptype);
