Use the distributed cache for map-side joins
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/f70a6df8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/f70a6df8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/f70a6df8 Branch: refs/heads/master Commit: f70a6df8b2fe48efa14b432fddf731c3e8d94d86 Parents: b13bd4f Author: Gabriel Reid <[email protected]> Authored: Mon Jul 2 14:06:31 2012 +0200 Committer: Gabriel Reid <[email protected]> Committed: Fri Jul 6 17:56:57 2012 +0200 ---------------------------------------------------------------------- .../cloudera/crunch/io/avro/AvroFileSource.java | 3 +- .../com/cloudera/crunch/io/seq/SeqFileSource.java | 12 ++-- .../cloudera/crunch/io/seq/SeqFileTableSource.java | 2 +- .../cloudera/crunch/io/text/TextFileSource.java | 4 +- .../com/cloudera/crunch/lib/join/MapsideJoin.java | 45 +++++++++++---- 5 files changed, 43 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f70a6df8/src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java b/src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java index 1122d62..4debfeb 100644 --- a/src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java +++ b/src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java @@ -45,7 +45,8 @@ public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSour @Override public Iterable<T> read(Configuration conf) throws IOException { - return CompositePathIterable.create(FileSystem.get(conf), path, new AvroFileReaderFactory<T>( + FileSystem fs = FileSystem.get(path.toUri(), conf); + return CompositePathIterable.create(fs, path, new AvroFileReaderFactory<T>( (AvroType<T>) ptype, conf)); } } 
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f70a6df8/src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java b/src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java index 24dec2d..462ef93 100644 --- a/src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java +++ b/src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java @@ -26,18 +26,16 @@ import com.cloudera.crunch.io.ReadableSource; import com.cloudera.crunch.io.impl.FileSourceImpl; import com.cloudera.crunch.types.PType; -public class SeqFileSource<T> extends FileSourceImpl<T> implements - ReadableSource<T> { +public class SeqFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> { public SeqFileSource(Path path, PType<T> ptype) { - super(path, ptype, SequenceFileInputFormat.class); + super(path, ptype, SequenceFileInputFormat.class); } - + @Override public Iterable<T> read(Configuration conf) throws IOException { - FileSystem fs = FileSystem.get(conf); - return CompositePathIterable.create(fs, path, - new SeqFileReaderFactory<T>(ptype, conf)); + FileSystem fs = FileSystem.get(path.toUri(), conf); + return CompositePathIterable.create(fs, path, new SeqFileReaderFactory<T>(ptype, conf)); } @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f70a6df8/src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java b/src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java index 69ca12b..4db6658 100644 --- a/src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java +++ b/src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java @@ -42,7 +42,7 @@ public class SeqFileTableSource<K, V> extends FileTableSourceImpl<K, V> implemen @Override public 
Iterable<Pair<K, V>> read(Configuration conf) throws IOException { - FileSystem fs = FileSystem.get(conf); + FileSystem fs = FileSystem.get(path.toUri(), conf); return CompositePathIterable.create(fs, path, new SeqFileTableReaderFactory<K, V>((PTableType<K, V>) ptype, conf)); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f70a6df8/src/main/java/com/cloudera/crunch/io/text/TextFileSource.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/io/text/TextFileSource.java b/src/main/java/com/cloudera/crunch/io/text/TextFileSource.java index a876843..e0dbe68 100644 --- a/src/main/java/com/cloudera/crunch/io/text/TextFileSource.java +++ b/src/main/java/com/cloudera/crunch/io/text/TextFileSource.java @@ -67,7 +67,7 @@ public class TextFileSource<T> extends FileSourceImpl<T> implements @Override public Iterable<T> read(Configuration conf) throws IOException { - return CompositePathIterable.create(FileSystem.get(conf), path, - new TextFileReaderFactory<T>(ptype, conf)); + return CompositePathIterable.create(FileSystem.get(path.toUri(), conf), path, + new TextFileReaderFactory<T>(ptype, conf)); } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f70a6df8/src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java b/src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java index 958b010..8072e07 100644 --- a/src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java +++ b/src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java @@ -2,6 +2,8 @@ package com.cloudera.crunch.lib.join; import java.io.IOException; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import com.cloudera.crunch.DoFn; @@ -53,21 +55,25 @@ public class MapsideJoin { MRPipeline pipeline 
= (MRPipeline) right.getPipeline(); pipeline.materialize(right); - // TODO Make this method internal to MRPipeline so that we don't run once - // for every separate MapsideJoin at the same level + // TODO Move necessary logic to MRPipeline so that we can theoretically + // optimize this by running the setup of multiple map-side joins concurrently pipeline.run(); - // TODO Verify that this cast is safe -- are there any situations where this - // wouldn't work? - SourcePathTargetImpl<Pair<K, V>> sourcePathTarget = (SourcePathTargetImpl<Pair<K, V>>) pipeline + ReadableSourceTarget<Pair<K, V>> readableSourceTarget = pipeline .getMaterializeSourceTarget(right); + if (!(readableSourceTarget instanceof SourcePathTargetImpl)) { + throw new CrunchRuntimeException("Right-side contents can't be read from a path"); + } - // TODO Put the data in the distributed cache + // Suppress warnings because we've just checked this cast via instanceof + @SuppressWarnings("unchecked") + SourcePathTargetImpl<Pair<K, V>> sourcePathTarget = (SourcePathTargetImpl<Pair<K, V>>) readableSourceTarget; Path path = sourcePathTarget.getPath(); - PType<Pair<K, V>> pType = right.getPType(); + DistributedCache.addCacheFile(path.toUri(), pipeline.getConfiguration()); - MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(path.toString(), pType); + MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(path.toString(), + right.getPType()); PTypeFamily typeFamily = left.getTypeFamily(); return left.parallelDo( "mapjoin", @@ -79,21 +85,36 @@ public class MapsideJoin { static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> { - private String path; + private String inputPath; private PType<Pair<K, V>> ptype; private Multimap<K, V> joinMap; - public MapsideJoinDoFn(String path, PType<Pair<K, V>> ptype) { - this.path = path; + public MapsideJoinDoFn(String inputPath, PType<Pair<K, V>> ptype) { + this.inputPath = inputPath; this.ptype = ptype; } + private 
Path getCacheFilePath() { + try { + for (Path localPath : DistributedCache.getLocalCacheFiles(getConfiguration())) { + if (localPath.toString().endsWith(inputPath)) { + return localPath.makeQualified(FileSystem.getLocal(getConfiguration())); + + } + } + } catch (IOException e) { + throw new CrunchRuntimeException(e); + } + + throw new CrunchRuntimeException("Can't find local cache file for '" + inputPath + "'"); + } + @Override public void initialize() { super.initialize(); ReadableSourceTarget<Pair<K, V>> sourceTarget = (ReadableSourceTarget<Pair<K, V>>) ptype - .getDefaultFileSource(new Path(path)); + .getDefaultFileSource(getCacheFilePath()); Iterable<Pair<K, V>> iterable = null; try { iterable = sourceTarget.read(getConfiguration());
