Updated Branches: refs/heads/master 9a1c42760 -> 035b1b91d
CRUNCH-150: Crunch lib function for converting PCollection<Pair<K, V>> to a PTable<K, V>. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/035b1b91 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/035b1b91 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/035b1b91 Branch: refs/heads/master Commit: 035b1b91d60c1ed5029135d73706ffd54b184a8c Parents: 9a1c427 Author: Josh Wills <[email protected]> Authored: Thu Jan 24 14:55:26 2013 -0800 Committer: Josh Wills <[email protected]> Committed: Tue Jan 29 12:24:29 2013 -0800 ---------------------------------------------------------------------- .../main/java/org/apache/crunch/lib/Distinct.java | 15 +++++ .../main/java/org/apache/crunch/lib/PTables.java | 25 ++++++++ .../main/java/org/apache/crunch/lib/Sample.java | 47 ++++++++++++--- 3 files changed, 79 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/035b1b91/crunch/src/main/java/org/apache/crunch/lib/Distinct.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/Distinct.java b/crunch/src/main/java/org/apache/crunch/lib/Distinct.java index 15f7205..533f3fb 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/Distinct.java +++ b/crunch/src/main/java/org/apache/crunch/lib/Distinct.java @@ -22,6 +22,7 @@ import java.util.Set; import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; @@ -48,6 +49,13 @@ public final class Distinct { } /** + * A {@code PTable<K, V>} analogue of the {@code distinct} function. + */ + public static <K, V> PTable<K, V> distinct(PTable<K, V> input) { + return PTables.asPTable(distinct((PCollection<Pair<K, V>>) input)); + } + + /** * A {@code distinct} operation that gives the client more control over how frequently * elements are flushed to disk in order to allow control over performance or * memory consumption. @@ -66,6 +74,13 @@ public final class Distinct { .parallelDo("post-distinct", new PostDistinctFn<S>(), pt); } + /** + * A {@code PTable<K, V>} analogue of the {@code distinct} function. + */ + public static <K, V> PTable<K, V> distinct(PTable<K, V> input, int flushEvery) { + return PTables.asPTable(distinct((PCollection<Pair<K, V>>) input, flushEvery)); + } + private static class PreDistinctFn<S> extends DoFn<S, Pair<S, Void>> { private final Set<S> values = Sets.newHashSet(); private final int flushEvery; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/035b1b91/crunch/src/main/java/org/apache/crunch/lib/PTables.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/PTables.java b/crunch/src/main/java/org/apache/crunch/lib/PTables.java index e788656..e907680 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/PTables.java +++ b/crunch/src/main/java/org/apache/crunch/lib/PTables.java @@ -25,9 +25,11 @@ import org.apache.crunch.PCollection; import org.apache.crunch.PGroupedTable; import org.apache.crunch.PTable; import org.apache.crunch.Pair; +import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.types.PGroupedTableType; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; +import org.apache.crunch.types.PTypeFamily; import com.google.common.collect.Lists; @@ -37,6 +39,24 @@ import com.google.common.collect.Lists; */ public class PTables { + /** + * Convert the given {@code PCollection<Pair<K, V>>} to a {@code PTable<K, V>}. + * @param pcollect The {@code PCollection} to convert + * @return A {@code PTable} that contains the same data as the input {@code PCollection} + */ + public static <K, V> PTable<K, V> asPTable(PCollection<Pair<K, V>> pcollect) { + PType<Pair<K, V>> pt = pcollect.getPType(); + PTypeFamily ptf = pt.getFamily(); + PTableType<K, V> ptt = ptf.tableOf(pt.getSubTypes().get(0), pt.getSubTypes().get(1)); + DoFn<Pair<K, V>, Pair<K, V>> id = IdentityFn.getInstance(); + return pcollect.parallelDo("asPTable", id, ptt); + } + + /** + * Extract the keys from the given {@code PTable<K, V>} as a {@code PCollection<K>}. + * @param ptable The {@code PTable} + * @return A {@code PCollection<K>} + */ public static <K, V> PCollection<K> keys(PTable<K, V> ptable) { return ptable.parallelDo("PTables.keys", new DoFn<Pair<K, V>, K>() { @Override @@ -46,6 +66,11 @@ public class PTables { }, ptable.getKeyType()); } + /** + * Extract the values from the given {@code PTable<K, V>} as a {@code PCollection<V>}. + * @param ptable The {@code PTable} + * @return A {@code PCollection<V>} + */ public static <K, V> PCollection<V> values(PTable<K, V> ptable) { return ptable.parallelDo("PTables.values", new DoFn<Pair<K, V>, V>() { @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/035b1b91/crunch/src/main/java/org/apache/crunch/lib/Sample.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/Sample.java b/crunch/src/main/java/org/apache/crunch/lib/Sample.java index 54f8731..5be2292 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/Sample.java +++ b/crunch/src/main/java/org/apache/crunch/lib/Sample.java @@ -19,15 +19,16 @@ package org.apache.crunch.lib; import java.util.Random; -import org.apache.crunch.DoFn; -import org.apache.crunch.Emitter; +import org.apache.crunch.FilterFn; import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; import com.google.common.base.Preconditions; public class Sample { - private static class SamplerFn<S> extends DoFn<S, S> { + private static class SamplerFn<S> extends FilterFn<S> { private final long seed; private final double acceptanceProbability; @@ -41,23 +42,53 @@ public class Sample { @Override public void initialize() { - r = new Random(seed); + if (r == null) { + r = new Random(seed); + } } @Override - public void process(S input, Emitter<S> emitter) { - if (r.nextDouble() < acceptanceProbability) { - emitter.emit(input); - } + public boolean accept(S input) { + return r.nextDouble() < acceptanceProbability; } } + /** + * Output records from the given {@code PCollection} with the given probability. + * + * @param input The {@code PCollection} to sample from + * @param probability The probability (0.0 < p < 1.0) + * @return The output {@code PCollection} created from sampling + */ public static <S> PCollection<S> sample(PCollection<S> input, double probability) { return sample(input, System.currentTimeMillis(), probability); } + /** + * Output records from the given {@code PCollection} using a given seed. Useful for unit + * testing. + * + * @param input The {@code PCollection} to sample from + * @param seed The seed + * @param probability The probability (0.0 < p < 1.0) + * @return The output {@code PCollection} created from sampling + */ public static <S> PCollection<S> sample(PCollection<S> input, long seed, double probability) { String stageName = String.format("sample(%.2f)", probability); return input.parallelDo(stageName, new SamplerFn<S>(seed, probability), input.getPType()); } + + /** + * A {@code PTable<K, V>} analogue of the {@code sample} function. + */ + public static <K, V> PTable<K, V> sample(PTable<K, V> input, double probability) { + return PTables.asPTable(sample((PCollection<Pair<K, V>>) input, probability)); + } + + /** + * A {@code PTable<K, V>} analogue of the {@code sample} function. + */ + public static <K, V> PTable<K, V> sample(PTable<K, V> input, long seed, double probability) { + return PTables.asPTable(sample((PCollection<Pair<K, V>>) input, seed, probability)); + } }
