Initial implementation of map side joins
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/b13bd4f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/b13bd4f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/b13bd4f4
Branch: refs/heads/master
Commit: b13bd4f416dcff62fd006a8104df252a406262d0
Parents: 8b64b84
Author: Gabriel Reid <[email protected]>
Authored: Mon Jul 2 11:37:00 2012 +0200
Committer: Gabriel Reid <[email protected]>
Committed: Fri Jul 6 17:56:57 2012 +0200
----------------------------------------------------------------------
.../com/cloudera/crunch/lib/join/MapsideJoin.java | 122 +++++++++++++++
.../cloudera/crunch/lib/join/MapsideJoinTest.java | 102 ++++++++++++
src/test/resources/customers.txt | 4 +
src/test/resources/orders.txt | 4 +
4 files, 232 insertions(+), 0 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b13bd4f4/src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java b/src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
new file mode 100644
index 0000000..958b010
--- /dev/null
+++ b/src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
@@ -0,0 +1,152 @@
+package com.cloudera.crunch.lib.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+
+import com.cloudera.crunch.DoFn;
+import com.cloudera.crunch.Emitter;
+import com.cloudera.crunch.PTable;
+import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.impl.mr.MRPipeline;
+import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
+import com.cloudera.crunch.io.ReadableSourceTarget;
+import com.cloudera.crunch.io.impl.SourcePathTargetImpl;
+import com.cloudera.crunch.types.PType;
+import com.cloudera.crunch.types.PTypeFamily;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * Utility for doing map side joins on a common key between two {@link PTable}s.
+ * <p>
+ * A map side join is an optimized join which doesn't use a reducer; instead,
+ * the right side of the join is loaded into memory and the join is performed in
+ * a mapper. This style of join has the important implication that the output of
+ * the join is not sorted, unlike the output of a conventional (reducer-based)
+ * join.
+ * <p>
+ * <b>Note:</b> This utility is only supported when running with a
+ * {@link MRPipeline} as the pipeline.
+ */
+public class MapsideJoin {
+
+  /**
+   * Join two tables using a map side join. The right-side table will be loaded
+   * fully in memory, so this method should only be used if the right side
+   * table's contents can fit in the memory allocated to mappers. The join
+   * performed by this method is an inner join.
+   * <p>
+   * Calling this method runs the pipeline so that the right-side table is
+   * materialized before the join is set up.
+   *
+   * @param <K> the type of the join key
+   * @param <U> the value type of the left-side table
+   * @param <V> the value type of the right-side table
+   * @param left
+   *          The left-side table of the join
+   * @param right
+   *          The right-side table of the join, whose contents will be fully
+   *          read into memory
+   * @return A table keyed on the join key, containing pairs of joined values
+   * @throws CrunchRuntimeException
+   *           if {@code right} does not belong to an {@link MRPipeline}, or if
+   *           its materialized output is not stored at a file system path
+   */
+  public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right) {
+
+    if (!(right.getPipeline() instanceof MRPipeline)) {
+      throw new CrunchRuntimeException("Map-side join is only supported within a MapReduce context");
+    }
+
+    MRPipeline pipeline = (MRPipeline) right.getPipeline();
+    pipeline.materialize(right);
+
+    // TODO Make this method internal to MRPipeline so that we don't run once
+    // for every separate MapsideJoin at the same level
+    pipeline.run();
+
+    // The mappers read the right side back from a file path, so the
+    // materialized output must be path-based. Check explicitly rather than
+    // failing with an opaque ClassCastException.
+    Object materialized = pipeline.getMaterializeSourceTarget(right);
+    if (!(materialized instanceof SourcePathTargetImpl)) {
+      throw new CrunchRuntimeException("Map-side join requires the right-side table to be "
+          + "materialized to a file path, but it was materialized as: " + materialized);
+    }
+    @SuppressWarnings("unchecked")
+    SourcePathTargetImpl<Pair<K, V>> sourcePathTarget =
+        (SourcePathTargetImpl<Pair<K, V>>) materialized;
+
+    // TODO Put the data in the distributed cache
+
+    Path path = sourcePathTarget.getPath();
+    PType<Pair<K, V>> pType = right.getPType();
+
+    MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(path.toString(), pType);
+    PTypeFamily typeFamily = left.getTypeFamily();
+    return left.parallelDo(
+        "mapjoin",
+        mapJoinDoFn,
+        typeFamily.tableOf(left.getKeyType(),
+            typeFamily.pairs(left.getValueType(), right.getValueType())));
+
+  }
+
+  /**
+   * Performs the in-memory side of the join. The right-side table is read from
+   * {@code path} in {@link #initialize()} and indexed by key; {@link #process}
+   * then emits one output pair per right-side value that shares the input's
+   * key, so left-side records without a match produce no output.
+   */
+  static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
+
+    private final String path;
+    private final PType<Pair<K, V>> ptype;
+    // Built in initialize() rather than shipped with the serialized function.
+    private transient Multimap<K, V> joinMap;
+
+    public MapsideJoinDoFn(String path, PType<Pair<K, V>> ptype) {
+      this.path = path;
+      this.ptype = ptype;
+    }
+
+    @Override
+    public void initialize() {
+      super.initialize();
+
+      Object source = ptype.getDefaultFileSource(new Path(path));
+      if (!(source instanceof ReadableSourceTarget)) {
+        throw new CrunchRuntimeException("Right side of map-side join at " + path
+            + " is not readable: " + source);
+      }
+      @SuppressWarnings("unchecked")
+      ReadableSourceTarget<Pair<K, V>> sourceTarget = (ReadableSourceTarget<Pair<K, V>>) source;
+
+      Iterable<Pair<K, V>> iterable;
+      try {
+        iterable = sourceTarget.read(getConfiguration());
+      } catch (IOException e) {
+        throw new CrunchRuntimeException(
+            "Error reading right side of map-side join from " + path, e);
+      }
+
+      joinMap = ArrayListMultimap.create();
+      for (Pair<K, V> joinPair : iterable) {
+        joinMap.put(joinPair.first(), joinPair.second());
+      }
+    }
+
+    @Override
+    public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
+      K key = input.first();
+      U value = input.second();
+      for (V joinValue : joinMap.get(key)) {
+        Pair<U, V> valuePair = Pair.of(value, joinValue);
+        emitter.emit(Pair.of(key, valuePair));
+      }
+    }
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b13bd4f4/src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java
----------------------------------------------------------------------
diff --git
a/src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java b/src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java
new file mode 100644
index 0000000..97e0c63
--- /dev/null
+++ b/src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java
@@ -0,0 +1,119 @@
+package com.cloudera.crunch.lib.join;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Test;
+
+import com.cloudera.crunch.FilterFn;
+import com.cloudera.crunch.MapFn;
+import com.cloudera.crunch.PTable;
+import com.cloudera.crunch.Pair;
+import com.cloudera.crunch.Pipeline;
+import com.cloudera.crunch.impl.mem.MemPipeline;
+import com.cloudera.crunch.impl.mr.MRPipeline;
+import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
+import com.cloudera.crunch.test.FileHelper;
+import com.cloudera.crunch.types.writable.Writables;
+import com.google.common.collect.Lists;
+
+public class MapsideJoinTest {
+
+  /** Parses a {@code key|value} text line into an (int, string) pair. */
+  private static class LineSplitter extends MapFn<String, Pair<Integer, String>> {
+
+    @Override
+    public Pair<Integer, String> map(String input) {
+      String[] fields = input.split("\\|");
+      return Pair.of(Integer.parseInt(fields[0]), fields[1]);
+    }
+
+  }
+
+  /** Rejects every record; used to build an empty right-side table. */
+  private static class NegativeFilter extends FilterFn<Pair<Integer, String>> {
+
+    @Override
+    public boolean accept(Pair<Integer, String> input) {
+      return false;
+    }
+
+  }
+
+  @Test(expected = CrunchRuntimeException.class)
+  public void testNonMapReducePipeline() {
+    runMapsideJoin(MemPipeline.getInstance());
+  }
+
+  @Test
+  public void testMapsideJoin_RightSideIsEmpty() {
+    MRPipeline pipeline = new MRPipeline(MapsideJoinTest.class);
+    try {
+      PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
+      PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+
+      PTable<Integer, String> filteredOrderTable = orderTable.parallelDo(new NegativeFilter(),
+          orderTable.getPTableType());
+
+      PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable,
+          filteredOrderTable);
+
+      List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined
+          .materialize());
+
+      assertTrue(materializedJoin.isEmpty());
+    } finally {
+      // Release the pipeline's temporary data even when the assertion fails.
+      pipeline.done();
+    }
+  }
+
+  @Test
+  public void testMapsideJoin() {
+    MRPipeline pipeline = new MRPipeline(MapsideJoinTest.class);
+    try {
+      runMapsideJoin(pipeline);
+    } finally {
+      pipeline.done();
+    }
+  }
+
+  /**
+   * Joins the customer and order fixtures on their integer key and checks the
+   * sorted result; customer 444 has no orders, so it must not appear.
+   */
+  private void runMapsideJoin(Pipeline pipeline) {
+    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
+    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+
+    PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable, orderTable);
+
+    List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
+    expectedJoinResult.add(Pair.of(111, Pair.of("John Doe", "Corn flakes")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet paper")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet plunger")));
+    expectedJoinResult.add(Pair.of(333, Pair.of("Someone Else", "Toilet brush")));
+
+    List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(joined
+        .materialize());
+    // Map-side join output is not sorted, so sort before comparing.
+    Collections.sort(joinedResultList);
+
+    assertEquals(expectedJoinResult, joinedResultList);
+  }
+
+  /** Reads a {@code key|value} test resource into an integer-keyed table. */
+  private static PTable<Integer, String> readTable(Pipeline pipeline, String filename) {
+    try {
+      return pipeline.readTextFile(FileHelper.createTempCopyOf(filename)).parallelDo("asTable",
+          new LineSplitter(), Writables.tableOf(Writables.ints(), Writables.strings()));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b13bd4f4/src/test/resources/customers.txt
----------------------------------------------------------------------
diff --git a/src/test/resources/customers.txt b/src/test/resources/customers.txt
new file mode 100644
index 0000000..98f3f3d
--- /dev/null
+++ b/src/test/resources/customers.txt
@@ -0,0 +1,4 @@
+111|John Doe
+222|Jane Doe
+333|Someone Else
+444|Has No Orders
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b13bd4f4/src/test/resources/orders.txt
----------------------------------------------------------------------
diff --git a/src/test/resources/orders.txt b/src/test/resources/orders.txt
new file mode 100644
index 0000000..2f1383f
--- /dev/null
+++ b/src/test/resources/orders.txt
@@ -0,0 +1,4 @@
+222|Toilet plunger
+333|Toilet brush
+222|Toilet paper
+111|Corn flakes
\ No newline at end of file
