Hey Micah,
Yes, you are right, and this is what is going on in that "// do something" section.
Here is a higher-level overview.
U and V are the same type here.
PCollection<V> collectionWhichCouldBeEmpty = null;
if (path.exists) {
  collectionWhichCouldBeEmpty = pipeline.read(FromPath, PType.V);
} else {
  collectionWhichCouldBeEmpty = pipeline.emptyPCollection();
}
PCollection<U> collectionWhichHasData = DataComingFromDifferentSource();
PTable<K, V> VTable = collectionWhichCouldBeEmpty.by(PType<K>);
PTable<K, U> UTable = collectionWhichHasData.by(PType<K>);
UVTable = Join.join(UTable, VTable, Join.LEFT);
pipeline.write(UVTable.values(), somePath, PType<U>);
pipeline.run(); // error is here
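
For reference, here is a plain-Java sketch of why output would still be expected when V is empty, assuming Join.LEFT above means a left outer join. It does not use Crunch at all: java.util maps stand in for the PTables, and the class name LeftJoinSketch and the sample keys and values are made up for illustration.

```java
import java.util.AbstractMap;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of a left outer join. This is NOT the Crunch API;
// maps stand in for PTables and the names are hypothetical.
final class LeftJoinSketch {

    // Pairs every left value with the right value for the same key,
    // or with null when the right side has nothing for that key.
    static <K, U, V> Map<K, Map.Entry<U, V>> leftJoin(Map<K, U> left, Map<K, V> right) {
        Map<K, Map.Entry<U, V>> joined = new LinkedHashMap<K, Map.Entry<U, V>>();
        for (Map.Entry<K, U> row : left.entrySet()) {
            V match = right.get(row.getKey()); // null if the key is absent
            joined.put(row.getKey(),
                    new AbstractMap.SimpleImmutableEntry<U, V>(row.getValue(), match));
        }
        return joined;
    }

    public static void main(String[] args) {
        // Made-up sample data for the side that always has rows.
        Map<String, String> uTable = new LinkedHashMap<String, String>();
        uTable.put("k1", "u1");
        uTable.put("k2", "u2");

        // The side that may be empty, as in the else branch above.
        Map<String, String> emptyVTable = Collections.<String, String>emptyMap();

        Map<String, Map.Entry<String, String>> uvTable = leftJoin(uTable, emptyVTable);
        System.out.println(uvTable.size() + " rows survive the left join");
    }
}
```

Under that assumption every row from U survives the join with a null on the V side, so the values being written are not empty even though one input is.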
Hope this helps.
On Thu, Mar 20, 2014 at 12:50 PM, Micah Whitacre <[email protected]> wrote:
> Jinal, can you elaborate on the "//do something" section of the code? When
> I heard it described, I thought other PCollections were being joined with
> the emptyPCollection, and that it was the outcome of the joins and
> additional processing that was actually being persisted.
>
>
> On Thu, Mar 20, 2014 at 10:18 AM, Chao Shi <[email protected]> wrote:
>
> > Hi Josh and Jinal,
> >
> > This was introduced to help with the following case: in one of our MR
> > programs, there is a command-line option that optionally specifies a path
> > to data to be joined on. Before emptyPCollection() was introduced, we had
> > to write something like this:
> >
> > Path path = ...
> > PCollection in1 = null;
> > if (path != null) {
> >   in1 = pipeline.read(...);
> > }
> > PCollection in2 = pipeline.read(...);
> > if (in1 != null) {
> >   in2 = in2.join(in1);
> > }
> >
> > You can see checks for null everywhere. With emptyPCollection, we can do
> > this:
> >
> > if (path != null) {
> >   in2 = pipeline.read(...);
> > } else {
> >   in2 = pipeline.emptyPCollection();
> > }
> > in1.join(in2);
> >
> > I think Jinal's case is one that our current implementation handles
> > badly. Perhaps we should change it to create an empty output directory
> > rather than report an error. That would avoid starting the MR job at all
> > and save the job start-up time, which is the benefit of knowing about an
> > empty PCollection at plan time.
> >
> >
> > 2014-03-16 23:34 GMT+08:00 Josh Wills <[email protected]>:
> >
> > > +chao
> > >
> > > Inlined.
> > >
> > > On Sat, Mar 15, 2014 at 12:34 PM, Jinal Shah <[email protected]> wrote:
> > >
> > >> Hi,
> > >>
> > >> I came across a case where I'm not sure whether the behavior is right.
> > >> I am getting a No Output exception when I try to run my Crunch job. On
> > >> further investigation I traced it to my use of
> > >> Pipeline.emptyPCollection(). Here is what my scenario looks like:
> > >>
> > >> PCollection<V> collectionWhichCouldBeEmpty = null;
> > >> if(path.exists){
> > >> collectionWhichCouldBeEmpty= pipeline.read(FromPath, PType.V);
> > >> } else{
> > >> collectionWhichCouldBeEmpty = pipeline.emptyPCollection();
> > >> }
> > >>
> > >> //do some operations
> > >>
> > >> pipeline.write(Target);
> > >>
> > >> pipeline.run()// this is where it is throwing the error
> > >>
> > >>
> > >> https://github.com/apache/crunch/blob/master/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java#L287
> > >>
> > >> On further debugging I found that the Vertex didn't have an input.
> > >>
> > >>
> > >> https://github.com/apache/crunch/blob/master/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java#L275
> > >>
> > >>
> > >> So if I use pipeline.read and it produces an empty PCollection, it
> > >> works, since it has an input source. But if I create an empty
> > >> PCollection using pipeline.emptyPCollection, which doesn't have an
> > >> input source, it fails.
> > >>
> > >> I'm not sure whether this case was missed or whether it is meant to
> > >> behave this way.
> > >>
> > >
> > > It's a good question, and I'm not sure of the answer. Added Chao to the
> > > To: line to ask him what the intention was in this case.
> > >
> > >
> > >> Thanks
> > >> Jinal
> > >>
> > >
> > >
> >
>
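
The point quoted above about null checks versus an empty collection can be shown in isolation with a minimal plain-Java sketch. It is only a model: java.util lists stand in for PCollections, and the class EmptyInputSketch, the helpers readOptional and combine, and the sample records are hypothetical and not part of Crunch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java model of "return an empty collection instead of null".
// Lists stand in for PCollections; nothing here is Crunch API.
final class EmptyInputSketch {

    // Hypothetical loader: returns an empty list when no path was given,
    // so callers never have to test the result for null.
    static List<String> readOptional(String path) {
        if (path == null) {
            return Collections.emptyList();
        }
        // Stand-in for a real read; the records are made up.
        return Arrays.asList(path + "#a", path + "#b");
    }

    // Stand-in for any downstream step. It simply concatenates the inputs
    // (it is not a real join) and needs no null check on either argument.
    static List<String> combine(List<String> required, List<String> optional) {
        List<String> out = new ArrayList<String>(required);
        out.addAll(optional);
        return out;
    }

    public static void main(String[] args) {
        List<String> required = Arrays.asList("r1", "r2");
        // The same call works whether or not the optional path was supplied.
        System.out.println(combine(required, readOptional(null)).size());
        System.out.println(combine(required, readOptional("side")).size());
    }
}
```

The downstream code is identical in both cases, which is the convenience described in the quoted message. Whether a planner should then fail or write an empty output when every input is empty is a separate question that this sketch does not model.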