Updated Branches: refs/heads/master 883c565a3 -> a4cf3edf0
CRUNCH-72 - Reduce memory usage of Cartesian#cross Use PTable#join instead of PTable#cogroup to minimize memory usage in Cartesian#cross. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/a4cf3edf Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/a4cf3edf Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/a4cf3edf Branch: refs/heads/master Commit: a4cf3edf0bdebb8969bf25b304fca7b13a198623 Parents: 883c565 Author: Gabriel Reid <[email protected]> Authored: Thu Sep 20 21:00:44 2012 +0200 Committer: Gabriel Reid <[email protected]> Committed: Thu Sep 20 21:00:44 2012 +0200 ---------------------------------------------------------------------- .../main/java/org/apache/crunch/lib/Cartesian.java | 45 ++++------ .../java/org/apache/crunch/lib/CartesianTest.java | 72 +++++++++------ 2 files changed, 60 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a4cf3edf/crunch/src/main/java/org/apache/crunch/lib/Cartesian.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/Cartesian.java b/crunch/src/main/java/org/apache/crunch/lib/Cartesian.java index f3fc5f5..08327dd 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/Cartesian.java +++ b/crunch/src/main/java/org/apache/crunch/lib/Cartesian.java @@ -17,11 +17,11 @@ */ package org.apache.crunch.lib; -import java.util.Collection; import java.util.Random; import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; +import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.PTable; import org.apache.crunch.Pair; @@ -132,26 +132,22 @@ public class Cartesian { PTable<Pair<Integer, Integer>, Pair<K2, V>> rightCross = right.parallelDo(new GFCross<Pair<K2, V>>(1, parallelism), rtf.tableOf(rtf.pairs(rtf.ints(), rtf.ints()), rtf.pairs(right.getKeyType(), right.getValueType()))); - PTable<Pair<Integer, Integer>, Pair<Collection<Pair<K1, U>>, Collection<Pair<K2, V>>>> cg = leftCross - .cogroup(rightCross); + PTable<Pair<Integer, Integer>, Pair<Pair<K1, U>, Pair<K2, V>>> cg = leftCross.join(rightCross); PTypeFamily ctf = cg.getTypeFamily(); - return cg - .parallelDo( - new DoFn<Pair<Pair<Integer, Integer>, Pair<Collection<Pair<K1, U>>, Collection<Pair<K2, V>>>>, Pair<Pair<K1, K2>, Pair<U, V>>>() { - @Override - public void process( - Pair<Pair<Integer, Integer>, Pair<Collection<Pair<K1, U>>, Collection<Pair<K2, V>>>> input, - Emitter<Pair<Pair<K1, K2>, Pair<U, V>>> emitter) { - for (Pair<K1, U> l : input.second().first()) { - for (Pair<K2, V> r : input.second().second()) { - emitter.emit(Pair.of(Pair.of(l.first(), r.first()), Pair.of(l.second(), r.second()))); - } - } - } - }, ctf.tableOf(ctf.pairs(left.getKeyType(), right.getKeyType()), - ctf.pairs(left.getValueType(), right.getValueType()))); + return cg.parallelDo( + new MapFn<Pair<Pair<Integer, Integer>, Pair<Pair<K1, U>, Pair<K2, V>>>, Pair<Pair<K1, K2>, Pair<U, V>>>() { + + @Override + public Pair<Pair<K1, K2>, Pair<U, V>> map(Pair<Pair<Integer, Integer>, Pair<Pair<K1, U>, Pair<K2, V>>> input) { + Pair<Pair<K1, U>, Pair<K2, V>> valuePair = input.second(); + return Pair.of(Pair.of(valuePair.first().first(), valuePair.second().first()), + Pair.of(valuePair.first().second(), valuePair.second().second())); + } + }, + ctf.tableOf(ctf.pairs(left.getKeyType(), right.getKeyType()), + ctf.pairs(left.getValueType(), right.getValueType()))); } /** @@ -205,19 +201,14 @@ public class Cartesian { PTable<Pair<Integer, Integer>, V> rightCross = right.parallelDo(new GFCross<V>(1, parallelism), rtf.tableOf(rtf.pairs(rtf.ints(), rtf.ints()), right.getPType())); - PTable<Pair<Integer, Integer>, Pair<Collection<U>, Collection<V>>> cg = leftCross.cogroup(rightCross); + PTable<Pair<Integer, Integer>, Pair<U, V>> cg = leftCross.join(rightCross); PTypeFamily ctf = cg.getTypeFamily(); - return cg.parallelDo(new DoFn<Pair<Pair<Integer, Integer>, Pair<Collection<U>, Collection<V>>>, Pair<U, V>>() { + return cg.parallelDo(new MapFn<Pair<Pair<Integer, Integer>, Pair<U, V>>, Pair<U, V>>() { @Override - public void process(Pair<Pair<Integer, Integer>, Pair<Collection<U>, Collection<V>>> input, - Emitter<Pair<U, V>> emitter) { - for (U l : input.second().first()) { - for (V r : input.second().second()) { - emitter.emit(Pair.of(l, r)); - } - } + public Pair<U, V> map(Pair<Pair<Integer, Integer>, Pair<U, V>> input) { + return input.second(); } }, ctf.pairs(left.getPType(), right.getPType())); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a4cf3edf/crunch/src/test/java/org/apache/crunch/lib/CartesianTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/lib/CartesianTest.java b/crunch/src/test/java/org/apache/crunch/lib/CartesianTest.java index eba7429..b19097c 100644 --- a/crunch/src/test/java/org/apache/crunch/lib/CartesianTest.java +++ b/crunch/src/test/java/org/apache/crunch/lib/CartesianTest.java @@ -18,48 +18,60 @@ package org.apache.crunch.lib; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import java.util.HashSet; -import java.util.Iterator; +import java.util.Collections; +import java.util.List; import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.impl.mem.MemPipeline; import org.apache.crunch.types.writable.Writables; import org.junit.Test; -import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; public class CartesianTest { @Test - public void testCartesianCollection() { - ImmutableList<ImmutableList<Integer>> testCases = ImmutableList.of(ImmutableList.of(1, 2, 3, 4, 5), - ImmutableList.<Integer> of(1, 2, 3), ImmutableList.<Integer> of()); - - for (int t1 = 0; t1 < testCases.size(); t1++) { - ImmutableList<Integer> testCase1 = testCases.get(t1); - for (int t2 = t1; t2 < testCases.size(); t2++) { - ImmutableList<Integer> testCase2 = testCases.get(t2); - - PCollection<Integer> X = MemPipeline.typedCollectionOf(Writables.ints(), testCase1); - PCollection<Integer> Y = MemPipeline.typedCollectionOf(Writables.ints(), testCase2); - - PCollection<Pair<Integer, Integer>> cross = Cartesian.cross(X, Y); - HashSet<Pair<Integer, Integer>> crossSet = new HashSet<Pair<Integer, Integer>>(); - for (Iterator<Pair<Integer, Integer>> i = cross.materialize().iterator(); i.hasNext();) { - crossSet.add(i.next()); - } - assertEquals(crossSet.size(), testCase1.size() * testCase2.size()); - - for (int i = 0; i < testCase1.size(); i++) { - for (int j = 0; j < testCase2.size(); j++) { - assertTrue(crossSet.contains(Pair.of(testCase1.get(i), testCase2.get(j)))); - } - } - } - } + public void testCartesianCollection_SingleValues() { + + PCollection<String> letters = MemPipeline.typedCollectionOf(Writables.strings(), "a", "b"); + PCollection<Integer> ints = MemPipeline.typedCollectionOf(Writables.ints(), 1, 2); + + PCollection<Pair<String, Integer>> cartesianProduct = Cartesian.cross(letters, ints); + + @SuppressWarnings("unchecked") + List<Pair<String, Integer>> expectedResults = Lists.newArrayList(Pair.of("a", 1), Pair.of("a", 2), Pair.of("b", 1), + Pair.of("b", 2)); + List<Pair<String, Integer>> actualResults = Lists.newArrayList(cartesianProduct.materialize()); + Collections.sort(actualResults); + + assertEquals(expectedResults, actualResults); + } + + @Test + public void testCartesianCollection_Tables() { + + PTable<String, Integer> leftTable = MemPipeline.typedTableOf( + Writables.tableOf(Writables.strings(), Writables.ints()), "a", 1, "b", 2); + PTable<String, Float> rightTable = MemPipeline.typedTableOf( + Writables.tableOf(Writables.strings(), Writables.floats()), "A", 1.0f, "B", 2.0f); + + PTable<Pair<String, String>, Pair<Integer, Float>> cartesianProduct = Cartesian.cross(leftTable, rightTable); + + List<Pair<Pair<String, String>, Pair<Integer, Float>>> expectedResults = Lists.newArrayList(); + expectedResults.add(Pair.of(Pair.of("a", "A"), Pair.of(1, 1.0f))); + expectedResults.add(Pair.of(Pair.of("a", "B"), Pair.of(1, 2.0f))); + expectedResults.add(Pair.of(Pair.of("b", "A"), Pair.of(2, 1.0f))); + expectedResults.add(Pair.of(Pair.of("b", "B"), Pair.of(2, 2.0f))); + + List<Pair<Pair<String, String>, Pair<Integer, Float>>> actualResults = Lists.newArrayList(cartesianProduct + .materialize()); + Collections.sort(actualResults); + + assertEquals(expectedResults, actualResults); + } }
