+chao There's HFileSource in Crunch, which reads HFiles from HDFS. Chao might know a better way.
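For what it's worth, reading the HFiles back into a pipeline might look roughly like this untested sketch (the class name and path are placeholders, and the exact scanHFiles signature may differ across crunch-hbase versions):

    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.crunch.io.hbase.HFileUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.client.Result;

    public class ReadHFilesExample {
      public static void main(String[] args) throws Exception {
        Pipeline pipeline = new MRPipeline(ReadHFilesExample.class, new Configuration());
        // Scan the HFiles under the given path back into the pipeline;
        // if memory serves, scanHFiles hands back one Result per row.
        PCollection<Result> rows = HFileUtils.scanHFiles(pipeline, new Path("/data/hfiles"));
        // ...process rows, then:
        pipeline.done();
      }
    }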
J

On Thu, Feb 13, 2014 at 10:30 AM, Jinal Shah <[email protected]> wrote:

Is there a way, in Crunch itself, to do incremental reads from HFiles which are stored in HDFS?

Thanks

On Thu, Feb 13, 2014 at 11:50 AM, Josh Wills <[email protected]> wrote:

The only option I have for you in that case is pipeline.run() or pipeline.done(); LoadIncrementalHFiles isn't Crunch code, so we can't incorporate it into the planner's decision-making process. Does LoadIncrementalHFiles even run an MR job?

On Thu, Feb 13, 2014 at 9:27 AM, Jinal Shah <[email protected]> wrote:

Hi Josh,

I tried the option you suggested and it worked perfectly fine where I'm doing a parallelDo. But for HBase I'm using HFileUtils.writeToHFilesForIncrementalLoad() to write, and then reading using the LoadIncrementalHFiles class. So how do I tell the planner to run sequentially in this case? Here is what the code looks like:

    HFileUtils.writeToHFilesForIncrementalLoad(PCollection<KeyValue>, table, path);
    pipeline.run();

    LoadIncrementalHFiles loadIncremental = new LoadIncrementalHFiles(config);
    loadIncremental.doBulkLoad(path, table);
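Fleshed out, the whole flow is roughly the following (untested sketch; conf, table, path, and kvs stand in for my existing Configuration, HTable, output Path, and PCollection<KeyValue>):

    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.io.hbase.HFileUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    public void bulkLoad(Pipeline pipeline, PCollection<KeyValue> kvs,
                         HTable table, Path path, Configuration conf) throws Exception {
      // Stage 1: plan the HFile output; the files are written when the pipeline runs.
      HFileUtils.writeToHFilesForIncrementalLoad(kvs, table, path);

      // The bulk load below is plain HBase client code that the Crunch planner
      // knows nothing about, so run() the pipeline first to make the HFiles exist.
      pipeline.run();

      // Stage 2: hand the finished HFiles to HBase.
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
      loader.doBulkLoad(path, table);
    }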
Thanks
Jinal

On Wed, Feb 12, 2014 at 11:27 AM, Josh Wills <[email protected]> wrote:

I'm not sure what you want here; there is a mechanism to force the planner to run stages sequentially (even if the input to stage 2 does not directly depend on the output from stage 1) by using ParallelDoOptions to introduce such a dependency, as I indicated before:

    SourceTarget marker = ...;
    HBase.read()
    doSomeChangesOnData
    dummyDoFnToCreateMarker
    HBase.write()
    marker.write()
    HBase.read().parallelDo(DoFn, PType,
        ParallelDoOptions.builder().sourceTarget(marker).build());

That's less of a hint to the planner and more of a command. Another option would be to set the maximum number of simultaneously running jobs in Crunch to 1 using the crunch.max.running.jobs configuration parameter, which would run everything in the pipeline sequentially, one job at a time.

On Wed, Feb 12, 2014 at 9:15 AM, Jinal Shah <[email protected]> wrote:

Can I get some comment on this?

On Thu, Feb 6, 2014 at 11:00 AM, Jinal Shah <[email protected]> wrote:

Hi Josh and Micah,

In both scenarios I can easily do a pipeline.run() and get going from there. But my main question is: why should I do a pipeline.run() in between just to make the planner run something sequentially, rather than the way it would have planned otherwise? What I'm getting at is that there should be some mechanism that tells the planner to do something in a certain way, at least to some extent. Take Apache Hive as an example: until the 0.7 release, Hive provided a mechanism called a HINT, which would tell the query planner to run something as indicated in the hint rather than the way it would have run otherwise. I know you might say it might not create an optimized plan, but at this point the consumer is more focused on the way it should be planned than on the optimization.

Maybe there is an option already in Crunch that I have not explored, but I just wanted to put my point out there. If there is an option, I would love to learn about it.

On Thu, Feb 6, 2014 at 10:44 AM, Josh Wills <[email protected]> wrote:

Hey Jinal,

On scenario 2, the easiest way to do this is to force a run() between the write and the second read, a la:

    HBase.read()
    doSomeChangesOnData
    HBase.write()
    Pipeline.run()
    HBase.read()

If that isn't possible for some reason, you'll need to add an output file to the first phase that can be used to indicate that the HBase.write is complete, and then have the second read depend on that file existing before it can run, which can be done via ParallelDoOptions, e.g.:

    SourceTarget marker = ...;
    HBase.read()
    doSomeChangesOnData
    dummyDoFnToCreateMarker
    HBase.write()
    marker.write()
    HBase.read().parallelDo(DoFn, PType,
        ParallelDoOptions.builder().sourceTarget(marker).build());

but that's obviously uglier and more complicated.
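Spelled out in real code, the marker pattern might look something like this untested sketch (readFromHBase/writeToHBase, ChangeDataFn, the PTypes, and the marker path are all placeholders for your own code, and I believe the builder method is spelled sourceTargets in the shipping API):

    import org.apache.crunch.MapFn;
    import org.apache.crunch.PCollection;
    import org.apache.crunch.ParallelDoOptions;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.SourceTarget;
    import org.apache.crunch.fn.IdentityFn;
    import org.apache.crunch.io.At;
    import org.apache.crunch.types.PType;
    import org.apache.crunch.types.writable.Writables;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;

    public void markerPattern(Pipeline pipeline, PType<Put> putType, PType<Result> resultType) {
      // Placeholder marker location; any SourceTarget you can write to works.
      SourceTarget<String> marker =
          At.textFile(new Path("/tmp/hbase-write-marker"), Writables.strings());

      PCollection<Put> changed = readFromHBase(pipeline)     // your HBase.read()
          .parallelDo(new ChangeDataFn(), putType);          // doSomeChangesOnData
      writeToHBase(changed);                                 // your HBase.write()

      // Dummy DoFn whose only job is to emit something into the marker target.
      changed.parallelDo(new MapFn<Put, String>() {
        @Override
        public String map(Put input) {
          return "done";
        }
      }, Writables.strings()).write(marker);

      // The planner won't schedule this parallelDo until the marker target
      // exists, so the second read has to wait for the first stage to finish.
      PCollection<Result> reread = readFromHBase(pipeline).parallelDo(
          "wait-for-marker",
          IdentityFn.<Result>getInstance(),
          resultType,
          ParallelDoOptions.builder().sourceTargets(marker).build());
    }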
J

On Wed, Feb 5, 2014 at 7:14 PM, Jinal Shah <[email protected]> wrote:

Hi Josh,

Here is a small example of what I am looking for.

Scenario 1:

    PCollection<Something> s = FunctionDoingSomething();
    pipeline.write(s, path);
    doSomeFilteringOn(s);

I want the filtering to happen in the map phase; instead it is happening in the reduce phase, due to which I have to introduce a pipeline.run(), and now the code looks like this:

    PCollection<Something> s = FunctionDoingSomething();
    pipeline.write(s, path);
    pipeline.run();
    doSomeFilteringOn(s);

Scenario 2:

I'm doing an operation on HBase, and here is how it looks:

    HBase.read()
    doSomeChangesOnData
    HBase.write()
    HBase.read()

Crunch at this point considers the two reads separate and tries to run them in parallel, so it reads the data before I have even written my changes; I again have to put in a pipeline.run() to break this into two separate flows and execute them in sequence.

So I'm asking: is there any way to send a HINT to the planner about how it should create the plan, instead of it deciding by itself, or some way to have more control in making the planner understand certain situations?

Thanks
Jinal

On Thu, Jan 30, 2014 at 11:10 AM, Josh Wills <[email protected]> wrote:

On Thu, Jan 30, 2014 at 7:09 AM, Jinal Shah <[email protected]> wrote:
> Hi everyone,
>
> This is Jinal Shah; I'm new to the group. I had a question about execution
> control in Crunch. Is there any way we can force Crunch to do certain
> operations in parallel or certain operations sequentially? For example,
> say we want the pipeline to execute a particular DoFn in the map phase
> instead of the reduce phase, or vice versa, or to execute a particular
> flow only after another flow has completed, as opposed to running them in
> parallel.

Forcing a DoFn to operate in a map or reduce phase is tough for the planner to do right now; we sort of rely on the developer to have a mental model of how the jobs will proceed. The place where you usually want to force a DoFn to execute in the reduce vs. the map phase is when you have dependent groupByKey operations; there you can use cache() or materialize() on the intermediate output that you want to split on, and the planner will respect that.
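For instance, a rough (untested) sketch, with ExtractKeyFn and SwapFn as placeholder DoFns:

    import org.apache.crunch.Aggregators;
    import org.apache.crunch.PCollection;
    import org.apache.crunch.PGroupedTable;
    import org.apache.crunch.PTable;
    import org.apache.crunch.types.writable.Writables;

    public void twoStageGroupBy(PCollection<String> input) {
      PTable<String, Long> counts = input
          .parallelDo(new ExtractKeyFn(),   // placeholder DoFn<String, Pair<String, Long>>
              Writables.tableOf(Writables.strings(), Writables.longs()))
          .groupByKey()
          .combineValues(Aggregators.SUM_LONGS());

      // Materializing (or cache()-ing) the intermediate table makes the planner
      // persist it here, so the next parallelDo runs in the map phase of a
      // second job instead of being fused into the first job's reduce.
      counts.materialize();

      PGroupedTable<Long, String> regrouped = counts
          .parallelDo(new SwapFn(),         // placeholder DoFn<Pair<String, Long>, Pair<Long, String>>
              Writables.tableOf(Writables.longs(), Writables.strings()))
          .groupByKey();
    }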
On the latter question, the thing to look for is org.apache.crunch.ParallelDoOptions, which isn't something I've doc'd in the user guide yet (it's on the todo list, I promise). You can give a parallelDo call an additional argument that specifies one or more SourceTargets that have to exist before a particular DoFn is allowed to run. In this way, you can force aspects of the pipeline to be sequential instead of parallel. We make use of ParallelDoOptions inside the MapsideJoinStrategy code, to ensure that the data set we'll be loading in-memory actually exists in the file system before we run the code that reads it into memory.

> Maybe this might have been asked before, so sorry if it came up again. If
> you have further questions on the details, do let me know.
>
> Thanks everyone, and have a great day.
>
> Thanks
> Jinal

--
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
