CRUNCH-118: Add aggregator/lib patterns for aggregating the unique elements of a collection
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/a71871d3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/a71871d3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/a71871d3 Branch: refs/heads/master Commit: a71871d36de81d5af3f6d1bd604792e7a4313f5e Parents: 711eaed Author: Josh Wills <[email protected]> Authored: Wed Nov 21 09:31:59 2012 -0800 Committer: Matthias Friedrich <[email protected]> Committed: Sat Nov 24 10:05:17 2012 +0100 ---------------------------------------------------------------------- .../java/org/apache/crunch/fn/Aggregators.java | 50 +++++++- .../main/java/org/apache/crunch/lib/Distinct.java | 100 +++++++++++++++ .../java/org/apache/crunch/fn/AggregatorsTest.java | 9 +- .../java/org/apache/crunch/lib/DistinctTest.java | 37 ++++++ 4 files changed, 192 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a71871d3/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java b/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java index 9ee0de7..5364d62 100644 --- a/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java +++ b/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java @@ -20,6 +20,7 @@ package org.apache.crunch.fn; import java.math.BigInteger; import java.util.LinkedList; import java.util.List; +import java.util.Set; import java.util.SortedSet; import org.apache.crunch.Aggregator; @@ -326,7 +327,7 @@ public final class Aggregators { public static <V> Aggregator<V> LAST_N(int n) { return new LastNAggregator<V>(n); } - + /** * Concatenate strings, with a separator between strings. There * is no limits of length for the concatenated string. @@ -340,7 +341,7 @@ public final class Aggregators { * define if we should skip null values. Throw * NullPointerException if set to false and there is a null * value. - * @return The newly constructed instance instance + * @return The newly constructed instance */ public static Aggregator<String> STRING_CONCAT(String separator, boolean skipNull) { return new StringConcatAggregator(separator, skipNull); @@ -371,7 +372,7 @@ public final class Aggregators { * the maximum length of the input strings. If it's set <= 0, * there is no limit. The number of characters of the input string * will be < maxInputLength to be concatenated. - * @return The newly constructed instance instance + * @return The newly constructed instance */ public static Aggregator<String> STRING_CONCAT(String separator, boolean skipNull, long maxOutputLength, long maxInputLength) { @@ -379,6 +380,17 @@ public final class Aggregators { } /** + * Collect the unique elements of the input, as defined by the {@code equals} method for + * the input objects. No guarantees are made about the order in which the final elements + * will be returned. + * + * @return The newly constructed instance + */ + public static <V> Aggregator<V> UNIQUE_ELEMENTS() { + return new SetAggregator<V>(); + } + + /** * Apply separate aggregators to each component of a {@link Pair}. */ public static <V1, V2> Aggregator<Pair<V1, V2>> pairAggregator( @@ -1052,4 +1064,36 @@ public final class Aggregators { } } + private static class SetAggregator<V> extends SimpleAggregator<V> { + private final Set<V> elements; + private final int sizeLimit; + + public SetAggregator() { + this(-1); + } + + public SetAggregator(int sizeLimit) { + this.elements = Sets.newHashSet(); + this.sizeLimit = sizeLimit; + } + + @Override + public void reset() { + elements.clear(); + } + + @Override + public void update(V value) { + elements.add(value); + if (sizeLimit > 0 && elements.size() > sizeLimit) { + elements.iterator().remove(); + } + } + + @Override + public Iterable<V> results() { + return ImmutableList.copyOf(elements); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a71871d3/crunch/src/main/java/org/apache/crunch/lib/Distinct.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/Distinct.java b/crunch/src/main/java/org/apache/crunch/lib/Distinct.java new file mode 100644 index 0000000..fcf7b7e --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/lib/Distinct.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib; + +import java.util.Set; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.PCollection; +import org.apache.crunch.Pair; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.PTypeFamily; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; + +/** + * Functions for computing the distinct elements of a {@code PCollection}. + */ +public class Distinct { + + private static final int DEFAULT_FLUSH_EVERY = 50000; + + /** + * Construct a new {@code PCollection} that contains the unique elements of a + * given input {@code PCollection}. + * + * @param input The input {@code PCollection} + * @return A new {@code PCollection} that contains the unique elements of the input + */ + public static <S> PCollection<S> distinct(PCollection<S> input) { + return distinct(input, DEFAULT_FLUSH_EVERY); + } + + /** + * A {@code distinct} operation that gives the client more control over how frequently + * elements are flushed to disk in order to allow control over performance or + * memory consumption. + * + * @param input The input {@code PCollection} + * @param flushEvery Flush the elements to disk whenever we encounter this many unique values + * @return A new {@code PCollection} that contains the unique elements of the input + */ + public static <S> PCollection<S> distinct(PCollection<S> input, int flushEvery) { + Preconditions.checkArgument(flushEvery > 0); + PType<S> pt = input.getPType(); + PTypeFamily ptf = pt.getFamily(); + return input + .parallelDo("pre-distinct", new PreDistinctFn<S>(flushEvery), ptf.tableOf(pt, ptf.nulls())) + .groupByKey() + .parallelDo("post-distinct", new PostDistinctFn<S>(), pt); + } + + private static class PreDistinctFn<S> extends DoFn<S, Pair<S, Void>> { + private final Set<S> values = Sets.newHashSet(); + private final int flushEvery; + + public PreDistinctFn(int flushEvery) { + this.flushEvery = flushEvery; + } + + @Override + public void process(S input, Emitter<Pair<S, Void>> emitter) { + values.add(input); + if (values.size() > flushEvery) { + cleanup(emitter); + } + } + + @Override + public void cleanup(Emitter<Pair<S, Void>> emitter) { + for (S in : values) { + emitter.emit(Pair.<S, Void>of(in, null)); + } + values.clear(); + } + } + + private static class PostDistinctFn<S> extends DoFn<Pair<S, java.lang.Iterable<java.lang.Void>>, S> { + @Override + public void process(Pair<S, Iterable<Void>> input, Emitter<S> emitter) { + emitter.emit(input.first()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a71871d3/crunch/src/test/java/org/apache/crunch/fn/AggregatorsTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/fn/AggregatorsTest.java b/crunch/src/test/java/org/apache/crunch/fn/AggregatorsTest.java index 4ebb872..bd63653 100644 --- a/crunch/src/test/java/org/apache/crunch/fn/AggregatorsTest.java +++ b/crunch/src/test/java/org/apache/crunch/fn/AggregatorsTest.java @@ -55,6 +55,7 @@ import org.junit.Test; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; @@ -122,7 +123,13 @@ public class AggregatorsTest { public void testLastN() { assertThat(apply(Aggregators.<Integer>LAST_N(2), 17, 34, 98, 29, 1009), is(ImmutableList.of(29, 1009))); } - + + @Test + public void testUniqueElements() { + assertThat(ImmutableSet.copyOf(apply(Aggregators.<Integer>UNIQUE_ELEMENTS(), 17, 29, 29, 16, 17)), + is(ImmutableSet.of(17, 29, 16))); + } + @Test public void testPairs() { List<Pair<Long, Double>> input = ImmutableList.of(Pair.of(1720L, 17.29), Pair.of(9L, -3.14)); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a71871d3/crunch/src/test/java/org/apache/crunch/lib/DistinctTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/lib/DistinctTest.java b/crunch/src/test/java/org/apache/crunch/lib/DistinctTest.java new file mode 100644 index 0000000..4c9d816 --- /dev/null +++ b/crunch/src/test/java/org/apache/crunch/lib/DistinctTest.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib; + +import static org.junit.Assert.assertEquals; + +import org.apache.crunch.PCollection; +import org.apache.crunch.impl.mem.MemPipeline; +import org.apache.crunch.types.avro.Avros; +import org.junit.Test; + +import com.google.common.collect.ImmutableSet; + +public class DistinctTest { + @Test + public void testDistinct() { + PCollection<Integer> input = MemPipeline.typedCollectionOf(Avros.ints(), + 17, 29, 17, 29, 17, 29, 36, 45, 17, 45, 36, 29); + Iterable<Integer> unique = Distinct.distinct(input).materialize(); + assertEquals(ImmutableSet.of(17, 29, 36, 45), ImmutableSet.copyOf(unique)); + } +}
