Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 1101f913b -> 74c18d453
CRUNCH-393 Handle object reuse in Aggregate.top Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/74c18d45 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/74c18d45 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/74c18d45 Branch: refs/heads/apache-crunch-0.8 Commit: 74c18d4535f9735897fb855bfb61924301eb812b Parents: 1101f91 Author: Gabriel Reid <[email protected]> Authored: Tue May 13 23:40:15 2014 +0200 Committer: Gabriel Reid <[email protected]> Committed: Tue May 13 23:43:02 2014 +0200 ---------------------------------------------------------------------- .../java/org/apache/crunch/lib/AggregateIT.java | 21 +++++++++++++ .../java/org/apache/crunch/lib/Aggregate.java | 32 ++++++++++++++++---- 2 files changed, 47 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/74c18d45/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java index 56ee3ac..1408c73 100644 --- a/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.Collection; +import java.util.List; import java.util.Map; import org.apache.crunch.MapFn; @@ -33,7 +34,9 @@ import org.apache.crunch.Pair; import org.apache.crunch.Pipeline; import org.apache.crunch.impl.mem.MemPipeline; import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.From; import org.apache.crunch.test.Employee; +import org.apache.crunch.test.StringWrapper; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; 
import org.apache.crunch.types.PTableType; @@ -139,6 +142,24 @@ public class AggregateIT { } @Test + public void testTopN_MRPipeline() throws IOException { + Pipeline pipeline = new MRPipeline(AggregateIT.class, tmpDir.getDefaultConfiguration()); + PTable<StringWrapper, String> entries = pipeline + .read(From.textFile(tmpDir.copyResourceFileName("set1.txt"), Avros.strings())) + .by(new StringWrapper.StringToStringWrapperMapFn(), Avros.reflects(StringWrapper.class)); + PTable<StringWrapper, String> topEntries = Aggregate.top(entries, 3, true); + List<Pair<StringWrapper, String>> expectedTop3 = Lists.newArrayList( + Pair.of(StringWrapper.wrap("e"), "e"), + Pair.of(StringWrapper.wrap("c"), "c"), + Pair.of(StringWrapper.wrap("b"), "b")); + + assertEquals( + expectedTop3, + Lists.newArrayList(topEntries.materialize())); + + } + + @Test public void testCollectValues_Writables() throws IOException { Pipeline pipeline = new MRPipeline(AggregateIT.class, tmpDir.getDefaultConfiguration()); Map<Integer, Collection<Text>> collectionMap = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt")) http://git-wip-us.apache.org/repos/asf/crunch/blob/74c18d45/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java index 3d132d4..7a71646 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java @@ -110,19 +110,22 @@ public class Aggregate { private final int limit; private final boolean maximize; + private final PType<Pair<K, V>> pairType; private transient PriorityQueue<Pair<K, V>> values; - public TopKFn(int limit, boolean ascending) { + public TopKFn(int limit, boolean ascending, PType<Pair<K, V>> pairType) { this.limit = limit; this.maximize = ascending; + this.pairType = 
pairType; } public void initialize() { this.values = new PriorityQueue<Pair<K, V>>(limit, new PairValueComparator<K, V>(maximize)); + pairType.initialize(getConfiguration()); } public void process(Pair<K, V> input, Emitter<Pair<Integer, Pair<K, V>>> emitter) { - values.add(input); + values.add(pairType.getDetachedValue(input)); if (values.size() > limit) { values.poll(); } @@ -139,10 +142,17 @@ public class Aggregate { private final int limit; private final boolean maximize; + private PType<Pair<K, V>> pairType; - public TopKCombineFn(int limit, boolean maximize) { + public TopKCombineFn(int limit, boolean maximize, PType<Pair<K, V>> pairType) { this.limit = limit; this.maximize = maximize; + this.pairType = pairType; + } + + @Override + public void initialize() { + pairType.initialize(getConfiguration()); } @Override @@ -151,7 +161,7 @@ public class Aggregate { Comparator<Pair<K, V>> cmp = new PairValueComparator<K, V>(maximize); PriorityQueue<Pair<K, V>> queue = new PriorityQueue<Pair<K, V>>(limit, cmp); for (Pair<K, V> pair : input.second()) { - queue.add(pair); + queue.add(pairType.getDetachedValue(pair)); if (queue.size() > limit) { queue.poll(); } @@ -165,13 +175,23 @@ public class Aggregate { } } + /** + * Selects the top N pairs from the given table, with sorting being performed on the values (i.e. the second + * value in the pair) of the table. 
+ * + * @param ptable table containing the pairs from which the top N is to be selected + * @param limit number of top elements to select + * @param maximize if true, the maximum N values from the table will be selected, otherwise the minimal + * N values will be selected + * @return table containing the top N values from the incoming table + */ public static <K, V> PTable<K, V> top(PTable<K, V> ptable, int limit, boolean maximize) { PTypeFamily ptf = ptable.getTypeFamily(); PTableType<K, V> base = ptable.getPTableType(); PType<Pair<K, V>> pairType = ptf.pairs(base.getKeyType(), base.getValueType()); PTableType<Integer, Pair<K, V>> inter = ptf.tableOf(ptf.ints(), pairType); - return ptable.parallelDo("top" + limit + "map", new TopKFn<K, V>(limit, maximize), inter) - .groupByKey(1).combineValues(new TopKCombineFn<K, V>(limit, maximize)) + return ptable.parallelDo("top" + limit + "map", new TopKFn<K, V>(limit, maximize, pairType), inter) + .groupByKey(1).combineValues(new TopKCombineFn<K, V>(limit, maximize, pairType)) .parallelDo("top" + limit + "reduce", new DoFn<Pair<Integer, Pair<K, V>>, Pair<K, V>>() { public void process(Pair<Integer, Pair<K, V>> input, Emitter<Pair<K, V>> emitter) { emitter.emit(input.second());
