Repository: crunch Updated Branches: refs/heads/master 296abb86c -> 799b1232f
CRUNCH-216 Transpose args in MapsideJoinStrategy Initial move in transposing the parameters for the MapsideJoinStrategy to bring it in line with other join strategies (i.e. the left-side table should be the smaller of the two tables being joined). Introduce static factory methods for creating MapsideJoinStrategy instances that load the left-side table into memory, and deprecate the existing public constructor to warn users of the future change. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/799b1232 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/799b1232 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/799b1232 Branch: refs/heads/master Commit: 799b1232f1aa6179759506722709f26a858adedf Parents: 296abb8 Author: Gabriel Reid <[email protected]> Authored: Sat Feb 15 23:24:48 2014 +0100 Committer: Gabriel Reid <[email protected]> Committed: Tue Feb 18 23:30:17 2014 +0100 ---------------------------------------------------------------------- .../crunch/lib/join/MapsideJoinStrategyIT.java | 186 ++++++++++++++++--- .../crunch/lib/join/MapsideJoinStrategy.java | 92 ++++++++- 2 files changed, 247 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/799b1232/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java index 9972549..9add60a 100644 --- a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java @@ -42,6 +42,7 @@ import org.junit.Rule; import org.junit.Test; import com.google.common.collect.Lists; +import sun.print.resources.serviceui; public class MapsideJoinStrategyIT { @@ -91,22 +92,44 @@ public class MapsideJoinStrategyIT { @Test public void testMapSideJoin_MemPipeline() { - runMapsideJoin(MemPipeline.getInstance(), true, false); + runMapsideJoin(MemPipeline.getInstance(), true, false, MapsideJoinStrategy.<Integer,String,String>create(false)); + } + + @Test + public void testLegacyMapSideJoin_MemPipeline() { + runLegacyMapsideJoin(MemPipeline.getInstance(), true, false, new MapsideJoinStrategy<Integer, String, String>(false)); } @Test public void testMapSideJoin_MemPipeline_Materialized() { - runMapsideJoin(MemPipeline.getInstance(), true, true); + runMapsideJoin(MemPipeline.getInstance(), true, true, MapsideJoinStrategy.<Integer,String,String>create(true)); + } + + @Test + public void testLegacyMapSideJoin_MemPipeline_Materialized() { + runLegacyMapsideJoin(MemPipeline.getInstance(), true, true, new MapsideJoinStrategy<Integer, String, String>(true)); } @Test - public void testMapSideJoinLeftOuterJoin_MemPipeline() { - runMapsideLeftOuterJoin(MemPipeline.getInstance(), true, false); + public void testMapSideJoinRightOuterJoin_MemPipeline() { + runMapsideRightOuterJoin(MemPipeline.getInstance(), true, false, + MapsideJoinStrategy.<Integer, String, String>create(false)); + } + + @Test + public void testLegacyMapSideJoinLeftOuterJoin_MemPipeline() { + runLegacyMapsideLeftOuterJoin(MemPipeline.getInstance(), true, false, new MapsideJoinStrategy<Integer, String, String>(false)); + } + + @Test + public void testMapSideJoinRightOuterJoin_MemPipeline_Materialized() { + runMapsideRightOuterJoin(MemPipeline.getInstance(), true, true, + MapsideJoinStrategy.<Integer, String, String>create(true)); } @Test - public void testMapSideJoinLeftOuterJoin_MemPipeline_Materialized() { - runMapsideLeftOuterJoin(MemPipeline.getInstance(), true, true); + public void testLegacyMapSideJoinLeftOuterJoin_MemPipeline_Materialized() { + runLegacyMapsideLeftOuterJoin(MemPipeline.getInstance(), true, true, new MapsideJoinStrategy<Integer, String, String>(true)); } @Test @@ -128,35 +151,115 @@ public class MapsideJoinStrategyIT { } @Test + public void testLegacyMapsideJoin_LeftSideIsEmpty() throws IOException { + MRPipeline pipeline = new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()); + PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt"); + PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt"); + + PTable<Integer, String> filteredCustomerTable = customerTable + .parallelDo(FilterFns.<Pair<Integer, String>>REJECT_ALL(), customerTable.getPTableType()); + + + JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>(); + PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(customerTable, filteredCustomerTable, + JoinType.INNER_JOIN); + + List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined.materialize()); + + assertTrue(materializedJoin.isEmpty()); + } + + @Test public void testMapsideJoin() throws IOException { - runMapsideJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()), false, false); + runMapsideJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()), + false, false, MapsideJoinStrategy.<Integer, String, String>create(false)); + } + + @Test + public void testLegacyMapsideJoin() throws IOException { + runLegacyMapsideJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()), + false, false, new MapsideJoinStrategy<Integer, String, String>(false)); } @Test public void testMapsideJoin_Materialized() throws IOException { - runMapsideJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()), false, true); + runMapsideJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()), + false, true, MapsideJoinStrategy.<Integer, String, String>create(true)); + } + + @Test + public void testLegacyMapsideJoin_Materialized() throws IOException { + runLegacyMapsideJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()), + false, true, new MapsideJoinStrategy<Integer, String, String>(true)); + } + + @Test + public void testMapsideJoin_RightOuterJoin() throws IOException { + runMapsideRightOuterJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()), + false, false, MapsideJoinStrategy.<Integer, String, String>create(false)); + } + + @Test + public void testLegacyMapsideJoin_LeftOuterJoin() throws IOException { + runLegacyMapsideLeftOuterJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()), + false, false, + new MapsideJoinStrategy<Integer, String, String>(false)); } @Test - public void testMapsideJoin_LeftOuterJoin() throws IOException { - runMapsideLeftOuterJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()), false, false); + public void testMapsideJoin_RightOuterJoin_Materialized() throws IOException { + runMapsideRightOuterJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()), + false, true, MapsideJoinStrategy.<Integer, String, String>create(true)); } @Test - public void testMapsideJoin_LeftOuterJoin_Materialized() throws IOException { - runMapsideLeftOuterJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()), false, true); + public void testLegacyMapsideJoin_LeftOuterJoin_Materialized() throws IOException { + runLegacyMapsideLeftOuterJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()), + false, true, + new MapsideJoinStrategy<Integer, String, String>(true)); } - private void runMapsideJoin(Pipeline pipeline, boolean inMemory, boolean materialize) { + private void runMapsideJoin(Pipeline pipeline, boolean inMemory, boolean materialize, + MapsideJoinStrategy<Integer,String, String> joinStrategy) { PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt"); PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt"); - JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>(materialize); - PTable<Integer, String> custOrders = mapsideJoin.join(customerTable, orderTable, JoinType.INNER_JOIN) + PTable<Integer, String> custOrders = joinStrategy.join(orderTable, customerTable, JoinType.INNER_JOIN) + .mapValues("concat", new ConcatValuesFn(), Writables.strings()); + + PTable<Integer, String> ORDER_TABLE = orderTable.mapValues(new CapOrdersFn(), orderTable.getValueType()); + PTable<Integer, Pair<String, String>> joined = joinStrategy.join(ORDER_TABLE, custOrders, JoinType.INNER_JOIN); + + List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList(); + expectedJoinResult.add(Pair.of(111, Pair.of("CORN FLAKES", "[Corn flakes,John Doe]"))); + expectedJoinResult.add(Pair.of(222, Pair.of("TOILET PAPER", "[Toilet paper,Jane Doe]"))); + expectedJoinResult.add(Pair.of(222, Pair.of("TOILET PAPER", "[Toilet plunger,Jane Doe]"))); + expectedJoinResult.add(Pair.of(222, Pair.of("TOILET PLUNGER", "[Toilet paper,Jane Doe]"))); + expectedJoinResult.add(Pair.of(222, Pair.of("TOILET PLUNGER", "[Toilet plunger,Jane Doe]"))); + expectedJoinResult.add(Pair.of(333, Pair.of("TOILET BRUSH", "[Toilet brush,Someone Else]"))); + Iterable<Pair<Integer, Pair<String, String>>> iter = joined.materialize(); + + PipelineResult res = pipeline.run(); + if (!inMemory) { + assertEquals(materialize ? 2 : 1, res.getStageResults().size()); + } + + List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter); + Collections.sort(joinedResultList); + + assertEquals(expectedJoinResult, joinedResultList); + } + + private void runLegacyMapsideJoin(Pipeline pipeline, boolean inMemory, boolean materialize, + MapsideJoinStrategy<Integer, String, String> mapsideJoinStrategy) { + PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt"); + PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt"); + + PTable<Integer, String> custOrders = mapsideJoinStrategy.join(customerTable, orderTable, JoinType.INNER_JOIN) .mapValues("concat", new ConcatValuesFn(), Writables.strings()); PTable<Integer, String> ORDER_TABLE = orderTable.mapValues(new CapOrdersFn(), orderTable.getValueType()); - PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(custOrders, ORDER_TABLE, JoinType.INNER_JOIN); + PTable<Integer, Pair<String, String>> joined = mapsideJoinStrategy.join(custOrders, ORDER_TABLE, JoinType.INNER_JOIN); List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList(); expectedJoinResult.add(Pair.of(111, Pair.of("[John Doe,Corn flakes]", "CORN FLAKES"))); @@ -166,28 +269,63 @@ public class MapsideJoinStrategyIT { expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PLUNGER"))); expectedJoinResult.add(Pair.of(333, Pair.of("[Someone Else,Toilet brush]", "TOILET BRUSH"))); Iterable<Pair<Integer, Pair<String, String>>> iter = joined.materialize(); - + PipelineResult res = pipeline.run(); if (!inMemory) { assertEquals(materialize ? 2 : 1, res.getStageResults().size()); } - + List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter); Collections.sort(joinedResultList); assertEquals(expectedJoinResult, joinedResultList); } - private void runMapsideLeftOuterJoin(Pipeline pipeline, boolean inMemory, boolean materialize) { + private void runMapsideRightOuterJoin(Pipeline pipeline, boolean inMemory, boolean materialize, + MapsideJoinStrategy<Integer, String, String> mapsideJoinStrategy) { PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt"); PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt"); - JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>(materialize); - PTable<Integer, String> custOrders = mapsideJoin.join(customerTable, orderTable, JoinType.LEFT_OUTER_JOIN) + PTable<Integer, String> custOrders = mapsideJoinStrategy.join(orderTable, customerTable, JoinType.RIGHT_OUTER_JOIN) + .mapValues("concat", new ConcatValuesFn(), Writables.strings()); + + PTable<Integer, String> ORDER_TABLE = orderTable.mapValues(new CapOrdersFn(), orderTable.getValueType()); + PTable<Integer, Pair<String, String>> joined = mapsideJoinStrategy.join(ORDER_TABLE, custOrders, + JoinType.RIGHT_OUTER_JOIN); + + List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList(); + expectedJoinResult.add(Pair.of(111, Pair.of("CORN FLAKES", "[Corn flakes,John Doe]"))); + expectedJoinResult.add(Pair.of(222, Pair.of("TOILET PAPER", "[Toilet paper,Jane Doe]"))); + expectedJoinResult.add(Pair.of(222, Pair.of("TOILET PAPER", "[Toilet plunger,Jane Doe]"))); + expectedJoinResult.add(Pair.of(222, Pair.of("TOILET PLUNGER", "[Toilet paper,Jane Doe]"))); + expectedJoinResult.add(Pair.of(222, Pair.of("TOILET PLUNGER", "[Toilet plunger,Jane Doe]"))); + expectedJoinResult.add(Pair.of(333, Pair.of("TOILET BRUSH", "[Toilet brush,Someone Else]"))); + expectedJoinResult.add(Pair.of(444, Pair.<String,String>of(null, "[null,Has No Orders]"))); + Iterable<Pair<Integer, Pair<String, String>>> iter = joined.materialize(); + + PipelineResult res = pipeline.run(); + if (!inMemory) { + assertEquals(materialize ? 2 : 1, res.getStageResults().size()); + } + + List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter); + Collections.sort(joinedResultList); + + assertEquals(expectedJoinResult, joinedResultList); + } + + private void runLegacyMapsideLeftOuterJoin(Pipeline pipeline, boolean inMemory, boolean materialize, + MapsideJoinStrategy<Integer, String, String> legacyMapsideJoinStrategy) { + PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt"); + PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt"); + + PTable<Integer, String> custOrders = legacyMapsideJoinStrategy.join(customerTable, orderTable, + JoinType.LEFT_OUTER_JOIN) .mapValues("concat", new ConcatValuesFn(), Writables.strings()); PTable<Integer, String> ORDER_TABLE = orderTable.mapValues(new CapOrdersFn(), orderTable.getValueType()); - PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(custOrders, ORDER_TABLE, JoinType.LEFT_OUTER_JOIN); + PTable<Integer, Pair<String, String>> joined = + legacyMapsideJoinStrategy.join(custOrders, ORDER_TABLE, JoinType.LEFT_OUTER_JOIN); List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList(); expectedJoinResult.add(Pair.of(111, Pair.of("[John Doe,Corn flakes]", "CORN FLAKES"))); @@ -198,12 +336,12 @@ public class MapsideJoinStrategyIT { expectedJoinResult.add(Pair.of(333, Pair.of("[Someone Else,Toilet brush]", "TOILET BRUSH"))); expectedJoinResult.add(Pair.of(444, Pair.<String,String>of("[Has No Orders,null]", null))); Iterable<Pair<Integer, Pair<String, String>>> iter = joined.materialize(); - + PipelineResult res = pipeline.run(); if (!inMemory) { assertEquals(materialize ? 2 : 1, res.getStageResults().size()); } - + List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter); Collections.sort(joinedResultList); http://git-wip-us.apache.org/repos/asf/crunch/blob/799b1232/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java index 680bb2e..cafb4f9 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java @@ -17,12 +17,12 @@ */ package org.apache.crunch.lib.join; -import java.io.IOException; -import java.util.Collection; - +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; +import org.apache.crunch.MapFn; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.ParallelDoOptions; @@ -30,17 +30,22 @@ import org.apache.crunch.ReadableData; import org.apache.crunch.types.PTypeFamily; import org.apache.hadoop.conf.Configuration; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Multimap; +import java.io.IOException; +import java.util.Collection; /** * Utility for doing map side joins on a common key between two {@link PTable}s. * <p> * A map side join is an optimized join which doesn't use a reducer; instead, - * the right side of the join is loaded into memory and the join is performed in + * one side of the join is loaded into memory and the join is performed in * a mapper. This style of join has the important implication that the output of * the join is not sorted, which is the case with a conventional (reducer-based) * join. + * <p/> + * Instances of this class should be instantiated via the {@link #create()} or {@link #create(boolean)} factory + * methods, or optionally via the deprecated public constructor for backwards compatibility with + * older versions of Crunch where the right-side table was loaded into memory. The public constructor will be removed + * in a future release. */ public class MapsideJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> { @@ -49,24 +54,54 @@ public class MapsideJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> { /** * Constructs a new instance of the {@code MapsideJoinStratey}, materializing the right-side * join table to disk before the join is performed. + * + * @deprecated Use the {@link #create()} factory method instead */ + @Deprecated public MapsideJoinStrategy() { this(true); } /** - * Constructs a new instance of the {@code MapsideJoinStrategy}. If the {@code }materialize} + * Constructs a new instance of the {@code MapsideJoinStrategy}. If the {@code materialize} * argument is true, then the right-side join {@code PTable} will be materialized to disk * before the in-memory join is performed. If it is false, then Crunch can optionally read * and process the data from the right-side table without having to run a job to materialize * the data to disk first. * * @param materialize Whether or not to materialize the right-side table before the join + * + * @deprecated Use the {@link #create(boolean)} factory method instead */ + @Deprecated public MapsideJoinStrategy(boolean materialize) { this.materialize = materialize; } + /** + * Create a new {@code MapsideJoinStrategy} instance that will load its left-side table into memory, + * and will materialize the contents of the left-side table to disk before running the in-memory join. + * <p/> + * The smaller of the two tables to be joined should be provided as the left-side table of the created join + * strategy instance. + */ + public static <K, U, V> MapsideJoinStrategy<K, U, V> create() { + return create(true); + } + + /** + * Create a new {@code MapsideJoinStrategy} instance that will load its left-side table into memory. + * <p/> + * If the {@code materialize} parameter is true, then the left-side {@code PTable} will be materialized to disk + * before the in-memory join is performed. If it is false, then Crunch can optionally read and process the data + * from the left-side table without having to run a job to materialize the data to disk first. + * + * @param materialize Whether or not to materialize the left-side table before the join + */ + public static <K, U, V> MapsideJoinStrategy<K, U, V> create(boolean materialize) { + return new LoadLeftSideMapsideJoinStrategy(materialize); + } + @Override public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinType joinType) { switch (joinType) { @@ -138,4 +173,47 @@ public class MapsideJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> { } } } + + /** + * Loads the left-most table (instead of the right-most) in memory while performing the join. + */ + private static class LoadLeftSideMapsideJoinStrategy<K, U, V> extends MapsideJoinStrategy<K, U, V> { + + private MapsideJoinStrategy<K, V, U> mapsideJoinStrategy; + + public LoadLeftSideMapsideJoinStrategy(boolean materialize) { + mapsideJoinStrategy = new MapsideJoinStrategy<K, V, U>(materialize); + } + + @Override + public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinType joinType) { + + JoinType reversedJoinType; + switch (joinType) { + case INNER_JOIN: + reversedJoinType = JoinType.INNER_JOIN; + break; + case RIGHT_OUTER_JOIN: + reversedJoinType = JoinType.LEFT_OUTER_JOIN; + break; + default: + throw new UnsupportedOperationException("Join type " + joinType + " is not supported"); + } + + + return mapsideJoinStrategy.join(right, left, reversedJoinType) + .mapValues("Reverse order out output table values", + new ReversePairOrderFn<V, U>(), + left.getTypeFamily().pairs(left.getValueType(), right.getValueType())); + } + } + + private static class ReversePairOrderFn<V, U> extends MapFn<Pair<V, U>, Pair<U, V>> { + + @Override + public Pair<U, V> map(Pair<V, U> input) { + return Pair.of(input.second(), input.first()); + } + + } }
