Repository: crunch Updated Branches: refs/heads/master afc171fb7 -> ce9aaa3a5
CRUNCH-642 Enable GroupingOptions for Distinct operations. This fixes the existing numReducers call, which did not work as intended for non-memory PCollections because it used an invalid number of reducers. To make the API more flexible, another overload was added that accepts GroupingOptions directly. Signed-off-by: Josh Wills <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ce9aaa3a Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ce9aaa3a Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ce9aaa3a Branch: refs/heads/master Commit: ce9aaa3a532a56ea52a698fcc259c2c3d5a21a6c Parents: afc171f Author: Xavier Talpe <[email protected]> Authored: Thu Apr 13 07:52:43 2017 +0200 Committer: Josh Wills <[email protected]> Committed: Wed Apr 12 23:13:55 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/crunch/lib/DistinctIT.java | 58 ++++++++++++++++++++ .../java/org/apache/crunch/lib/Distinct.java | 32 ++++++++++- crunch-test/src/main/resources/list.txt | 6 ++ 3 files changed, 94 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/ce9aaa3a/crunch-core/src/it/java/org/apache/crunch/lib/DistinctIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/DistinctIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/DistinctIT.java new file mode 100644 index 0000000..aba26fd --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/lib/DistinctIT.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib; + +import java.io.IOException; + +import org.apache.crunch.PCollection; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.From; +import org.apache.crunch.test.CrunchTestSupport; +import org.apache.hadoop.fs.Path; +import org.junit.Test; + +import com.google.common.collect.Lists; + +import static org.junit.Assert.assertEquals; + +public class DistinctIT extends CrunchTestSupport { + + @Test + public void testDistinct() throws IOException { + Pipeline p = new MRPipeline(DistinctIT.class, tempDir.getDefaultConfiguration()); + Path inputPath = tempDir.copyResourcePath("list.txt"); + PCollection<String> in = p.read(From.textFile(inputPath)); + + PCollection<String> distinct = Distinct.distinct(in); + + assertEquals(Lists.newArrayList("a", "b", "c", "d"), Lists.newArrayList(distinct.materialize())); + } + + @Test + public void testDistinctWithExplicitNumReducers() throws IOException { + Pipeline p = new MRPipeline(DistinctIT.class, tempDir.getDefaultConfiguration()); + Path inputPath = tempDir.copyResourcePath("list.txt"); + PCollection<String> in = p.read(From.textFile(inputPath)); + + PCollection<String> distinct = Distinct.distinct(in, 50, 1); + + assertEquals(Lists.newArrayList("a", "b", "c", "d"), Lists.newArrayList(distinct.materialize())); + } + +} 
http://git-wip-us.apache.org/repos/asf/crunch/blob/ce9aaa3a/crunch-core/src/main/java/org/apache/crunch/lib/Distinct.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Distinct.java b/crunch-core/src/main/java/org/apache/crunch/lib/Distinct.java
index dd73d37..6b55329 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Distinct.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Distinct.java
@@ -21,6 +21,7 @@ import java.util.Set;
 
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
+import org.apache.crunch.GroupingOptions;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
@@ -45,7 +46,7 @@ public final class Distinct {
    * @return A new {@code PCollection} that contains the unique elements of the input
    */
   public static <S> PCollection<S> distinct(PCollection<S> input) {
-    return distinct(input, DEFAULT_FLUSH_EVERY, 0);
+    return distinct(input, DEFAULT_FLUSH_EVERY, GroupingOptions.builder().build());
   }
 
   /**
@@ -65,7 +66,7 @@ public final class Distinct {
    * @return A new {@code PCollection} that contains the unique elements of the input
    */
   public static <S> PCollection<S> distinct(PCollection<S> input, int flushEvery) {
-    return distinct(input, flushEvery, 0);
+    return distinct(input, flushEvery, GroupingOptions.builder().build());
   }
 
   /**
@@ -101,6 +102,33 @@ public final class Distinct {
     return PTables.asPTable(distinct((PCollection<Pair<K, V>>) input, flushEvery, numReducers));
   }
 
+  /**
+   * A {@code distinct} operation that gives the client control over how frequently elements are
+   * flushed to disk and over how the underlying grouping is performed (performance vs. memory).
+   * Throws {@code IllegalArgumentException} if {@code flushEvery} is not positive.
+   *
+   * @param input The input {@code PCollection}
+   * @param flushEvery Flush the elements to disk whenever we encounter this many unique values
+   * @param options The {@code GroupingOptions} used by the {@code groupByKey} step that merges duplicates
+   * @return A new {@code PCollection} that contains the unique elements of the input
+   */
+  public static <S> PCollection<S> distinct(PCollection<S> input, int flushEvery, GroupingOptions options) {
+    Preconditions.checkArgument(flushEvery > 0);
+    PType<S> pt = input.getPType();
+    PTypeFamily ptf = pt.getFamily();
+    return input
+        .parallelDo("pre-distinct", new PreDistinctFn<S>(flushEvery, pt), ptf.tableOf(pt, ptf.nulls()))
+        .groupByKey(options)
+        .parallelDo("post-distinct", new PostDistinctFn<S>(), pt);
+  }
+
+  /**
+   * A {@code PTable<K, V>} analogue of {@link #distinct(PCollection, int, GroupingOptions)}; each (key, value) pair is one element.
+   */
+  public static <K, V> PTable<K, V> distinct(PTable<K, V> input, int flushEvery, GroupingOptions options) {
+    return PTables.asPTable(distinct((PCollection<Pair<K, V>>) input, flushEvery, options));
+  }
+
   private static class PreDistinctFn<S> extends DoFn<S, Pair<S, Void>> {
     private final Set<S> values = Sets.newHashSet();
     private final int flushEvery;
http://git-wip-us.apache.org/repos/asf/crunch/blob/ce9aaa3a/crunch-test/src/main/resources/list.txt
----------------------------------------------------------------------
diff --git a/crunch-test/src/main/resources/list.txt b/crunch-test/src/main/resources/list.txt
new file mode 100644
index 0000000..aad3ed8
--- /dev/null
+++ b/crunch-test/src/main/resources/list.txt
@@ -0,0 +1,6 @@
+a
+b
+c
+a
+d
+c
