Repository: crunch Updated Branches: refs/heads/master 8c35a1399 -> 3ab0b078c
CRUNCH-503: Fix MAX_N and MIN_N aggregators and add MAX_UNIQUE_N and MIN_UNIQUE_N Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/3ab0b078 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/3ab0b078 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/3ab0b078 Branch: refs/heads/master Commit: 3ab0b078c47f23b3ba893fdfb05fd723f663d02b Parents: 8c35a13 Author: Josh Wills <[email protected]> Authored: Thu Mar 19 12:47:11 2015 -0700 Committer: Josh Wills <[email protected]> Committed: Thu Mar 19 15:45:32 2015 -0700 ---------------------------------------------------------------------- .../java/org/apache/crunch/fn/Aggregators.java | 102 +++++++++++++++++-- .../org/apache/crunch/fn/AggregatorsTest.java | 31 +++--- 2 files changed, 109 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/3ab0b078/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java index e7aeb18..5a9c157 100644 --- a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java +++ b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java @@ -22,7 +22,10 @@ import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.SortedSet; +import java.util.TreeSet; +import com.google.common.collect.SortedMultiset; +import com.google.common.collect.TreeMultiset; import org.apache.crunch.Aggregator; import org.apache.crunch.CombineFn; import org.apache.crunch.Emitter; @@ -209,6 +212,17 @@ public final class Aggregators { } /** + * Return the {@code n} largest unique values (or fewer if there are fewer + * values than {@code n}). + * @param n The number of values to return + * @param cls The type of the values to aggregate (must implement {@link Comparable}!) + * @return The newly constructed instance + */ + public static <V extends Comparable<V>> Aggregator<V> MAX_UNIQUE_N(int n, Class<V> cls) { + return new MaxUniqueNAggregator<V>(n); + } + + /** * Return the minimum of all given {@code long} values. * @return The newly constructed instance */ @@ -310,6 +324,16 @@ public final class Aggregators { } /** + * Returns the {@code n} smallest unique values (or fewer if there are fewer unique values than {@code n}). + * @param n The number of values to return + * @param cls The type of the values to aggregate (must implement {@link Comparable}!) + * @return The newly constructed instance + */ + public static <V extends Comparable<V>> Aggregator<V> MIN_UNIQUE_N(int n, Class<V> cls) { + return new MinUniqueNAggregator<V>(n); + } + + /** * Return the first {@code n} values (or fewer if there are fewer values than {@code n}). * * @param n The number of values to return @@ -810,7 +834,7 @@ public final class Aggregators { private static class MaxNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> { private final int arity; - private transient SortedSet<V> elements; + private transient SortedMultiset<V> elements; public MaxNAggregator(int arity) { this.arity = arity; @@ -819,7 +843,7 @@ public final class Aggregators { @Override public void reset() { if (elements == null) { - elements = Sets.newTreeSet(); + elements = TreeMultiset.create(); } else { elements.clear(); } @@ -829,7 +853,40 @@ public final class Aggregators { public void update(V value) { if (elements.size() < arity) { elements.add(value); - } else if (value.compareTo(elements.first()) > 0) { + } else if (value.compareTo(elements.firstEntry().getElement()) > 0) { + elements.remove(elements.firstEntry().getElement()); + elements.add(value); + } + } + + @Override + public Iterable<V> results() { + return ImmutableList.copyOf(elements); + } + } + + private static class MaxUniqueNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> { + private final int arity; + private transient SortedSet<V> elements; + + public MaxUniqueNAggregator(int arity) { + this.arity = arity; + } + + @Override + public void reset() { + if (elements == null) { + elements = new TreeSet<V>(); + } else { + elements.clear(); + } + } + + @Override + public void update(V value) { + if (elements.size() < arity) { + elements.add(value); + } else if (!elements.contains(value) && value.compareTo(elements.first()) > 0) { elements.remove(elements.first()); elements.add(value); } @@ -843,7 +900,7 @@ public final class Aggregators { private static class MinNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> { private final int arity; - private transient SortedSet<V> elements; + private transient SortedMultiset<V> elements; public MinNAggregator(int arity) { this.arity = arity; @@ -852,7 +909,40 @@ public final class Aggregators { @Override public void reset() { if (elements == null) { - elements = Sets.newTreeSet(); + elements = TreeMultiset.create(); + } else { + elements.clear(); + } + } + + @Override + public void update(V value) { + if (elements.size() < arity) { + elements.add(value); + } else if (value.compareTo(elements.lastEntry().getElement()) < 0) { + elements.remove(elements.lastEntry().getElement()); + elements.add(value); + } + } + + @Override + public Iterable<V> results() { + return ImmutableList.copyOf(elements); + } + } + + private static class MinUniqueNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> { + private final int arity; + private transient SortedSet<V> elements; + + public MinUniqueNAggregator(int arity) { + this.arity = arity; + } + + @Override + public void reset() { + if (elements == null) { + elements = new TreeSet<V>(); } else { elements.clear(); } @@ -862,7 +952,7 @@ public final class Aggregators { public void update(V value) { if (elements.size() < arity) { elements.add(value); - } else if (value.compareTo(elements.last()) < 0) { + } else if (!elements.contains(value) && value.compareTo(elements.last()) < 0) { elements.remove(elements.last()); elements.add(value); } http://git-wip-us.apache.org/repos/asf/crunch/blob/3ab0b078/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java b/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java index 6ee1972..9417b08 100644 --- a/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java @@ -17,24 +17,7 @@ */ package org.apache.crunch.fn; -import static org.apache.crunch.fn.Aggregators.MAX_BIGINTS; -import static org.apache.crunch.fn.Aggregators.MAX_DOUBLES; -import static org.apache.crunch.fn.Aggregators.MAX_FLOATS; -import static org.apache.crunch.fn.Aggregators.MAX_INTS; -import static org.apache.crunch.fn.Aggregators.MAX_LONGS; -import static org.apache.crunch.fn.Aggregators.MAX_N; -import static org.apache.crunch.fn.Aggregators.MIN_BIGINTS; -import static org.apache.crunch.fn.Aggregators.MIN_DOUBLES; -import static org.apache.crunch.fn.Aggregators.MIN_FLOATS; -import static org.apache.crunch.fn.Aggregators.MIN_INTS; -import static org.apache.crunch.fn.Aggregators.MIN_LONGS; -import static org.apache.crunch.fn.Aggregators.MIN_N; -import static org.apache.crunch.fn.Aggregators.STRING_CONCAT; -import static org.apache.crunch.fn.Aggregators.SUM_BIGINTS; -import static org.apache.crunch.fn.Aggregators.SUM_DOUBLES; -import static org.apache.crunch.fn.Aggregators.SUM_FLOATS; -import static org.apache.crunch.fn.Aggregators.SUM_INTS; -import static org.apache.crunch.fn.Aggregators.SUM_LONGS; +import static org.apache.crunch.fn.Aggregators.*; import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; @@ -105,6 +88,12 @@ public class AggregatorsTest { assertThat(apply(MAX_INTS(2), 17, 34, 98, 29, 1009), is(ImmutableList.of(98, 1009))); assertThat(apply(MAX_N(1, String.class), "b", "a"), is(ImmutableList.of("b"))); assertThat(apply(MAX_N(3, String.class), "b", "a", "d", "c"), is(ImmutableList.of("b", "c", "d"))); + assertThat(apply(MAX_N(2, Integer.class), 1, 2, 3, 3), is(ImmutableList.of(3, 3))); + } + + @Test + public void testMaxUniqueN() { + assertThat(apply(MAX_UNIQUE_N(2, Integer.class), 1, 2, 3, 3), is(ImmutableList.of(2, 3))); } @Test @@ -112,6 +101,12 @@ public class AggregatorsTest { assertThat(apply(MIN_INTS(2), 17, 34, 98, 29, 1009), is(ImmutableList.of(17, 29))); assertThat(apply(MIN_N(1, String.class), "b", "a"), is(ImmutableList.of("a"))); assertThat(apply(MIN_N(3, String.class), "b", "a", "d", "c"), is(ImmutableList.of("a", "b", "c"))); + assertThat(apply(MIN_N(2, Integer.class), 1, 1, 2, 3), is(ImmutableList.of(1, 1))); + } + + @Test + public void testMinUniqueN() { + assertThat(apply(MIN_UNIQUE_N(2, Integer.class), 3, 2, 1, 1), is(ImmutableList.of(1, 2))); } @Test
