Repository: crunch
Updated Branches:
  refs/heads/master ca8ff16b6 -> d5e40e339
CRUNCH-642 Enable numReducers option for Distinct operations. Signed-off-by: Josh Wills <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/d5e40e33 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/d5e40e33 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/d5e40e33 Branch: refs/heads/master Commit: d5e40e3393b4fb1e2f3c60d158191ec3e81302f8 Parents: ca8ff16 Author: Xavier Talpe <[email protected]> Authored: Mon Apr 10 15:51:32 2017 +0200 Committer: Josh Wills <[email protected]> Committed: Tue Apr 11 00:12:19 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/crunch/lib/Distinct.java | 34 ++++++++++++++++---- 1 file changed, 27 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/d5e40e33/crunch-core/src/main/java/org/apache/crunch/lib/Distinct.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Distinct.java b/crunch-core/src/main/java/org/apache/crunch/lib/Distinct.java index 8100e09..dd73d37 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/Distinct.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Distinct.java @@ -45,7 +45,7 @@ public final class Distinct { * @return A new {@code PCollection} that contains the unique elements of the input */ public static <S> PCollection<S> distinct(PCollection<S> input) { - return distinct(input, DEFAULT_FLUSH_EVERY); + return distinct(input, DEFAULT_FLUSH_EVERY, 0); } /** @@ -65,22 +65,42 @@ public final class Distinct { * @return A new {@code PCollection} that contains the unique elements of the input */ public static <S> PCollection<S> distinct(PCollection<S> input, int flushEvery) { + return distinct(input, flushEvery, 0); + } + + /** + * A {@code PTable<K, V>} analogue 
of the {@code distinct} function. + */ + public static <K, V> PTable<K, V> distinct(PTable<K, V> input, int flushEvery) { + return PTables.asPTable(distinct((PCollection<Pair<K, V>>) input, flushEvery)); + } + + /** + * A {@code distinct} operation that gives the client more control over how frequently + * elements are flushed to disk in order to allow control over performance or + * memory consumption. + * + * @param input The input {@code PCollection} + * @param flushEvery Flush the elements to disk whenever we encounter this many unique values + * @param numReducers The number of reducers to use + * @return A new {@code PCollection} that contains the unique elements of the input + */ + public static <S> PCollection<S> distinct(PCollection<S> input, int flushEvery, int numReducers) { Preconditions.checkArgument(flushEvery > 0); PType<S> pt = input.getPType(); PTypeFamily ptf = pt.getFamily(); return input .parallelDo("pre-distinct", new PreDistinctFn<S>(flushEvery, pt), ptf.tableOf(pt, ptf.nulls())) - .groupByKey() + .groupByKey(numReducers) .parallelDo("post-distinct", new PostDistinctFn<S>(), pt); } - - /** + /** * A {@code PTable<K, V>} analogue of the {@code distinct} function. */ - public static <K, V> PTable<K, V> distinct(PTable<K, V> input, int flushEvery) { - return PTables.asPTable(distinct((PCollection<Pair<K, V>>) input, flushEvery)); + public static <K, V> PTable<K, V> distinct(PTable<K, V> input, int flushEvery, int numReducers) { + return PTables.asPTable(distinct((PCollection<Pair<K, V>>) input, flushEvery, numReducers)); } - + private static class PreDistinctFn<S> extends DoFn<S, Pair<S, Void>> { private final Set<S> values = Sets.newHashSet(); private final int flushEvery;
