Repository: crunch Updated Branches: refs/heads/master f8c98a6c6 -> 57235348d
CRUNCH-568: Don't use a null key in the Aggregators.aggregate implementation Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/57235348 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/57235348 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/57235348 Branch: refs/heads/master Commit: 57235348d6628d28c3a869f23aca15888aa377be Parents: f8c98a6 Author: Josh Wills <[email protected]> Authored: Tue Oct 6 14:30:24 2015 -0700 Committer: Josh Wills <[email protected]> Committed: Wed Oct 7 10:36:38 2015 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/crunch/lib/Aggregate.java | 8 ++++---- .../it/java/org/apache/crunch/SparkAggregatorIT.java | 14 ++++++++++++++ 2 files changed, 18 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/57235348/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java index 794caa0..dd4e1db 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java @@ -308,11 +308,11 @@ public class Aggregate { public static <S> PCollection<S> aggregate(PCollection<S> collect, Aggregator<S> aggregator) { PTypeFamily tf = collect.getTypeFamily(); - return collect.parallelDo("Aggregate.aggregator", new MapFn<S, Pair<Void, S>>() { - public Pair<Void, S> map(S input) { - return Pair.of(null, input); + return collect.parallelDo("Aggregate.aggregator", new MapFn<S, Pair<Boolean, S>>() { + public Pair<Boolean, S> map(S input) { + return Pair.of(false, input); } - }, tf.tableOf(tf.nulls(), collect.getPType())) + }, tf.tableOf(tf.booleans(), collect.getPType())) .groupByKey(1) .combineValues(aggregator) .values(); http://git-wip-us.apache.org/repos/asf/crunch/blob/57235348/crunch-spark/src/it/java/org/apache/crunch/SparkAggregatorIT.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkAggregatorIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkAggregatorIT.java index bc6ebea..f55ef8f 100644 --- a/crunch-spark/src/it/java/org/apache/crunch/SparkAggregatorIT.java +++ b/crunch-spark/src/it/java/org/apache/crunch/SparkAggregatorIT.java @@ -19,6 +19,7 @@ package org.apache.crunch; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import org.apache.crunch.fn.Aggregators; import org.apache.crunch.impl.spark.SparkPipeline; import org.apache.crunch.io.From; import org.apache.crunch.test.TemporaryPath; @@ -26,6 +27,8 @@ import org.apache.crunch.types.avro.Avros; import org.junit.Rule; import org.junit.Test; +import java.util.Collection; + import static org.junit.Assert.assertEquals; public class SparkAggregatorIT { @@ -44,6 +47,17 @@ public class SparkAggregatorIT { pipeline.done(); } + @Test + public void testAvroFirstN() throws Exception { + SparkPipeline pipeline = new SparkPipeline("local", "aggregator"); + PCollection<String> set1 = pipeline.read(From.textFile(tempDir.copyResourceFileName("set1.txt"), Avros.strings())); + PCollection<String> set2 = pipeline.read(From.textFile(tempDir.copyResourceFileName("set2.txt"), Avros.strings())); + Aggregator<String> first5 = Aggregators.FIRST_N(5); + Collection<String> aggregate = set1.union(set2).aggregate(first5).asCollection().getValue(); + pipeline.done(); + assertEquals(5, aggregate.size()); + } + private static class CntFn extends MapFn<String, Integer> { @Override public Integer map(String input) {
