Updated Branches: refs/heads/master fc2d5782a -> b24dc5804
CRUNCH-208: Add mapValues convenience functions for PTable and PGroupedTable as well as a mapKeys function for PTable. Deprecate the MapKeysFn and MapValuesFn in favor of these new methods. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b24dc580 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b24dc580 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b24dc580 Branch: refs/heads/master Commit: b24dc58044f655e456400d91fd10513954b7f654 Parents: fc2d578 Author: Josh Wills <[email protected]> Authored: Mon May 20 00:01:17 2013 -0700 Committer: Josh Wills <[email protected]> Committed: Mon May 20 11:40:23 2013 -0700 ---------------------------------------------------------------------- .../src/it/java/org/apache/crunch/PageRankIT.java | 22 ++-- .../src/it/java/org/apache/crunch/TfIdfIT.java | 24 ++-- .../main/java/org/apache/crunch/PGroupedTable.java | 35 +++++- .../src/main/java/org/apache/crunch/PTable.java | 24 ++++ .../main/java/org/apache/crunch/fn/MapKeysFn.java | 3 + .../java/org/apache/crunch/fn/MapValuesFn.java | 3 + .../crunch/impl/mem/collect/MemGroupedTable.java | 20 +++- .../apache/crunch/impl/mem/collect/MemTable.java | 21 +++ .../crunch/impl/mr/collect/DoCollectionImpl.java | 3 - .../crunch/impl/mr/collect/PGroupedTableImpl.java | 18 +++ .../apache/crunch/impl/mr/collect/PTableBase.java | 21 +++ .../main/java/org/apache/crunch/lib/PTables.java | 99 +++++++++++++++ 12 files changed, 263 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java b/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java index 6291ef8..23c71b3 100644 --- a/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java @@ -119,19 +119,19 @@ public class PageRankIT { } }, ptf.tableOf(ptf.strings(), ptf.floats())); - return input.cogroup(outbound).parallelDo( - new MapFn<Pair<String, Pair<Collection<PageRankData>, Collection<Float>>>, Pair<String, PageRankData>>() { + return input.cogroup(outbound).mapValues( + new MapFn<Pair<Collection<PageRankData>, Collection<Float>>, PageRankData>() { @Override - public Pair<String, PageRankData> map(Pair<String, Pair<Collection<PageRankData>, Collection<Float>>> input) { - PageRankData prd = Iterables.getOnlyElement(input.second().first()); - Collection<Float> propagatedScores = input.second().second(); + public PageRankData map(Pair<Collection<PageRankData>, Collection<Float>> input) { + PageRankData prd = Iterables.getOnlyElement(input.first()); + Collection<Float> propagatedScores = input.second(); float sum = 0.0f; for (Float s : propagatedScores) { sum += s; } - return Pair.of(input.first(), prd.next(d + (1.0f - d) * sum)); + return prd.next(d + (1.0f - d) * sum); } - }, input.getPTableType()); + }, input.getValueType()); } public static void run(Pipeline pipeline, String urlInput, @@ -144,12 +144,12 @@ public class PageRankIT { return Pair.of(urls[0], urls[1]); } }, ptf.tableOf(ptf.strings(), ptf.strings())).groupByKey() - .parallelDo(new MapFn<Pair<String, Iterable<String>>, Pair<String, PageRankData>>() { + .mapValues(new MapFn<Iterable<String>, PageRankData>() { @Override - public Pair<String, PageRankData> map(Pair<String, Iterable<String>> input) { - return Pair.of(input.first(), new PageRankData(1.0f, 0.0f, input.second())); + public PageRankData map(Iterable<String> input) { + return new PageRankData(1.0f, 0.0f, input); } - }, ptf.tableOf(ptf.strings(), prType)); + }, prType); Float delta = 1.0f; while (delta > 0.01) { http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java b/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java index 218f538..23e45ca 100644 --- a/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/TfIdfIT.java @@ -152,24 +152,22 @@ public class TfIdfIT implements Serializable { * Collection<Pair<title, tfidf>>> */ return joinedResults - .parallelDo( - "calculate tfidf", - new MapFn<Pair<String, Pair<Long, Collection<Pair<String, Long>>>>, Pair<String, Collection<Pair<String, Double>>>>() { + .mapValues( + new MapFn<Pair<Long, Collection<Pair<String, Long>>>, Collection<Pair<String, Double>>>() { @Override - public Pair<String, Collection<Pair<String, Double>>> map( - Pair<String, Pair<Long, Collection<Pair<String, Long>>>> input) { + public Collection<Pair<String, Double>> map( + Pair<Long, Collection<Pair<String, Long>>> input) { Collection<Pair<String, Double>> tfidfs = Lists.newArrayList(); - String word = input.first(); - double n = input.second().first(); + double n = input.first(); double idf = Math.log(N / n); - for (Pair<String, Long> tf : input.second().second()) { + for (Pair<String, Long> tf : input.second()) { double tfidf = tf.second() * idf; tfidfs.add(Pair.of(tf.first(), tfidf)); } - return Pair.of(word, tfidfs); + return tfidfs; } - }, ptf.tableOf(ptf.strings(), ptf.collections(ptf.pairs(ptf.strings(), ptf.doubles())))); + }, ptf.collections(ptf.pairs(ptf.strings(), ptf.doubles()))); } public void run(Pipeline pipeline, PTypeFamily typeFamily, boolean singleRun) throws IOException { @@ -187,13 +185,13 @@ public class TfIdfIT implements Serializable { pipeline.run(); } - PTable<String, Collection<Pair<String, Double>>> uppercased = results.parallelDo( - new MapKeysFn<String, String, Collection<Pair<String, Double>>>() { + PTable<String, Collection<Pair<String, Double>>> uppercased = results.mapKeys( + new MapFn<String, String>() { @Override public String map(String k1) { return k1.toUpperCase(); } - }, results.getPTableType()); + }, results.getKeyType()); pipeline.writeTextFile(uppercased, outputPath2); pipeline.done(); http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java index d77ffdb..68085c6 100644 --- a/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java @@ -18,10 +18,12 @@ package org.apache.crunch; import org.apache.crunch.Aggregator; +import org.apache.crunch.types.PGroupedTableType; +import org.apache.crunch.types.PType; /** - * The Crunch representation of a grouped {@link PTable}. - * + * The Crunch representation of a grouped {@link PTable}, which corresponds to the output of + * the shuffle phase of a MapReduce job. */ public interface PGroupedTable<K, V> extends PCollection<Pair<K, Iterable<V>>> { @@ -45,9 +47,38 @@ public interface PGroupedTable<K, V> extends PCollection<Pair<K, Iterable<V>>> { PTable<K, V> combineValues(Aggregator<V> aggregator); /** + * Maps the {@code Iterable<V>} elements of each record to a new type. Just like + * any {@code parallelDo} operation on a {@code PGroupedTable}, this may only be + * called once. + * + * @param mapFn The mapping function + * @param ptype The serialization information for the returned data + * @return A new {@code PTable} instance + */ + <U> PTable<K, U> mapValues(MapFn<Iterable<V>, U> mapFn, PType<U> ptype); + + /** + * Maps the {@code Iterable<V>} elements of each record to a new type. Just like + * any {@code parallelDo} operation on a {@code PGroupedTable}, this may only be + * called once. + * + * @param name A name for this operation + * @param mapFn The mapping function + * @param ptype The serialization information for the returned data + * @return A new {@code PTable} instance + */ + <U> PTable<K, U> mapValues(String name, MapFn<Iterable<V>, U> mapFn, PType<U> ptype); + + /** * Convert this grouping back into a multimap. * * @return an ungrouped version of the data in this {@code PGroupedTable}. */ PTable<K, V> ungroup(); + + /** + * Return the {@code PGroupedTableType} containing serialization information for + * this {@code PGroupedTable}. + */ + PGroupedTableType<K, V> getGroupedTableType(); } http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/PTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/PTable.java b/crunch-core/src/main/java/org/apache/crunch/PTable.java index 8df9853..738b3cb 100644 --- a/crunch-core/src/main/java/org/apache/crunch/PTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/PTable.java @@ -96,6 +96,30 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> { PType<V> getValueType(); /** + * Returns a {@code PTable} that has the same keys as this instance, but + * uses the given function to map the values. + */ + <U> PTable<K, U> mapValues(MapFn<V, U> mapFn, PType<U> ptype); + + /** + * Returns a {@code PTable} that has the same keys as this instance, but + * uses the given function to map the values. + */ + <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype); + + /** + * Returns a {@code PTable} that has the same values as this instance, but + * uses the given function to map the keys. + */ + <K2> PTable<K2, V> mapKeys(MapFn<K, K2> mapFn, PType<K2> ptype); + + /** + * Returns a {@code PTable} that has the same values as this instance, but + * uses the given function to map the keys. + */ + <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype); + + /** * Aggregate all of the values with the same key into a single key-value pair * in the returned PTable. */ http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java b/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java index cbaf24d..1dd8130 100644 --- a/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java +++ b/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java @@ -21,6 +21,9 @@ import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; import org.apache.crunch.Pair; +/** + * @deprecated Use {@link org.apache.crunch.PTable#mapKeys(org.apache.crunch.MapFn, org.apache.crunch.types.PType)} + */ public abstract class MapKeysFn<K1, K2, V> extends DoFn<Pair<K1, V>, Pair<K2, V>> { @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java b/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java index b90f5ff..9b171f4 100644 --- a/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java +++ b/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java @@ -21,6 +21,9 @@ import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; import org.apache.crunch.Pair; +/** + * @deprecated Use {@link org.apache.crunch.PTable#mapValues(org.apache.crunch.MapFn, org.apache.crunch.types.PType)} + */ public abstract class MapValuesFn<K, V1, V2> extends DoFn<Pair<K, V1>, Pair<K, V2>> { @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java index d105bb4..12c17b6 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java @@ -25,6 +25,7 @@ import java.util.TreeMap; import org.apache.crunch.Aggregator; import org.apache.crunch.CombineFn; import org.apache.crunch.GroupingOptions; +import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.PGroupedTable; import org.apache.crunch.PTable; @@ -32,6 +33,8 @@ import org.apache.crunch.Pair; import org.apache.crunch.Pipeline; import org.apache.crunch.Target; import org.apache.crunch.fn.Aggregators; +import org.apache.crunch.lib.PTables; +import org.apache.crunch.types.PGroupedTableType; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; @@ -74,13 +77,18 @@ class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implemen @Override public PType<Pair<K, Iterable<V>>> getPType() { + return getGroupedTableType(); + } + + @Override + public PGroupedTableType<K, V> getGroupedTableType() { PTableType<K, V> parentType = parent.getPTableType(); if (parentType != null) { return parentType.getGroupedTableType(); } return null; } - + @Override public PTypeFamily getTypeFamily() { return parent.getTypeFamily(); @@ -107,6 +115,16 @@ class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implemen } @Override + public <U> PTable<K, U> mapValues(MapFn<Iterable<V>, U> mapFn, PType<U> ptype) { + return PTables.mapValues(this, mapFn, ptype); + } + + @Override + public <U> PTable<K, U> mapValues(String name, MapFn<Iterable<V>, U> mapFn, PType<U> ptype) { + return PTables.mapValues(name, this, mapFn, ptype); + } + + @Override public PTable<K, V> ungroup() { return parent; } http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java index f8a5960..99405e6 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.crunch.FilterFn; import org.apache.crunch.GroupingOptions; +import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.PGroupedTable; import org.apache.crunch.PObject; @@ -129,6 +130,26 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable< } @Override + public <U> PTable<K, U> mapValues(MapFn<V, U> mapFn, PType<U> ptype) { + return PTables.mapValues(this, mapFn, ptype); + } + + @Override + public <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype) { + return PTables.mapValues(name, this, mapFn, ptype); + } + + @Override + public <K2> PTable<K2, V> mapKeys(MapFn<K, K2> mapFn, PType<K2> ptype) { + return PTables.mapKeys(this, mapFn, ptype); + } + + @Override + public <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype) { + return PTables.mapKeys(name, this, mapFn, ptype); + } + + @Override public PTable<K, V> top(int count) { return Aggregate.top(this, count, true); } http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java index 7b8f2ea..8881e3f 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java @@ -18,16 +18,13 @@ package org.apache.crunch.impl.mr.collect; import java.util.List; -import java.util.Set; import org.apache.crunch.DoFn; import org.apache.crunch.ParallelDoOptions; -import org.apache.crunch.SourceTarget; import org.apache.crunch.impl.mr.plan.DoNode; import org.apache.crunch.types.PType; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; public class DoCollectionImpl<S> extends PCollectionImpl<S> { http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java index ccac5d5..d277b75 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java @@ -27,12 +27,14 @@ import org.apache.crunch.CombineFn; import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; import org.apache.crunch.GroupingOptions; +import org.apache.crunch.MapFn; import org.apache.crunch.PGroupedTable; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.SourceTarget; import org.apache.crunch.fn.Aggregators; import org.apache.crunch.impl.mr.plan.DoNode; +import org.apache.crunch.lib.PTables; import org.apache.crunch.types.PGroupedTableType; import org.apache.crunch.types.PType; import org.apache.crunch.util.PartitionUtils; @@ -103,11 +105,27 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V> } } + @Override public PTable<K, V> ungroup() { return parallelDo("ungroup", new Ungroup<K, V>(), parent.getPTableType()); } @Override + public <U> PTable<K, U> mapValues(MapFn<Iterable<V>, U> mapFn, PType<U> ptype) { + return PTables.mapValues(this, mapFn, ptype); + } + + @Override + public <U> PTable<K, U> mapValues(String name, MapFn<Iterable<V>, U> mapFn, PType<U> ptype) { + return PTables.mapValues(name, this, mapFn, ptype); + } + + @Override + public PGroupedTableType<K, V> getGroupedTableType() { + return ptype; + } + + @Override protected void acceptInternal(PCollectionImpl.Visitor visitor) { visitor.visitGroupedTable(this); } http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java index 3c2393d..c477fad 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.crunch.FilterFn; import org.apache.crunch.GroupingOptions; +import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.PObject; import org.apache.crunch.PTable; @@ -119,6 +120,26 @@ abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> implements P } @Override + public <U> PTable<K, U> mapValues(MapFn<V, U> mapFn, PType<U> ptype) { + return PTables.mapValues(this, mapFn, ptype); + } + + @Override + public <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype) { + return PTables.mapValues(name, this, mapFn, ptype); + } + + @Override + public <K2> PTable<K2, V> mapKeys(MapFn<K, K2> mapFn, PType<K2> ptype) { + return PTables.mapKeys(this, mapFn, ptype); + } + + @Override + public <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype) { + return PTables.mapKeys(name, this, mapFn, ptype); + } + + @Override public PTable<K, V> top(int count) { return Aggregate.top(this, count, true); } http://git-wip-us.apache.org/repos/asf/crunch/blob/b24dc580/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java b/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java index e907680..e0a3bf3 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java @@ -21,11 +21,13 @@ import java.util.List; import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; +import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.PGroupedTable; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.fn.IdentityFn; +import org.apache.crunch.fn.PairMapFn; import org.apache.crunch.types.PGroupedTableType; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; @@ -51,6 +53,103 @@ public class PTables { DoFn<Pair<K, V>, Pair<K, V>> id = IdentityFn.getInstance(); return pcollect.parallelDo("asPTable", id, ptt); } + + /** + * Maps a {@code PTable<K1, V>} to a {@code PTable<K2, V>} using the given {@code MapFn<K1, K2>} on + * the keys of the {@code PTable}. + * + * @param ptable The {@code PTable} to be mapped + * @param mapFn The mapping function + * @param ptype The PType for the returned keys + * @return A new {@code PTable<K2, V>} instance + */ + public static <K1, K2, V> PTable<K2, V> mapKeys(PTable<K1, V> ptable, MapFn<K1, K2> mapFn, + PType<K2> ptype) { + return mapKeys("PTables.mapKeys", ptable, mapFn, ptype); + } + + /** + * Maps a {@code PTable<K1, V>} to a {@code PTable<K2, V>} using the given {@code MapFn<K1, K2>} on + * the keys of the {@code PTable}. + * + * @param name The name of the transform + * @param ptable The {@code PTable} to be mapped + * @param mapFn The mapping function + * @param ptype The PType for the returned keys + * @return A new {@code PTable<K2, V>} instance + */ + public static <K1, K2, V> PTable<K2, V> mapKeys(String name, PTable<K1, V> ptable, MapFn<K1, K2> mapFn, + PType<K2> ptype) { + PTypeFamily ptf = ptable.getTypeFamily(); + return ptable.parallelDo(name, + new PairMapFn<K1, V, K2, V>(mapFn, IdentityFn.<V>getInstance()), + ptf.tableOf(ptype, ptable.getValueType())); + } + + /** + * Maps a {@code PTable<K, U>} to a {@code PTable<K, V>} using the given {@code MapFn<U, V>} on + * the values of the {@code PTable}. + * + * @param ptable The {@code PTable} to be mapped + * @param mapFn The mapping function + * @param ptype The PType for the returned values + * @return A new {@code PTable<K, V>} instance + */ + public static <K, U, V> PTable<K, V> mapValues(PTable<K, U> ptable, MapFn<U, V> mapFn, + PType<V> ptype) { + return mapValues("PTables.mapValues", ptable, mapFn, ptype); + } + + /** + * Maps a {@code PTable<K, U>} to a {@code PTable<K, V>} using the given {@code MapFn<U, V>} on + * the values of the {@code PTable}. + * + * @param name The name of the transform + * @param ptable The {@code PTable} to be mapped + * @param mapFn The mapping function + * @param ptype The PType for the returned values + * @return A new {@code PTable<K, V>} instance + */ + public static <K, U, V> PTable<K, V> mapValues(String name, PTable<K, U> ptable, MapFn<U, V> mapFn, + PType<V> ptype) { + PTypeFamily ptf = ptable.getTypeFamily(); + return ptable.parallelDo(name, + new PairMapFn<K, U, K, V>(IdentityFn.<K>getInstance(), mapFn), + ptf.tableOf(ptable.getKeyType(), ptype)); + } + + /** + * An analogue of the {@code mapValues} function for {@code PGroupedTable<K, U>} collections. + * + * @param ptable The {@code PGroupedTable} to be mapped + * @param mapFn The mapping function + * @param ptype The PType for the returned values + * @return A new {@code PTable<K, V>} instance + */ + public static <K, U, V> PTable<K, V> mapValues(PGroupedTable<K, U> ptable, + MapFn<Iterable<U>, V> mapFn, + PType<V> ptype) { + return mapValues("PTables.mapValues", ptable, mapFn, ptype); + } + + /** + * An analogue of the {@code mapValues} function for {@code PGroupedTable<K, U>} collections. + * + * @param name The name of the operation + * @param ptable The {@code PGroupedTable} to be mapped + * @param mapFn The mapping function + * @param ptype The PType for the returned values + * @return A new {@code PTable<K, V>} instance + */ + public static <K, U, V> PTable<K, V> mapValues(String name, + PGroupedTable<K, U> ptable, + MapFn<Iterable<U>, V> mapFn, + PType<V> ptype) { + PTypeFamily ptf = ptable.getTypeFamily(); + return ptable.parallelDo(name, + new PairMapFn<K, Iterable<U>, K, V>(IdentityFn.<K>getInstance(), mapFn), + ptf.tableOf((PType<K>) ptable.getPType().getSubTypes().get(0), ptype)); + } /** * Extract the keys from the given {@code PTable<K, V>} as a {@code PCollection<K>}.
