Yup, this is to avoid .run() ;-). But I want the beforeWrite output to be stored. So how do I apply the scaleFactor() method, and how will it help make the DoFn for afterWrite run on the map side?

On Tue, Feb 25, 2014 at 6:58 PM, Josh Wills <[email protected]> wrote:

> Okay. Out of curiosity, if you override the float scaleFactor() method that
> you apply here:
>
> PCollection<U> afterParallelDo = afterWrite.parallelDo(DoFn, U,
> ParallelDoOptions.builder().sources(target).build());
>
> and apply it to beforeWrite, does it still insist on writing out
> beforeWrite on the reduce side?
>
> BTW, I'm assuming there is (again) some reason not to force a run() here.
> ;-)
>
> On Tue, Feb 25, 2014 at 4:51 PM, Jinal Shah <[email protected]> wrote:
>
> > I wanted to run that in the map phase instead of reduce. If I don't do
> > that it will run in the reduce phase.
> >
> > On Tue, Feb 25, 2014 at 5:38 PM, Josh Wills <[email protected]> wrote:
> >
> > > On Tue, Feb 25, 2014 at 3:04 PM, Jinal Shah <[email protected]> wrote:
> > >
> > > > Hi,
> > > >
> > > > I'm trying to do an union of 3 PTables but I'm getting this error
> > > > http://pastebin.com/TkMPunJu
> > > >
> > > > this is where it is throwing it
> > > >
> > > > https://github.com/apache/crunch/blob/master/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java#L66
> > > >
> > > > this is what I'm trying to do
> > > >
> > > > PCollection<U> beforeWrite = someOperation();
> > > >
> > > > SourceTarget<U> target = new AvroFileTarget().asSourceTaget(U);
> > > >
> > > > pipeline.write(beforeWrite, target);
> > > >
> > > > PCollection<U> afterWrite = pipeline.read(target);
> > >
> > > Why are you creating afterWrite here, instead of doing the processing in
> > > the next step (the one that yields afterParallelDo) against beforeWrite?
> > >
> > > > PCollection<U> afterParallelDo = afterWrite.parallelDo(DoFn, U,
> > > > ParallelDoOptions.builder().sources(target).build());
> > > >
> > > > PTable<K,U> afterSomeOperation = someOperations();
> > > >
> > > > PTable<K,U> thatNeedsToBeAdded = comingFromHbase();
> > > >
> > > > PTable<K,U> unionNeeded = afterSomeOperation.union(thatNeedsToBeAdded); //
> > > > this is where it fails for some reason since it is looking for the target
> > > > which is not generated yet.
> > > >
> > > > Can anyone help me in understanding why this is happening?
> > > >
> > > > Thanks
> > > > Jinal
> > >
> > > --
> > > Director of Data Science
> > > Cloudera <http://www.cloudera.com>
> > > Twitter: @josh_wills <http://twitter.com/josh_wills>
