Hey Jinal,

Been thinking about it off-and-on all day, and I don't have a better solution right now than pipeline.run()...

J
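(A rough sketch of what the run()-based checkpoint mentioned above could look like, reusing the placeholder names from the thread. The CheckpointSketch/checkpoint names, the /tmp/checkpoint path, the "after-checkpoint" stage name, and the uType/fn parameters are illustrative, not code from the original pipeline; uType is assumed to be an Avro PType to match the AvroFileTarget used in the thread.)

    import org.apache.crunch.DoFn;
    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.SourceTarget;
    import org.apache.crunch.io.avro.AvroFileTarget;
    import org.apache.crunch.types.PType;
    import org.apache.hadoop.fs.Path;

    class CheckpointSketch {
      // Hypothetical helper; U stands for the thread's placeholder record type,
      // and uType is assumed to be an Avro PType (e.g. built with Avros.*).
      static <U> PCollection<U> checkpoint(Pipeline pipeline, PCollection<U> beforeWrite,
                                           PType<U> uType, DoFn<U, U> fn) {
        // The checkpoint location is illustrative.
        SourceTarget<U> target =
            new AvroFileTarget(new Path("/tmp/checkpoint")).asSourceTarget(uType);

        // Write the intermediate result and force it to materialize now,
        // before anything downstream refers to it.
        pipeline.write(beforeWrite, target);
        pipeline.run();

        // From here on this is just an Avro source that already exists on HDFS,
        // so the planner no longer sees a dependency on a target it has not
        // generated yet.
        PCollection<U> afterWrite = pipeline.read(target);
        return afterWrite.parallelDo("after-checkpoint", fn, uType);
      }
    }

The rest of the flow (someOperations(), comingFromHbase(), and the union) would then be built on the returned collection, in a later plan that only ever reads a file that already exists.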
On Wed, Feb 26, 2014 at 6:46 PM, Jinal Shah <[email protected]> wrote:

So Josh, what do you think can be done?

On Wed, Feb 26, 2014 at 10:37 AM, Jinal Shah <[email protected]> wrote:

It is also trying to run it in parallel, so now it is failing on that.

On Wed, Feb 26, 2014 at 10:30 AM, Jinal Shah <[email protected]> wrote:

I did as you said, but now it is running the DoFn twice. After that parallelDo
I'm writing the output to HDFS, so the work got split in two: it runs the DoFn
in the reduce phase while storing the output, and again in the map phase while
doing the union.

On Tue, Feb 25, 2014 at 7:41 PM, Josh Wills <[email protected]> wrote:

So my thought would be that if the DoFn in this step:

    beforeWrite.parallelDo(DoFn, U, ParallelDoOptions.builder().sources(target).build());

signaled that it was going to write a lot of data with a large scaleFactor,
then the planner would use the output from beforeWrite as a checkpoint, and
save the DoFn processing for the map phase.

On Tue, Feb 25, 2014 at 5:08 PM, Jinal Shah <[email protected]> wrote:

Yup, this is to avoid .run() ;-). But I want the beforeWrite output to be
stored. So how do I apply the scaleFactor method, and how will it help make
the DoFn for afterWrite run on the map side?

On Tue, Feb 25, 2014 at 6:58 PM, Josh Wills <[email protected]> wrote:

Okay. Out of curiosity, if you override the float scaleFactor() method on the
DoFn that you apply here:

    PCollection<U> afterParallelDo = afterWrite.parallelDo(DoFn, U,
        ParallelDoOptions.builder().sources(target).build());

and apply it to beforeWrite, does it still insist on writing out beforeWrite
on the reduce side?

BTW, I'm assuming there is (again) some reason not to force a run() here. ;-)

On Tue, Feb 25, 2014 at 4:51 PM, Jinal Shah <[email protected]> wrote:

I wanted to run that in the map phase instead of reduce. If I don't do that,
it will run in the reduce phase.
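(The scaleFactor() override Josh asks about above could look roughly like this. ExpandingFn and the 10.0f value are made up for illustration; whether the planner then actually defers the processing to the map side is exactly what the exchange above is trying to find out.)

    import org.apache.crunch.DoFn;
    import org.apache.crunch.Emitter;

    // Illustrative stand-in for the DoFn applied to beforeWrite in the thread.
    class ExpandingFn<U> extends DoFn<U, U> {

      @Override
      public void process(U input, Emitter<U> emitter) {
        // ... the real per-record processing would go here ...
        emitter.emit(input);
      }

      @Override
      public float scaleFactor() {
        // Advertise that this function emits much more data than it reads.
        // A large value is meant to make checkpointing its input (beforeWrite)
        // cheaper than checkpointing its output, which is what Josh hopes will
        // push the DoFn to the map side of the following job.
        return 10.0f;
      }
    }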
On Tue, Feb 25, 2014 at 5:38 PM, Josh Wills <[email protected]> wrote:

> On Tue, Feb 25, 2014 at 3:04 PM, Jinal Shah <[email protected]> wrote:
>
> Hi,
>
> I'm trying to do a union of 3 PTables, but I'm getting this error:
> http://pastebin.com/TkMPunJu
>
> This is where it is thrown:
> https://github.com/apache/crunch/blob/master/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java#L66
>
> This is what I'm trying to do:
>
> PCollection<U> beforeWrite = someOperation();
>
> SourceTarget<U> target = new AvroFileTarget().asSourceTarget(U);
>
> pipeline.write(beforeWrite, target);
>
> PCollection<U> afterWrite = pipeline.read(target);

Why are you creating afterWrite here, instead of doing the processing in the
next step (the one that yields afterParallelDo) against beforeWrite?

> PCollection<U> afterParallelDo = afterWrite.parallelDo(DoFn, U,
>     ParallelDoOptions.builder().sources(target).build());
>
> PTable<K,U> afterSomeOperation = someOperations();
>
> PTable<K,U> thatNeedsToBeAdded = comingFromHbase();
>
> PTable<K,U> unionNeeded = afterSomeOperation.union(thatNeedsToBeAdded);
> // this is where it fails for some reason, since it is looking for the
> // target, which has not been generated yet
>
> Can anyone help me understand why this is happening?
>
> Thanks,
> Jinal

--
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
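(And a sketch of the alternative Josh raises inline above, i.e. keeping the write but building the follow-on parallelDo on beforeWrite instead of reading the target back. DirectSketch, writeAndContinue, and the "after-write" stage name are illustrative; the placeholder types are the thread's.)

    import org.apache.crunch.DoFn;
    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.Target;
    import org.apache.crunch.types.PType;

    class DirectSketch {
      // Hypothetical shape of the suggestion: the intermediate output still
      // gets stored, but the downstream processing hangs off beforeWrite.
      static <U> PCollection<U> writeAndContinue(Pipeline pipeline, PCollection<U> beforeWrite,
                                                 Target target, PType<U> uType, DoFn<U, U> fn) {
        // Write the intermediate result as before...
        pipeline.write(beforeWrite, target);

        // ...but the follow-on work no longer depends on reading that file,
        // so the later union is not waiting on a target that the same plan
        // has not produced yet.
        return beforeWrite.parallelDo("after-write", fn, uType);
      }
    }

As Jinal reports further up the thread, with this shape the planner can end up running the DoFn twice, once on the reduce side feeding the write and once on the map side feeding the union, which is what the scaleFactor() and run() discussion above is trying to avoid.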
