On Fri, Jul 6, 2012 at 10:09 AM, Gabriel Reid <[email protected]> wrote:
> It definitely has a significant performance gain when dealing with big
> records that result in a slow shuffle.
>
> By the way, I discovered (confirmed) an issue with the normal join
> implementation, where objects are held on to without using a deep copy. I
> was thinking of fixing that this evening -- are you planning on doing the
> big package rename today?

It's a Python script that does most of the renaming work, so I can do it
this weekend. It'll be a pleasant surprise when everyone comes into work on
Monday AM. ;)

> On 06 Jul 2012, at 18:57, Josh Wills <[email protected]> wrote:
>
>> Not a problem, man -- can't wait to play with it this weekend!
>>
>> On Fri, Jul 6, 2012 at 9:50 AM, Gabriel Reid <[email protected]> wrote:
>>> Hi Josh,
>>>
>>> I've just committed the map side joins, after doing some more
>>> extensive testing on it today. Unfortunately, I forgot to squash the
>>> multiple commits into a single one (as I was planning on doing), but
>>> it's all in there and working.
>>>
>>> - Gabriel
>>>
>>> On Fri, Jul 6, 2012 at 7:28 AM, Gabriel Reid <[email protected]> wrote:
>>>> Thanks for making the JIRA issue.
>>>>
>>>> I was going to do some more testing on a "real" cluster later on
>>>> today, and as long as that all checks out then this will be ready to
>>>> go in as far as I'm concerned.
>>>>
>>>> I don't see any reason to hold back on the big renaming for this
>>>> patch -- if you're ready to do the package renaming, please go ahead.
>>>>
>>>> As long as my testing later today checks out and there aren't any
>>>> glaring issues from anyone who has tried the patch out, I'll probably
>>>> be checking the patch in later today (so it'll probably be in before
>>>> you do the renaming anyhow).
>>>>
>>>> - Gabriel
>>>>
>>>> On Friday 6 July 2012 at 00:11, Josh Wills wrote:
>>>>
>>>>> FYI: https://issues.apache.org/jira/browse/CRUNCH-3 to track this.
>>>>>
>>>>> Gabriel, is your feeling that this is ready to go in? I'm debating
>>>>> whether or not to check this in before/after I do the massive
>>>>> com.cloudera -> org.apache renaming. What do you think?
>>>>>
>>>>> On Tue, Jul 3, 2012 at 1:25 PM, Gabriel Reid <[email protected]> wrote:
>>>>>
>>>>>> Hi Joe,
>>>>>>
>>>>>> Looking forward to hearing your feedback! This is still just a
>>>>>> patch and not committed yet, and there's still definitely room for
>>>>>> help (i.e. ideas on how to improve it, or do it totally
>>>>>> differently), so certainly let me know if you've got some ideas.
>>>>>>
>>>>>> - Gabriel
>>>>>>
>>>>>> On Tuesday 3 July 2012 at 18:01, Joseph Adler wrote:
>>>>>>
>>>>>>> Awesome! Will give this a try soon... wish I could have helped
>>>>>>> with this one...
>>>>>>>
>>>>>>> On Tue, Jul 3, 2012 at 1:53 AM, Gabriel Reid <[email protected]> wrote:
>>>>>>>> Thanks for pointing that out, Chris. I'm guessing the mailing
>>>>>>>> list is stripping out attachments(?). Once the JIRA is up and
>>>>>>>> running, that will be taken care of, I guess.
>>>>>>>>
>>>>>>>> The mime type on the last attempt was application/octet-stream,
>>>>>>>> so I've renamed this to a .txt file to try to ensure that it'll
>>>>>>>> get a text/plain mime type (although I don't know if that'll make
>>>>>>>> a difference). I've also pasted it inline below; hopefully one of
>>>>>>>> those solutions works.
>>>>>>>>
>>>>>>>> - Gabriel
>>>>>>>>
>>>>>>>>
>>>>>>>> diff --git src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
>>>>>>>> index 420e8dc..c8ba596 100644
>>>>>>>> --- src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
>>>>>>>> +++ src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
>>>>>>>> @@ -59,9 +59,9 @@ import com.google.common.collect.Sets;
>>>>>>>>  public class MRPipeline implements Pipeline {
>>>>>>>>
>>>>>>>>    private static final Log LOG = LogFactory.getLog(MRPipeline.class);
>>>>>>>> -
>>>>>>>> +
>>>>>>>>    private static final Random RANDOM = new Random();
>>>>>>>> -
>>>>>>>> +
>>>>>>>>    private final Class<?> jarClass;
>>>>>>>>    private final String name;
>>>>>>>>    private final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
>>>>>>>> @@ -75,15 +75,15 @@ public class MRPipeline implements Pipeline {
>>>>>>>>    public MRPipeline(Class<?> jarClass) throws IOException {
>>>>>>>>      this(jarClass, new Configuration());
>>>>>>>>    }
>>>>>>>> -
>>>>>>>> -  public MRPipeline(Class<?> jarClass, String name){
>>>>>>>> +
>>>>>>>> +  public MRPipeline(Class<?> jarClass, String name) {
>>>>>>>>      this(jarClass, name, new Configuration());
>>>>>>>>    }
>>>>>>>> -
>>>>>>>> +
>>>>>>>>    public MRPipeline(Class<?> jarClass, Configuration conf) {
>>>>>>>> -    this(jarClass, jarClass.getName(), conf);
>>>>>>>> +    this(jarClass, jarClass.getName(), conf);
>>>>>>>>    }
>>>>>>>> -
>>>>>>>> +
>>>>>>>>    public MRPipeline(Class<?> jarClass, String name, Configuration conf) {
>>>>>>>>      this.jarClass = jarClass;
>>>>>>>>      this.name = name;
>>>>>>>> @@ -102,9 +102,9 @@ public class MRPipeline implements Pipeline {
>>>>>>>>
>>>>>>>>    @Override
>>>>>>>>    public void setConfiguration(Configuration conf) {
>>>>>>>> -    this.conf = conf;
>>>>>>>> +    this.conf = conf;
>>>>>>>>    }
>>>>>>>> -
>>>>>>>> +
>>>>>>>>    @Override
>>>>>>>>    public PipelineResult run() {
>>>>>>>>      MSCRPlanner planner = new MSCRPlanner(this, outputTargets);
>>>>>>>> @@ -125,8 +125,8 @@ public class MRPipeline implements Pipeline {
>>>>>>>>          boolean materialized = false;
>>>>>>>>          for (Target t : outputTargets.get(c)) {
>>>>>>>>            if (!materialized && t instanceof Source) {
>>>>>>>> -            c.materializeAt((SourceTarget) t);
>>>>>>>> -            materialized = true;
>>>>>>>> +            c.materializeAt((SourceTarget) t);
>>>>>>>> +            materialized = true;
>>>>>>>>            }
>>>>>>>>          }
>>>>>>>>        }
>>>>>>>> @@ -144,7 +144,7 @@ public class MRPipeline implements Pipeline {
>>>>>>>>      cleanup();
>>>>>>>>      return res;
>>>>>>>>    }
>>>>>>>> -
>>>>>>>> +
>>>>>>>>    public <S> PCollection<S> read(Source<S> source) {
>>>>>>>>      return new InputCollection<S>(source, this);
>>>>>>>>    }
>>>>>>>> @@ -160,85 +160,120 @@ public class MRPipeline implements Pipeline {
>>>>>>>>    @SuppressWarnings("unchecked")
>>>>>>>>    public void write(PCollection<?> pcollection, Target target) {
>>>>>>>>      if (pcollection instanceof PGroupedTableImpl) {
>>>>>>>> -      pcollection = ((PGroupedTableImpl<?,?>) pcollection).ungroup();
>>>>>>>> +      pcollection = ((PGroupedTableImpl<?, ?>) pcollection).ungroup();
>>>>>>>>      } else if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable) {
>>>>>>>> -      pcollection = pcollection.parallelDo("UnionCollectionWrapper",
>>>>>>>> -          (MapFn)IdentityFn.<Object>getInstance(), pcollection.getPType());
>>>>>>>> +      pcollection = pcollection.parallelDo("UnionCollectionWrapper",
>>>>>>>> +          (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
>>>>>>>>      }
>>>>>>>>      addOutput((PCollectionImpl<?>) pcollection, target);
>>>>>>>>    }
>>>>>>>>
>>>>>>>>    private void addOutput(PCollectionImpl<?> impl, Target target) {
>>>>>>>>      if (!outputTargets.containsKey(impl)) {
>>>>>>>> -      outputTargets.put(impl, Sets.<Target>newHashSet());
>>>>>>>> +      outputTargets.put(impl, Sets.<Target> newHashSet());
>>>>>>>>      }
>>>>>>>>      outputTargets.get(impl).add(target);
>>>>>>>>    }
>>>>>>>> -
>>>>>>>> +
>>>>>>>>    @Override
>>>>>>>>    public <T> Iterable<T> materialize(PCollection<T> pcollection) {
>>>>>>>> -
>>>>>>>> -    if (pcollection instanceof UnionCollection) {
>>>>>>>> -      pcollection = pcollection.parallelDo("UnionCollectionWrapper",
>>>>>>>> -          (MapFn)IdentityFn.<Object>getInstance(), pcollection.getPType());
>>>>>>>> -    }
>>>>>>>> -    PCollectionImpl<T> impl = (PCollectionImpl<T>) pcollection;
>>>>>>>> +
>>>>>>>> +    PCollectionImpl<T> pcollectionImpl = toPcollectionImpl(pcollection);
>>>>>>>> +    ReadableSourceTarget<T> srcTarget = getMaterializeSourceTarget(pcollectionImpl);
>>>>>>>> +
>>>>>>>> +    MaterializableIterable<T> c = new MaterializableIterable<T>(this, srcTarget);
>>>>>>>> +    if (!outputTargetsToMaterialize.containsKey(pcollectionImpl)) {
>>>>>>>> +      outputTargetsToMaterialize.put(pcollectionImpl, c);
>>>>>>>> +    }
>>>>>>>> +    return c;
>>>>>>>> +  }
>>>>>>>> +
>>>>>>>> +  /**
>>>>>>>> +   * Retrieve a ReadableSourceTarget that provides access to the contents of a
>>>>>>>> +   * {@link PCollection}. This is primarily intended as a helper method to
>>>>>>>> +   * {@link #materialize(PCollection)}. The underlying data of the
>>>>>>>> +   * ReadableSourceTarget may not be actually present until the pipeline is run.
>>>>>>>> +   *
>>>>>>>> +   * @param pcollection
>>>>>>>> +   *          The collection for which the ReadableSourceTarget is to be
>>>>>>>> +   *          retrieved
>>>>>>>> +   * @return The ReadableSourceTarget
>>>>>>>> +   * @throws IllegalArgumentException
>>>>>>>> +   *           If no ReadableSourceTarget can be retrieved for the given
>>>>>>>> +   *           PCollection
>>>>>>>> +   */
>>>>>>>> +  public <T> ReadableSourceTarget<T> getMaterializeSourceTarget(PCollection<T> pcollection) {
>>>>>>>> +    PCollectionImpl<T> impl = toPcollectionImpl(pcollection);
>>>>>>>>      SourceTarget<T> matTarget = impl.getMaterializedAt();
>>>>>>>>      if (matTarget != null && matTarget instanceof ReadableSourceTarget) {
>>>>>>>> -      return new MaterializableIterable<T>(this, (ReadableSourceTarget<T>) matTarget);
>>>>>>>> +      return (ReadableSourceTarget<T>) matTarget;
>>>>>>>> +    }
>>>>>>>> +
>>>>>>>> +    ReadableSourceTarget<T> srcTarget = null;
>>>>>>>> +    if (outputTargets.containsKey(pcollection)) {
>>>>>>>> +      for (Target target : outputTargets.get(impl)) {
>>>>>>>> +        if (target instanceof ReadableSourceTarget) {
>>>>>>>> +          srcTarget = (ReadableSourceTarget<T>) target;
>>>>>>>> +          break;
>>>>>>>> +        }
>>>>>>>> +      }
>>>>>>>>      }
>>>>>>>> -
>>>>>>>> -    ReadableSourceTarget<T> srcTarget = null;
>>>>>>>> -    if (outputTargets.containsKey(pcollection)) {
>>>>>>>> -      for (Target target : outputTargets.get(impl)) {
>>>>>>>> -        if (target instanceof ReadableSourceTarget) {
>>>>>>>> -          srcTarget = (ReadableSourceTarget) target;
>>>>>>>> -          break;
>>>>>>>> -        }
>>>>>>>> -      }
>>>>>>>> -    }
>>>>>>>> -
>>>>>>>> -    if (srcTarget == null) {
>>>>>>>> -      SourceTarget<T> st = createIntermediateOutput(pcollection.getPType());
>>>>>>>> -      if (!(st instanceof ReadableSourceTarget)) {
>>>>>>>> -        throw new IllegalArgumentException("The PType for the given PCollection is not readable"
>>>>>>>> -            + " and cannot be materialized");
>>>>>>>> -      } else {
>>>>>>>> -        srcTarget = (ReadableSourceTarget) st;
>>>>>>>> -        addOutput(impl, srcTarget);
>>>>>>>> -      }
>>>>>>>> -    }
>>>>>>>> -
>>>>>>>> -    MaterializableIterable<T> c = new MaterializableIterable<T>(this, srcTarget);
>>>>>>>> -    outputTargetsToMaterialize.put(impl, c);
>>>>>>>> -    return c;
>>>>>>>> +
>>>>>>>> +    if (srcTarget == null) {
>>>>>>>> +      SourceTarget<T> st = createIntermediateOutput(pcollection.getPType());
>>>>>>>> +      if (!(st instanceof ReadableSourceTarget)) {
>>>>>>>> +        throw new IllegalArgumentException("The PType for the given PCollection is not readable"
>>>>>>>> +            + " and cannot be materialized");
>>>>>>>> +      } else {
>>>>>>>> +        srcTarget = (ReadableSourceTarget<T>) st;
>>>>>>>> +        addOutput(impl, srcTarget);
>>>>>>>> +      }
>>>>>>>> +    }
>>>>>>>> +
>>>>>>>> +    return srcTarget;
>>>>>>>> +  }
>>>>>>>> +
>>>>>>>> +  /**
>>>>>>>> +   * Safely cast a PCollection into a PCollectionImpl, including handling the
>>>>>>>> +   * case of UnionCollections.
>>>>>>>> +   *
>>>>>>>> +   * @param pcollection The PCollection to be cast/transformed
>>>>>>>> +   * @return The PCollectionImpl representation
>>>>>>>> +   */
>>>>>>>> +  private <T> PCollectionImpl<T> toPcollectionImpl(PCollection<T> pcollection) {
>>>>>>>> +    PCollectionImpl<T> pcollectionImpl = null;
>>>>>>>> +    if (pcollection instanceof UnionCollection) {
>>>>>>>> +      pcollectionImpl = (PCollectionImpl<T>) pcollection.parallelDo("UnionCollectionWrapper",
>>>>>>>> +          (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
>>>>>>>> +    } else {
>>>>>>>> +      pcollectionImpl = (PCollectionImpl<T>) pcollection;
>>>>>>>> +    }
>>>>>>>> +    return pcollectionImpl;
>>>>>>>>    }
>>>>>>>>
>>>>>>>>    public <T> SourceTarget<T> createIntermediateOutput(PType<T> ptype) {
>>>>>>>> -    return ptype.getDefaultFileSource(createTempPath());
>>>>>>>> +    return ptype.getDefaultFileSource(createTempPath());
>>>>>>>>    }
>>>>>>>>
>>>>>>>>    public Path createTempPath() {
>>>>>>>>      tempFileIndex++;
>>>>>>>>      return new Path(tempDirectory, "p" + tempFileIndex);
>>>>>>>>    }
>>>>>>>> -
>>>>>>>> +
>>>>>>>>    private static Path createTempDirectory(Configuration conf) {
>>>>>>>>      Path dir = new Path("/tmp/crunch" + RANDOM.nextInt());
>>>>>>>> -    try {
>>>>>>>> -      FileSystem.get(conf).mkdirs(dir);
>>>>>>>> -    } catch (IOException e) {
>>>>>>>> -      LOG.error("Exception creating job output directory", e);
>>>>>>>> -      throw new RuntimeException(e);
>>>>>>>> -    }
>>>>>>>> +    try {
>>>>>>>> +      FileSystem.get(conf).mkdirs(dir);
>>>>>>>> +    } catch (IOException e) {
>>>>>>>> +      LOG.error("Exception creating job output directory", e);
>>>>>>>> +      throw new RuntimeException(e);
>>>>>>>> +    }
>>>>>>>>      return dir;
>>>>>>>>    }
>>>>>>>> -
>>>>>>>> +
>>>>>>>>    @Override
>>>>>>>>    public <T> void writeTextFile(PCollection<T> pcollection, String pathName) {
>>>>>>>>      // Ensure that this is a writable pcollection instance.
>>>>>>>> -    pcollection = pcollection.parallelDo("asText", IdentityFn.<T>getInstance(),
>>>>>>>> -        WritableTypeFamily.getInstance().as(pcollection.getPType()));
>>>>>>>> +    pcollection = pcollection.parallelDo("asText", IdentityFn.<T> getInstance(), WritableTypeFamily
>>>>>>>> +        .getInstance().as(pcollection.getPType()));
>>>>>>>>      write(pcollection, At.textFile(pathName));
>>>>>>>>    }
>>>>>>>>
>>>>>>>> @@ -256,7 +291,7 @@ public class MRPipeline implements Pipeline {
>>>>>>>>        LOG.info("Exception during cleanup", e);
>>>>>>>>      }
>>>>>>>>    }
>>>>>>>> -
>>>>>>>> +
>>>>>>>>    public int getNextAnonymousStageId() {
>>>>>>>>      return nextAnonymousStageId++;
>>>>>>>>    }
>>>>>>>> @@ -265,7 +300,7 @@ public class MRPipeline implements Pipeline {
>>>>>>>>    public void enableDebug() {
>>>>>>>>      // Turn on Crunch runtime error catching.
>>>>>>>>      getConfiguration().setBoolean(RuntimeParameters.DEBUG, true);
>>>>>>>> -
>>>>>>>> +
>>>>>>>>      // Write Hadoop's WARN logs to the console.
>>>>>>>>      Logger crunchInfoLogger = LogManager.getLogger("com.cloudera.crunch");
>>>>>>>>      Appender console = crunchInfoLogger.getAppender("A");
>>>>>>>> @@ -277,9 +312,9 @@ public class MRPipeline implements Pipeline {
>>>>>>>>        LOG.warn("Could not find console appender named 'A' for writing Hadoop warning logs");
>>>>>>>>      }
>>>>>>>>    }
>>>>>>>> -
>>>>>>>> +
>>>>>>>>    @Override
>>>>>>>>    public String getName() {
>>>>>>>> -    return name;
>>>>>>>> +    return name;
>>>>>>>>    }
>>>>>>>>  }
>>>>>>>> diff --git src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
>>>>>>>> index d41a52e..68ef054 100644
>>>>>>>> --- src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
>>>>>>>> +++ src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
>>>>>>>> @@ -12,6 +12,10 @@ public class CrunchRuntimeException extends RuntimeException {
>>>>>>>>      super(e);
>>>>>>>>    }
>>>>>>>>
>>>>>>>> +  public CrunchRuntimeException(String msg, Exception e) {
>>>>>>>> +    super(msg, e);
>>>>>>>> +  }
>>>>>>>> +
>>>>>>>>    public boolean wasLogged() {
>>>>>>>>      return logged;
>>>>>>>>    }
>>>>>>>> diff --git src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
>>>>>>>> index 1122d62..4debfeb 100644
>>>>>>>> --- src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
>>>>>>>> +++ src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
>>>>>>>> @@ -45,7 +45,8 @@ public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSour
>>>>>>>>
>>>>>>>>    @Override
>>>>>>>>    public Iterable<T> read(Configuration conf) throws IOException {
>>>>>>>> -    return CompositePathIterable.create(FileSystem.get(conf), path, new AvroFileReaderFactory<T>(
>>>>>>>> +    FileSystem fs = FileSystem.get(path.toUri(), conf);
>>>>>>>> +    return CompositePathIterable.create(fs, path, new AvroFileReaderFactory<T>(
>>>>>>>>          (AvroType<T>) ptype, conf));
>>>>>>>>    }
>>>>>>>>  }
>>>>>>>> diff --git src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
>>>>>>>> index 24dec2d..462ef93 100644
>>>>>>>> --- src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
>>>>>>>> +++ src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
>>>>>>>> @@ -26,18 +26,16 @@ import com.cloudera.crunch.io.ReadableSource;
>>>>>>>>  import com.cloudera.crunch.io.impl.FileSourceImpl;
>>>>>>>>  import com.cloudera.crunch.types.PType;
>>>>>>>>
>>>>>>>> -public class SeqFileSource<T> extends FileSourceImpl<T> implements
>>>>>>>> -    ReadableSource<T> {
>>>>>>>> +public class SeqFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
>>>>>>>>
>>>>>>>>    public SeqFileSource(Path path, PType<T> ptype) {
>>>>>>>> -    super(path, ptype, SequenceFileInputFormat.class);
>>>>>>>> +    super(path, ptype, SequenceFileInputFormat.class);
>>>>>>>>    }
>>>>>>>> -
>>>>>>>> +
>>>>>>>>    @Override
>>>>>>>>    public Iterable<T> read(Configuration conf) throws IOException {
>>>>>>>> -    FileSystem fs = FileSystem.get(conf);
>>>>>>>> -    return CompositePathIterable.create(fs, path,
>>>>>>>> -        new SeqFileReaderFactory<T>(ptype, conf));
>>>>>>>> +    FileSystem fs = FileSystem.get(path.toUri(), conf);
>>>>>>>> +    return CompositePathIterable.create(fs, path, new SeqFileReaderFactory<T>(ptype, conf));
>>>>>>>>    }
>>>>>>>>
>>>>>>>>    @Override
>>>>>>>> diff --git src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
>>>>>>>> index 69ca12b..4db6658 100644
>>>>>>>> --- src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
>>>>>>>> +++ src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
>>>>>>>> @@ -42,7 +42,7 @@ public class SeqFileTableSource<K, V> extends FileTableSourceImpl<K, V> implemen
>>>>>>>>
>>>>>>>>    @Override
>>>>>>>>    public Iterable<Pair<K, V>> read(Configuration conf) throws IOException {
>>>>>>>> -    FileSystem fs = FileSystem.get(conf);
>>>>>>>> +    FileSystem fs = FileSystem.get(path.toUri(), conf);
>>>>>>>>      return CompositePathIterable.create(fs, path,
>>>>>>>>          new SeqFileTableReaderFactory<K, V>((PTableType<K, V>) ptype, conf));
>>>>>>>>    }
>>>>>>>> diff --git src/main/java/com/cloudera/crunch/io/text/TextFileSource.java src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
>>>>>>>> index a876843..e0dbe68 100644
>>>>>>>> --- src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
>>>>>>>> +++ src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
>>>>>>>> @@ -67,7 +67,7 @@ public class TextFileSource<T> extends FileSourceImpl<T> implements
>>>>>>>>
>>>>>>>>    @Override
>>>>>>>>    public Iterable<T> read(Configuration conf) throws IOException {
>>>>>>>> -    return CompositePathIterable.create(FileSystem.get(conf), path,
>>>>>>>> -        new TextFileReaderFactory<T>(ptype, conf));
>>>>>>>> +    return CompositePathIterable.create(FileSystem.get(path.toUri(), conf), path,
>>>>>>>> +        new TextFileReaderFactory<T>(ptype, conf));
>>>>>>>>    }
>>>>>>>>  }
>>>>>>>> diff --git src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
>>>>>>>> new file mode 100644
>>>>>>>> index 0000000..8072e07
>>>>>>>> --- /dev/null
>>>>>>>> +++ src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
>>>>>>>> @@ -0,0 +1,143 @@
>>>>>>>> +package com.cloudera.crunch.lib.join;
>>>>>>>> +
>>>>>>>> +import java.io.IOException;
>>>>>>>> +
>>>>>>>> +import org.apache.hadoop.filecache.DistributedCache;
>>>>>>>> +import org.apache.hadoop.fs.FileSystem;
>>>>>>>> +import org.apache.hadoop.fs.Path;
>>>>>>>> +
>>>>>>>> +import com.cloudera.crunch.DoFn;
>>>>>>>> +import com.cloudera.crunch.Emitter;
>>>>>>>> +import com.cloudera.crunch.PTable;
>>>>>>>> +import com.cloudera.crunch.Pair;
>>>>>>>> +import com.cloudera.crunch.impl.mr.MRPipeline;
>>>>>>>> +import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
>>>>>>>> +import com.cloudera.crunch.io.ReadableSourceTarget;
>>>>>>>> +import com.cloudera.crunch.io.impl.SourcePathTargetImpl;
>>>>>>>> +import com.cloudera.crunch.types.PType;
>>>>>>>> +import com.cloudera.crunch.types.PTypeFamily;
>>>>>>>> +import com.google.common.collect.ArrayListMultimap;
>>>>>>>> +import com.google.common.collect.Multimap;
>>>>>>>> +
>>>>>>>> +/**
>>>>>>>> + * Utility for doing map side joins on a common key between two
>>>>>>>> + * {@link PTable}s.
>>>>>>>> + * <p>
>>>>>>>> + * A map side join is an optimized join which doesn't use a reducer; instead,
>>>>>>>> + * the right side of the join is loaded into memory and the join is performed in
>>>>>>>> + * a mapper. This style of join has the important implication that the output of
>>>>>>>> + * the join is not sorted, as it is with a conventional (reducer-based) join.
>>>>>>>> + * <p>
>>>>>>>> + * <b>Note:</b> This utility is only supported when running with a
>>>>>>>> + * {@link MRPipeline} as the pipeline.
>>>>>>>> + */
>>>>>>>> +public class MapsideJoin {
>>>>>>>> +
>>>>>>>> +  /**
>>>>>>>> +   * Join two tables using a map side join. The right-side table will be loaded
>>>>>>>> +   * fully in memory, so this method should only be used if the right side
>>>>>>>> +   * table's contents can fit in the memory allocated to mappers. The join
>>>>>>>> +   * performed by this method is an inner join.
>>>>>>>> +   *
>>>>>>>> +   * @param left
>>>>>>>> +   *          The left-side table of the join
>>>>>>>> +   * @param right
>>>>>>>> +   *          The right-side table of the join, whose contents will be fully
>>>>>>>> +   *          read into memory
>>>>>>>> +   * @return A table keyed on the join key, containing pairs of joined values
>>>>>>>> +   */
>>>>>>>> +  public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right) {
>>>>>>>> +
>>>>>>>> +    if (!(right.getPipeline() instanceof MRPipeline)) {
>>>>>>>> +      throw new CrunchRuntimeException("Map-side join is only supported within a MapReduce context");
>>>>>>>> +    }
>>>>>>>> +
>>>>>>>> +    MRPipeline pipeline = (MRPipeline) right.getPipeline();
>>>>>>>> +    pipeline.materialize(right);
>>>>>>>> +
>>>>>>>> +    // TODO Move necessary logic to MRPipeline so that we can theoretically
>>>>>>>> +    // optimize this by running the setup of multiple map-side joins concurrently
>>>>>>>> +    pipeline.run();
>>>>>>>> +
>>>>>>>> +    ReadableSourceTarget<Pair<K, V>> readableSourceTarget = pipeline
>>>>>>>> +        .getMaterializeSourceTarget(right);
>>>>>>>> +    if (!(readableSourceTarget instanceof SourcePathTargetImpl)) {
>>>>>>>> +      throw new CrunchRuntimeException("Right-side contents can't be read from a path");
>>>>>>>> +    }
>>>>>>>> +
>>>>>>>> +    // Suppress warnings because we've just checked this cast via instanceof
>>>>>>>> +    @SuppressWarnings("unchecked")
>>>>>>>> +    SourcePathTargetImpl<Pair<K, V>> sourcePathTarget = (SourcePathTargetImpl<Pair<K, V>>) readableSourceTarget;
>>>>>>>> +
>>>>>>>> +    Path path = sourcePathTarget.getPath();
>>>>>>>> +    DistributedCache.addCacheFile(path.toUri(), pipeline.getConfiguration());
>>>>>>>> +
>>>>>>>> +    MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(path.toString(),
>>>>>>>> +        right.getPType());
>>>>>>>> +    PTypeFamily typeFamily = left.getTypeFamily();
>>>>>>>> +    return left.parallelDo(
>>>>>>>> +        "mapjoin",
>>>>>>>> +        mapJoinDoFn,
>>>>>>>> +        typeFamily.tableOf(left.getKeyType(),
>>>>>>>> +            typeFamily.pairs(left.getValueType(), right.getValueType())));
>>>>>>>> +
>>>>>>>> +  }
>>>>>>>> +
>>>>>>>> +  static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
>>>>>>>> +
>>>>>>>> +    private String inputPath;
>>>>>>>> +    private PType<Pair<K, V>> ptype;
>>>>>>>> +    private Multimap<K, V> joinMap;
>>>>>>>> +
>>>>>>>> +    public MapsideJoinDoFn(String inputPath, PType<Pair<K, V>> ptype) {
>>>>>>>> +      this.inputPath = inputPath;
>>>>>>>> +      this.ptype = ptype;
>>>>>>>> +    }
>>>>>>>> +
>>>>>>>> +    private Path getCacheFilePath() {
>>>>>>>> +      try {
>>>>>>>> +        for (Path localPath : DistributedCache.getLocalCacheFiles(getConfiguration())) {
>>>>>>>> +          if (localPath.toString().endsWith(inputPath)) {
>>>>>>>> +            return localPath.makeQualified(FileSystem.getLocal(getConfiguration()));
>>>>>>>> +          }
>>>>>>>> +        }
>>>>>>>> +      } catch (IOException e) {
>>>>>>>> +        throw new CrunchRuntimeException(e);
>>>>>>>> +      }
>>>>>>>> +
>>>>>>>> +      throw new CrunchRuntimeException("Can't find local cache file for '" + inputPath + "'");
>>>>>>>> +    }
>>>>>>>> +
>>>>>>>> +    @Override
>>>>>>>> +    public void initialize() {
>>>>>>>> +      super.initialize();
>>>>>>>> +
>>>>>>>> +      ReadableSourceTarget<Pair<K, V>> sourceTarget = (ReadableSourceTarget<Pair<K, V>>) ptype
>>>>>>>> +          .getDefaultFileSource(getCacheFilePath());
>>>>>>>> +      Iterable<Pair<K, V>> iterable = null;
>>>>>>>> +      try {
>>>>>>>> +        iterable = sourceTarget.read(getConfiguration());
>>>>>>>> +      } catch (IOException e) {
>>>>>>>> +        throw new CrunchRuntimeException("Error reading right-side of map side join: ", e);
>>>>>>>> +      }
>>>>>>>> +
>>>>>>>> +      joinMap = ArrayListMultimap.create();
>>>>>>>> +      for (Pair<K, V> joinPair : iterable) {
>>>>>>>> +        joinMap.put(joinPair.first(), joinPair.second());
>>>>>>>> +      }
>>>>>>>> +    }
>>>>>>>> +
>>>>>>>> +    @Override
>>>>>>>> +    public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
>>>>>>>> +      K key = input.first();
>>>>>>>> +      U value = input.second();
>>>>>>>> +      for (V joinValue : joinMap.get(key)) {
>>>>>>>> +        Pair<U, V> valuePair = Pair.of(value, joinValue);
>>>>>>>> +        emitter.emit(Pair.of(key, valuePair));
>>>>>>>> +      }
>>>>>>>> +    }
>>>>>>>> +
>>>>>>>> +  }
>>>>>>>> +
>>>>>>>> +}
>>>>>>>> diff --git src/main/java/com/cloudera/crunch/types/PType.java src/main/java/com/cloudera/crunch/types/PType.java
>>>>>>>> index af4ef1b..ae480aa 100644
>>>>>>>> --- src/main/java/com/cloudera/crunch/types/PType.java
>>>>>>>> +++ src/main/java/com/cloudera/crunch/types/PType.java
>>>>>>>> @@ -15,6 +15,7 @@
>>>>>>>>
>>>>>>>>  package com.cloudera.crunch.types;
>>>>>>>>
>>>>>>>> +import java.io.Serializable;
>>>>>>>>  import java.util.List;
>>>>>>>>
>>>>>>>>  import org.apache.hadoop.fs.Path;
>>>>>>>> @@ -31,7 +32,7 @@ import com.cloudera.crunch.SourceTarget;
>>>>>>>>   * {@code PCollection}.
>>>>>>>>   *
>>>>>>>>   */
>>>>>>>> -public interface PType<T> {
>>>>>>>> +public interface PType<T> extends Serializable {
>>>>>>>>    /**
>>>>>>>>     * Returns the Java type represented by this {@code PType}.
>>>>>>>>     */
>>>>>>>> diff --git src/main/java/com/cloudera/crunch/types/avro/AvroType.java src/main/java/com/cloudera/crunch/types/avro/AvroType.java
>>>>>>>> index 3db00c0..29af9fb 100644
>>>>>>>> --- src/main/java/com/cloudera/crunch/types/avro/AvroType.java
>>>>>>>> +++ src/main/java/com/cloudera/crunch/types/avro/AvroType.java
>>>>>>>> @@ -14,6 +14,7 @@
>>>>>>>>   */
>>>>>>>>  package com.cloudera.crunch.types.avro;
>>>>>>>>
>>>>>>>> +import java.io.Serializable;
>>>>>>>>  import java.util.List;
>>>>>>>>
>>>>>>>>  import org.apache.avro.Schema;
>>>>>>>> @@ -41,7 +42,8 @@ public class AvroType<T> implements PType<T> {
>>>>>>>>    private static final Converter AVRO_CONVERTER = new AvroKeyConverter();
>>>>>>>>
>>>>>>>>    private final Class<T> typeClass;
>>>>>>>> -  private final Schema schema;
>>>>>>>> +  private final String schemaString;
>>>>>>>> +  private transient Schema schema;
>>>>>>>>    private final MapFn baseInputMapFn;
>>>>>>>>    private final MapFn baseOutputMapFn;
>>>>>>>>    private final List<PType> subTypes;
>>>>>>>> @@ -55,6 +57,7 @@ public class AvroType<T> implements PType<T> {
>>>>>>>>        MapFn outputMapFn, PType... ptypes) {
>>>>>>>>      this.typeClass = typeClass;
>>>>>>>>      this.schema = Preconditions.checkNotNull(schema);
>>>>>>>> +    this.schemaString = schema.toString();
>>>>>>>>      this.baseInputMapFn = inputMapFn;
>>>>>>>>      this.baseOutputMapFn = outputMapFn;
>>>>>>>>      this.subTypes = ImmutableList.<PType> builder().add(ptypes).build();
>>>>>>>> @@ -76,6 +79,9 @@ public class AvroType<T> implements PType<T> {
>>>>>>>>    }
>>>>>>>>
>>>>>>>>    public Schema getSchema() {
>>>>>>>> +    if (schema == null) {
>>>>>>>> +      schema = new Schema.Parser().parse(schemaString);
>>>>>>>> +    }
>>>>>>>>      return schema;
>>>>>>>>    }
>>>>>>>>
>>>>>>>> diff --git src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java
>>>>>>>> new file mode 100644
>>>>>>>> index 0000000..f265460
>>>>>>>> --- /dev/null
>>>>>>>> +++ src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java
>>>>>>>> @@ -0,0 +1,60 @@
>>>>>>>> +package com.cloudera.crunch.impl.mr;
>>>>>>>> +
>>>>>>>> +import static org.junit.Assert.assertEquals;
>>>>>>>> +import static org.mockito.Mockito.doReturn;
>>>>>>>> +import static org.mockito.Mockito.mock;
>>>>>>>> +import static org.mockito.Mockito.spy;
>>>>>>>> +import static org.mockito.Mockito.when;
>>>>>>>> +
>>>>>>>> +import java.io.IOException;
>>>>>>>> +
>>>>>>>> +import org.junit.Before;
>>>>>>>> +import org.junit.Test;
>>>>>>>> +
>>>>>>>> +import com.cloudera.crunch.SourceTarget;
>>>>>>>> +import com.cloudera.crunch.impl.mr.collect.PCollectionImpl;
>>>>>>>> +import com.cloudera.crunch.io.ReadableSourceTarget;
>>>>>>>> +import com.cloudera.crunch.types.avro.Avros;
>>>>>>>> +
>>>>>>>> +public class MRPipelineTest {
>>>>>>>> +
>>>>>>>> +  private MRPipeline pipeline;
>>>>>>>> +
>>>>>>>> +  @Before
>>>>>>>> +  public void setUp() throws IOException {
>>>>>>>> +    pipeline = spy(new MRPipeline(MRPipelineTest.class));
>>>>>>>> +  }
>>>>>>>> +
>>>>>>>> +  @Test
>>>>>>>> +  public void testGetMaterializeSourceTarget_AlreadyMaterialized() {
>>>>>>>> +    PCollectionImpl<String> materializedPcollection = mock(PCollectionImpl.class);
>>>>>>>> +    ReadableSourceTarget<String> readableSourceTarget = mock(ReadableSourceTarget.class);
>>>>>>>> +    when(materializedPcollection.getMaterializedAt()).thenReturn(readableSourceTarget);
>>>>>>>> +
>>>>>>>> +    assertEquals(readableSourceTarget, pipeline.getMaterializeSourceTarget(materializedPcollection));
>>>>>>>> +  }
>>>>>>>> +
>>>>>>>> +  @Test
>>>>>>>> +  public void testGetMaterializeSourceTarget_NotMaterialized_HasOutput() {
>>>>>>>> +
>>>>>>>> +    PCollectionImpl<String> pcollection = mock(PCollectionImpl.class);
>>>>>>>> +    ReadableSourceTarget<String> readableSourceTarget = mock(ReadableSourceTarget.class);
>>>>>>>> +    when(pcollection.getPType()).thenReturn(Avros.strings());
>>>>>>>> +    doReturn(readableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings());
>>>>>>>> +    when(pcollection.getMaterializedAt()).thenReturn(null);
>>>>>>>> +
>>>>>>>> +    assertEquals(readableSourceTarget, pipeline.getMaterializeSourceTarget(pcollection));
>>>>>>>> +  }
>>>>>>>> +
>>>>>>>> +  @Test(expected = IllegalArgumentException.class)
>>>>>>>> +  public void testGetMaterializeSourceTarget_NotMaterialized_NotReadableSourceTarget() {
>>>>>>>> +    PCollectionImpl<String> pcollection = mock(PCollectionImpl.class);
>>>>>>>> +    SourceTarget<String> nonReadableSourceTarget = mock(SourceTarget.class);
>>>>>>>> +    when(pcollection.getPType()).thenReturn(Avros.strings());
>>>>>>>> +    doReturn(nonReadableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings());
>>>>>>>> +    when(pcollection.getMaterializedAt()).thenReturn(null);
>>>>>>>> +
>>>>>>>> +    pipeline.getMaterializeSourceTarget(pcollection);
>>>>>>>> +  }
>>>>>>>> +
>>>>>>>> +}
>>>>>>>> diff --git src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java
>>>>>>>> new file mode 100644
>>>>>>>> index 0000000..97e0c63
>>>>>>>> --- /dev/null
>>>>>>>> +++ src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java
>>>>>>>> @@ -0,0 +1,102 @@
>>>>>>>> +package com.cloudera.crunch.lib.join;
>>>>>>>> +
>>>>>>>> +import static org.junit.Assert.assertEquals;
>>>>>>>> +import static org.junit.Assert.assertTrue;
>>>>>>>> +
>>>>>>>> +import java.io.IOException;
>>>>>>>> +import java.util.Collections;
>>>>>>>> +import java.util.List;
>>>>>>>> +
>>>>>>>> +import org.junit.Test;
>>>>>>>> +
>>>>>>>> +import com.cloudera.crunch.FilterFn;
>>>>>>>> +import com.cloudera.crunch.MapFn;
>>>>>>>> +import com.cloudera.crunch.PTable;
>>>>>>>> +import com.cloudera.crunch.Pair;
>>>>>>>> +import com.cloudera.crunch.Pipeline;
>>>>>>>> +import com.cloudera.crunch.impl.mem.MemPipeline;
>>>>>>>> +import com.cloudera.crunch.impl.mr.MRPipeline;
>>>>>>>> +import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
>>>>>>>> +import com.cloudera.crunch.test.FileHelper;
>>>>>>>> +import com.cloudera.crunch.types.writable.Writables;
>>>>>>>> +import com.google.common.collect.Lists;
>>>>>>>> +
>>>>>>>> +public class MapsideJoinTest {
>>>>>>>> +
>>>>>>>> +  private static class LineSplitter extends MapFn<String, Pair<Integer, String>> {
>>>>>>>> +
>>>>>>>> +    @Override
>>>>>>>> +    public Pair<Integer, String> map(String input) {
>>>>>>>> +      String[] fields = input.split("\\|");
>>>>>>>> +      return Pair.of(Integer.parseInt(fields[0]), fields[1]);
>>>>>>>> +    }
>>>>>>>> +
>>>>>>>> +  }
>>>>>>>> +
>>>>>>>> +  private static class NegativeFilter extends FilterFn<Pair<Integer, String>> {
>>>>>>>> +
>>>>>>>> +    @Override
>>>>>>>> +    public boolean accept(Pair<Integer, String> input) {
>>>>>>>> +      return false;
>>>>>>>> +    }
>>>>>>>> +
>>>>>>>> +  }
>>>>>>>> +
>>>>>>>> +  @Test(expected = CrunchRuntimeException.class)
>>>>>>>> +  public void testNonMapReducePipeline() {
>>>>>>>> +    runMapsideJoin(MemPipeline.getInstance());
>>>>>>>> +  }
>>>>>>>> +
>>>>>>>> +  @Test
>>>>>>>> +  public void testMapsideJoin_RightSideIsEmpty() throws IOException {
>>>>>>>> +    MRPipeline pipeline = new MRPipeline(MapsideJoinTest.class);
>>>>>>>> +    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
>>>>>>>> +    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
>>>>>>>> +
>>>>>>>> +    PTable<Integer, String> filteredOrderTable = orderTable.parallelDo(new NegativeFilter(),
>>>>>>>> +        orderTable.getPTableType());
>>>>>>>> +
>>>>>>>> +    PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable,
>>>>>>>> +        filteredOrderTable);
>>>>>>>> +
>>>>>>>> +    List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined
>>>>>>>> +        .materialize());
>>>>>>>> +
>>>>>>>> +    assertTrue(materializedJoin.isEmpty());
>>>>>>>> +
>>>>>>>> +  }
>>>>>>>> +
>>>>>>>> +  @Test
>>>>>>>> +  public void testMapsideJoin() throws IOException {
>>>>>>>> +    runMapsideJoin(new MRPipeline(MapsideJoinTest.class));
>>>>>>>> +  }
>>>>>>>> +
>>>>>>>> +  private void runMapsideJoin(Pipeline pipeline) {
>>>>>>>> +    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
>>>>>>>> +    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
>>>>>>>> +
>>>>>>>> +    PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable, orderTable);
>>>>>>>> +
>>>>>>>> +    List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
>>>>>>>> +    expectedJoinResult.add(Pair.of(111, Pair.of("John Doe", "Corn flakes")));
>>>>>>>> +    expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet paper")));
>>>>>>>> +    expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet plunger")));
>>>>>>>> +    expectedJoinResult.add(Pair.of(333, Pair.of("Someone Else", "Toilet brush")));
>>>>>>>> +
>>>>>>>> +    List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(joined
>>>>>>>> +        .materialize());
>>>>>>>> +    Collections.sort(joinedResultList);
>>>>>>>> +
>>>>>>>> +    assertEquals(expectedJoinResult, joinedResultList);
>>>>>>>> +  }
>>>>>>>> +
>>>>>>>> +  private static PTable<Integer, String> readTable(Pipeline pipeline, String filename) {
>>>>>>>> +    try {
>>>>>>>> +      return pipeline.readTextFile(FileHelper.createTempCopyOf(filename)).parallelDo("asTable",
>>>>>>>> +          new LineSplitter(), Writables.tableOf(Writables.ints(), Writables.strings()));
>>>>>>>> +    } catch (IOException e) {
>>>>>>>> +      throw new RuntimeException(e);
>>>>>>>> +    }
>>>>>>>> +  }
>>>>>>>> +
>>>>>>>> +}
>>>>>>>> diff --git src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
>>>>>>>> index 74e2ad3..c6a0b46 100644
>>>>>>>> --- src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
>>>>>>>> +++ src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
>>>>>>>> @@ -25,6 +25,7 @@ import org.apache.avro.Schema.Type;
>>>>>>>>  import org.apache.avro.generic.GenericData;
>>>>>>>>  import org.apache.avro.util.Utf8;
>>>>>>>>  import org.apache.hadoop.io.LongWritable;
>>>>>>>> +import org.junit.Ignore;
>>>>>>>>  import org.junit.Test;
>>>>>>>>
>>>>>>>>  import com.cloudera.crunch.Pair;
>>>>>>>> @@ -103,6 +104,7 @@ public class AvrosTest {
>>>>>>>>    }
>>>>>>>>
>>>>>>>>    @Test
>>>>>>>> +  @Ignore("This test creates an invalid schema that causes Schema#toString to fail")
>>>>>>>>    public void testNestedTables() throws Exception {
>>>>>>>>      PTableType<Long, Long> pll = Avros.tableOf(Avros.longs(), Avros.longs());
>>>>>>>>      PTableType<Pair<Long, Long>, String> nest = Avros.tableOf(pll, Avros.strings());
>>>>>>>> diff --git src/test/resources/customers.txt src/test/resources/customers.txt
>>>>>>>> new file mode 100644
>>>>>>>> index 0000000..98f3f3d
>>>>>>>> --- /dev/null
>>>>>>>> +++ src/test/resources/customers.txt
>>>>>>>> @@ -0,0 +1,4 @@
>>>>>>>> +111|John Doe
>>>>>>>> +222|Jane Doe
>>>>>>>> +333|Someone Else
>>>>>>>> +444|Has No Orders
>>>>>>>> \ No newline at end of file
>>>>>>>> diff --git src/test/resources/orders.txt src/test/resources/orders.txt
>>>>>>>> new file mode 100644
>>>>>>>> index 0000000..2f1383f
>>>>>>>> --- /dev/null
>>>>>>>> +++ src/test/resources/orders.txt
>>>>>>>> @@ -0,0 +1,4 @@
>>>>>>>> +222|Toilet plunger
>>>>>>>> +333|Toilet brush
>>>>>>>> +222|Toilet paper
>>>>>>>> +111|Corn flakes
>>>>>>>> \ No newline at end of file
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jul 3, 2012 at 10:42 AM, Christian Tzolov <[email protected]> wrote:
>>>>>>>>> Hi Gabriel,
>>>>>>>>>
>>>>>>>>> Seems like the attachment is missing.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Chris
>>>>>>>>>
>>>>>>>>> On Tue, Jul 3, 2012 at 9:23 AM, Gabriel Reid <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi guys,
>>>>>>>>>>
>>>>>>>>>> Attached (hopefully) is a patch for an initial implementation
>>>>>>>>>> of map side joins. It's currently implemented as a static
>>>>>>>>>> method in a class called MapsideJoin, with the same interface
>>>>>>>>>> as the existing Join class (with only inner joins being
>>>>>>>>>> implemented for now). The way it works is that the right-side
>>>>>>>>>> PTable of the join is put in the distributed cache and then
>>>>>>>>>> read by the join function at runtime.
>>>>>>>>>>
>>>>>>>>>> There's one spot that I can see for a potentially interesting
>>>>>>>>>> optimization -- MRPipeline#run is called once for each map side
>>>>>>>>>> join that is set up, but if the setup of the joins was done
>>>>>>>>>> within MRPipeline, then we could set up multiple map side joins
>>>>>>>>>> in parallel with a single call to MRPipeline#run.
>>>>>>>>>> OTOH, a whole bunch of map side joins in parallel probably
>>>>>>>>>> isn't that common an operation.
>>>>>>>>>>
>>>>>>>>>> If anyone feels like taking a look at the patch, any feedback
>>>>>>>>>> is appreciated. If nobody sees something that needs serious
>>>>>>>>>> changes in the patch, I'll commit it.
>>>>>>>>>>
>>>>>>>>>> - Gabriel
>>>>>>>>>>
>>>>>>>>>> On Thu, Jun 21, 2012 at 9:09 AM, Gabriel Reid <[email protected]> wrote:
>>>>>>>>>>> Replying to all...
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jun 21, 2012 at 8:40 AM, Josh Wills <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> So there's a philosophical issue here: should Crunch ever
>>>>>>>>>>>> make decisions about how to do something itself based on its
>>>>>>>>>>>> estimates of the size of the data sets, or should it always
>>>>>>>>>>>> do exactly what the developer indicates?
>>>>>>>>>>>>
>>>>>>>>>>>> I can make a case either way, but I think that no matter
>>>>>>>>>>>> what, we would want to have explicit functions for performing
>>>>>>>>>>>> a join that reads one data set into memory, so I think we can
>>>>>>>>>>>> proceed w/the implementation while folks weigh in on what
>>>>>>>>>>>> their preferences are for the default join() behavior (e.g.,
>>>>>>>>>>>> just do a reduce-side join, or try to figure out the best
>>>>>>>>>>>> join given information about the input data and some
>>>>>>>>>>>> configuration parameters).
>>>>>>>>>>>
>>>>>>>>>>> I definitely agree on needing to have an explicit way to
>>>>>>>>>>> invoke one or the other -- and in general I don't like having
>>>>>>>>>>> magic behind the scenes to decide on behaviour (especially
>>>>>>>>>>> considering Crunch is generally intended to be closer to the
>>>>>>>>>>> metal than Pig and Hive). I'm not sure if the runtime decision
>>>>>>>>>>> is something specific to some of my use cases or if it could
>>>>>>>>>>> be useful to a wider audience.
>>>>>>>>>>>
>>>>>>>>>>> The ability to dynamically decide at runtime whether a map
>>>>>>>>>>> side join should be used can also easily be tacked on outside
>>>>>>>>>>> of Crunch, and won't impact the underlying implementation (as
>>>>>>>>>>> you pointed out), so I definitely also agree on focusing on
>>>>>>>>>>> the underlying implementation first; we can worry about the
>>>>>>>>>>> options used for exposing it later on.
>>>>>>>>>>>
>>>>>>>>>>> - Gabriel
>>>>>
>>>>> --
>>>>> Director of Data Science
>>>>> Cloudera <http://www.cloudera.com>
>>>>> Twitter: @josh_wills <http://twitter.com/josh_wills>

--
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
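For readers working through the patch above: here is a minimal sketch of how
the MapsideJoin utility would be driven from user code, modeled on the
MapsideJoinTest included in the patch. The input paths, the example class
name, and the "id|value" line format are illustrative assumptions borrowed
from the test fixtures, not part of the committed API.

import com.cloudera.crunch.MapFn;
import com.cloudera.crunch.PTable;
import com.cloudera.crunch.Pair;
import com.cloudera.crunch.Pipeline;
import com.cloudera.crunch.impl.mr.MRPipeline;
import com.cloudera.crunch.lib.join.MapsideJoin;
import com.cloudera.crunch.types.writable.Writables;

public class MapsideJoinExample {

  // Splits "id|value" lines into (id, value) pairs, as LineSplitter does in
  // MapsideJoinTest.
  static class LineSplitter extends MapFn<String, Pair<Integer, String>> {
    @Override
    public Pair<Integer, String> map(String input) {
      String[] fields = input.split("\\|");
      return Pair.of(Integer.parseInt(fields[0]), fields[1]);
    }
  }

  public static void main(String[] args) throws Exception {
    Pipeline pipeline = new MRPipeline(MapsideJoinExample.class);

    // Hypothetical input paths, in the same "id|value" format as the
    // customers.txt and orders.txt fixtures from the patch.
    PTable<Integer, String> customers = pipeline.readTextFile("/data/customers.txt")
        .parallelDo("customers", new LineSplitter(),
            Writables.tableOf(Writables.ints(), Writables.strings()));
    PTable<Integer, String> orders = pipeline.readTextFile("/data/orders.txt")
        .parallelDo("orders", new LineSplitter(),
            Writables.tableOf(Writables.ints(), Writables.strings()));

    // The right-side table (orders) is materialized, shipped to the mappers
    // via the DistributedCache, and loaded into an in-memory Multimap, so
    // the inner join runs without a reduce phase.
    PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customers, orders);

    pipeline.writeTextFile(joined, "/tmp/mapside-join-output");
    pipeline.run();
  }
}

Separately, the AvroType change in the patch illustrates a small reusable
pattern: Avro's Schema class is not Serializable, so the patch serializes the
schema's JSON string instead and reparses it lazily on first access. Below is
a standalone sketch of that pattern, assuming only a stock Avro dependency;
the holder class name here is hypothetical, not something from the patch.

import java.io.Serializable;

import org.apache.avro.Schema;

public class SerializableSchemaHolder implements Serializable {

  private final String schemaString; // serialized in place of the Schema
  private transient Schema schema;   // dropped on serialization, rebuilt lazily

  public SerializableSchemaHolder(Schema schema) {
    this.schema = schema;
    this.schemaString = schema.toString();
  }

  public Schema getSchema() {
    // After deserialization the transient field is null, so reparse the
    // JSON schema string on first access.
    if (schema == null) {
      schema = new Schema.Parser().parse(schemaString);
    }
    return schema;
  }
}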
