Repository: crunch Updated Branches: refs/heads/apache-crunch-0.8 a110d6d3b -> 20dea659d
CRUNCH-373 Detach values in MapsideJoin. Use detached values in the materialized map of values in the MapsideJoinStrategy so that non-primitive values are correctly handled as separate instances. Add test derived from provided test case from Rachit Soni. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/20dea659 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/20dea659 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/20dea659 Branch: refs/heads/apache-crunch-0.8 Commit: 20dea659db68ac856bfc98fe32642ba14c39a109 Parents: a110d6d Author: Gabriel Reid <[email protected]> Authored: Tue Apr 1 20:23:21 2014 -0500 Committer: Gabriel Reid <[email protected]> Committed: Wed Apr 2 14:14:39 2014 +0200 ---------------------------------------------------------------------- .../crunch/lib/join/MapsideJoinStrategyIT.java | 64 +++++++++++++++++++- .../crunch/lib/join/MapsideJoinStrategy.java | 12 +++- 2 files changed, 71 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/20dea659/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java index 9972549..15cc08d 100644 --- a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java @@ -21,11 +21,16 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.io.OutputStream; import java.util.Collections; import java.util.List; import java.util.Locale; +import com.google.common.collect.Lists; +import org.apache.crunch.DoFn; 
+import org.apache.crunch.Emitter; import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.Pipeline; @@ -36,13 +41,14 @@ import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; -import com.google.common.collect.Lists; - public class MapsideJoinStrategyIT { private static String saveTempDir; @@ -147,6 +153,60 @@ public class MapsideJoinStrategyIT { runMapsideLeftOuterJoin(new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()), false, true); } + @Test + public void testMapSideJoinWithImmutableBytesWritable() throws IOException, InterruptedException { + //Write out input files + FileSystem fs = FileSystem.get(tmpDir.getDefaultConfiguration()); + Path path1 = tmpDir.getPath("input1.txt"); + Path path2 = tmpDir.getPath("input2.txt"); + + OutputStream out1 = fs.create(path1, true); + OutputStream out2 = fs.create(path2, true); + + for(int i = 0; i < 4; i++){ + byte[] value = ("value" + i + "\n").getBytes(); + out1.write(value); + out2.write(value); + } + + out1.flush(); + out1.close(); + out2.flush(); + out2.close(); + + final MRPipeline pipeline = new MRPipeline(MapsideJoinStrategyIT.class, tmpDir.getDefaultConfiguration()); + + final PCollection<String> values1 = pipeline.readTextFile(path1.toString()); + final PCollection<String> values2 = pipeline.readTextFile(path2.toString()); + + final PTable<Text, Text> convertedValues1 = convertStringToText(values1); + final PTable<Text, Text> convertedValues2 = convertStringToText(values2); + + // for map side join + final MapsideJoinStrategy<Text, Text, Text> 
mapSideJoinStrategy = new MapsideJoinStrategy<Text, Text, Text>(); + + final PTable<Text, Pair<Text, Text>> updatedJoinedRows = mapSideJoinStrategy.join(convertedValues1, convertedValues2, JoinType.INNER_JOIN); + pipeline.run(); + + // Join should have 2 results + // Join should have contentBytes1 and contentBytes2 + assertEquals(4, updatedJoinedRows.materializeToMap().size()); + } + + /** + * The method is used to convert string to entity key + */ + public static PTable<Text, Text> convertStringToText(final PCollection<String> entityKeysStringPCollection) { + return entityKeysStringPCollection.parallelDo(new DoFn<String, Pair<Text, Text>>() { + + @Override + public void process(final String input, final Emitter<Pair<Text, Text>> emitter) { + emitter.emit(new Pair<Text, Text>(new Text(input), new Text(input))); + } + }, Writables.tableOf(Writables.writables(Text.class), Writables.writables(Text.class))); + } + + private void runMapsideJoin(Pipeline pipeline, boolean inMemory, boolean materialize) { PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt"); PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt"); http://git-wip-us.apache.org/repos/asf/crunch/blob/20dea659/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java index 680bb2e..3778cf4 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/MapsideJoinStrategy.java @@ -27,6 +27,7 @@ import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.ParallelDoOptions; import org.apache.crunch.ReadableData; +import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PTypeFamily; 
import org.apache.hadoop.conf.Configuration; @@ -84,7 +85,8 @@ public class MapsideJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> { private PTable<K, Pair<U,V>> joinInternal(PTable<K, U> left, PTable<K, V> right, boolean includeUnmatchedLeftValues) { PTypeFamily tf = left.getTypeFamily(); ReadableData<Pair<K, V>> rightReadable = right.asReadable(materialize); - MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(rightReadable, includeUnmatchedLeftValues); + MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>( + rightReadable, right.getPTableType(), includeUnmatchedLeftValues); ParallelDoOptions options = ParallelDoOptions.builder() .sourceTargets(rightReadable.getSourceTargets()) .build(); @@ -96,11 +98,13 @@ public class MapsideJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> { static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> { private final ReadableData<Pair<K, V>> readable; + private final PTableType<K, V> tableType; private final boolean includeUnmatched; private Multimap<K, V> joinMap; - public MapsideJoinDoFn(ReadableData<Pair<K, V>> rs, boolean includeUnmatched) { + public MapsideJoinDoFn(ReadableData<Pair<K, V>> rs, PTableType<K, V> tableType, boolean includeUnmatched) { this.readable = rs; + this.tableType = tableType; this.includeUnmatched = includeUnmatched; } @@ -112,11 +116,13 @@ public class MapsideJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> { @Override public void initialize() { super.initialize(); + tableType.initialize(getConfiguration()); joinMap = ArrayListMultimap.create(); try { for (Pair<K, V> joinPair : readable.read(getContext())) { - joinMap.put(joinPair.first(), joinPair.second()); + Pair<K, V> detachedPair = tableType.getDetachedValue(joinPair); + joinMap.put(detachedPair.first(), detachedPair.second()); } } catch (IOException e) { throw new CrunchRuntimeException("Error reading map-side join data", e);
