Thanks Gabriel for clarifying it :)
On Fri, Oct 18, 2013 at 11:28 PM, Gabriel Reid <[email protected]> wrote:
> Hi Chandan,
>
> Inlined below.
>
> On Sat, Oct 19, 2013 at 3:31 AM, Chandan Biswas <[email protected]> wrote:
> > Please correct me if I am wrong. I want to understand more about how Crunch
> > creates MapReduce jobs, as pointed out by Micah in an earlier mail.
> > Suppose I am doing some steps of operation as follows:
> > I have a PTable<K,T> table.
> > PGroupedTable<K,T> grpedTable1=table.groupByKey();
> > Now I am applying CombineFn on grpedTable1 and getting table2
> > PTable<K,T> table2=grpedTable1.parallelDo(..,CombineFn<K,T>,..);
> > PGroupedTable<K,T> grpedTable2=table2.groupByKey();
> > PTable<K,U> table3=grpedTable2.parallelDo(..,DoFn,...);
> >
> > So, which type, that of grpedTable2 or grpedTable1, will be used for reducers?
> > My understanding is that the type of grpedTable2 will be used for reducers and
> > the type of grpedTable1 will be used for shuffle/sorting at map side.
> > Otherwise, there will be no way to send the Iterable data to reducers.
> > If that is the case, then the point of not changing the type by CombineFn
> > doesn't hold. Otherwise, not changing the type by CombineFn makes complete
> > sense.
> >
>
> In this example, there would be two MapReduce jobs kicked off. The
> first one would read in table, and then use a Combiner (based on the
> CombineFn) before the reducer (i.e. before the groupByKey), and then
> the same CombineFn within the reducer, to create table2.
>
> Going from table2 would be another MapReduce job that would do nothing
> in the mapper, and execute the supplied DoFn in the reducer.
>
> > It will be awesome to have such functionality like Spark as Josh pointed
> > out to overcome it in Crunch.
>
> Just to be clear, adding the "Aggregatable" functionality in Crunch
> won't actually add anything that isn't possible right now -- instead,
> it will just wrap current functionality into a more readable unit (at
> least that's how I see it).
>
> - Gabriel
>
> > Thanks,
> > Chandan
> >
> > On Fri, Oct 18, 2013 at 7:34 PM, Josh Wills <[email protected]> wrote:
> >
> >> I'm certainly not opposed to having something like this. Spark makes this
> >> distinction via Accumulable vs. Accumulator:
> >>
> >> http://spark.incubator.apache.org/docs/0.8.0/api/core/index.html#org.apache.spark.Accumulable
> >>
> >> http://spark.incubator.apache.org/docs/0.8.0/api/core/index.html#org.apache.spark.Accumulator
> >>
> >> Maybe we want something like "Aggregatable<R, T>" to go along with our
> >> Aggregator<T> (which could extend Aggregatable<T, T>)?
> >>
> >> On Fri, Oct 18, 2013 at 1:36 PM, Gabriel Reid <[email protected]> wrote:
> >>
> >> > This use case (map/combine <K,V> to <K,U>) seems to come up
> >> > repeatedly. The solution (map <K,V> to <K, Collection<V>> and then
> >> > combine) works but is also pretty unintuitive.
> >> >
> >> > Any thoughts on adding a util in Crunch to do this? It would basically
> >> > just need to be a static util method that takes a MapFn<<K,V><K,U>>
> >> > and a CombineFn<K,U> and would take care of the singleton collection
> >> > mapping stuff internally. On the one hand I'm thinking that this could
> >> > be pretty useful, but I'm not sure if it would make things more
> >> > intuitive or possibly have the reverse effect.
> >> >
> >> > Any opinions? I'm up for putting it together if people think it's worth it.
> >> >
> >> > - Gabriel
> >> >
> >> > On Fri, Oct 18, 2013 at 4:14 PM, Micah Whitacre <[email protected]> wrote:
> >> > > Thinking about the technical issues at first glance you could say the
> >> > > restriction is just the way the java generics are written for the CombineFn
> >> > > class but if you think about what is actually happening it would be awkward
> >> > > to support changing types in the CombineFn especially when it is paired
> >> > > with a GroupByKey.
> >> > > As I showed in the example the CombineFn essentially
> >> > > bookends the GBK operation by performing processing on the types before and
> >> > > after the sorting. The GBK's types describe the output of the map phase
> >> > > and input to the reduce. If the CombineFn changed the types then the
> >> > > output wouldn't match the types described by the GBK. I'm guessing this
> >> > > could lead to a number of problems trying to compute the types and plan for
> >> > > the job.
> >> > >
> >> > > On Fri, Oct 18, 2013 at 8:55 AM, Micah Whitacre <[email protected]> wrote:
> >> > >
> >> > >> I'm not sure I follow how there is extra effort involved. Are you talking
> >> > >> development effort or processing effort? From a development effort in both
> >> > >> cases you need to write code that translates T to U and combines the
> >> > >> values. The difference is whether that logic exists inside of a single
> >> > >> DoFn or is split into a MapFn + CombineFn. So the development effort
> >> > >> should be the same.
> >> > >>
> >> > >> On Fri, Oct 18, 2013 at 8:11 AM, Chandan Biswas <[email protected]> wrote:
> >> > >>
> >> > >>> yeah.. i see what you are talking about. But it will take extra effort to
> >> > >>> convert to U type. So, is there any specific reason why CombineFn was
> >> > >>> initially created so that it will not allow another return type? Were
> >> > >>> there any constraints (design / complexity) to restrict to this behavior?
> >> > >>> Thanks,
> >> > >>>
> >> > >>> On Thu, Oct 17, 2013 at 8:47 PM, Micah Whitacre <[email protected]> wrote:
> >> > >>>
> >> > >>> > Chandan,
> >> > >>> > So let's apply your situation to the types and conversion that is
> >> > >>> > proposed and break it down where logic will be applied.
> >> > >>> > Say we have a
> >> > >>> > PCollection that is like the following:
> >> > >>> >
> >> > >>> > Mapper 1:
> >> > >>> > <id1, "Hello">
> >> > >>> > <id2, "World">
> >> > >>> > <id1, "I like turtles">
> >> > >>> >
> >> > >>> > Mapper 2
> >> > >>> > <id2, "Goodbye">
> >> > >>> >
> >> > >>> > This will be represented by the PTable<String, Comment>. We then apply a
> >> > >>> > MapFn to transform it into PTable<String, Book> and we'd get the following
> >> > >>> > in our PCollection:
> >> > >>> >
> >> > >>> > Mapper 1
> >> > >>> > <id1, <"Hello", 1>>
> >> > >>> > <id2, <"World", 1>>
> >> > >>> > <id1, <"I like turtles", 1>>
> >> > >>> >
> >> > >>> > Mapper 2
> >> > >>> > <id2, <"Goodbye", 1>>
> >> > >>> >
> >> > >>> > Then if we were to use the GBK + CombineFn, the output of the map phase
> >> > >>> > would be..
> >> > >>> >
> >> > >>> > Mapper 1
> >> > >>> > <id2, <"World", 1>>
> >> > >>> > <id1, <"I like turtles", 2>>
> >> > >>> >
> >> > >>> > Mapper 2
> >> > >>> > <id2, <"Goodbye", 1>>
> >> > >>> >
> >> > >>> > Notice Mapper 1 would only be emitting 2 items instead of 3 and therefore
> >> > >>> > less data is sent over the wire and has to be sorted. Also in the reducer
> >> > >>> > after the GBK is completed the CombineFn would finish its work and you'd
> >> > >>> > get the following:
> >> > >>> >
> >> > >>> > Reducer 1
> >> > >>> > <id2, <"Goodbye", 2>>
> >> > >>> > <id1, <"I like turtles", 2>>
> >> > >>> >
> >> > >>> > The only case where this would not improve performance is if you never emit
> >> > >>> > data for the same key from the same mapper or your mapper doesn't reduce
> >> > >>> > the size of the data.
> >> > >>> >
> >> > >>> > On Thu, Oct 17, 2013 at 8:18 PM, Chandan Biswas <[email protected]> wrote:
> >> > >>> >
> >> > >>> > > I have PTable<String,Comment>.
> >> > >>> > > and getting after reduce PTable<String,
> >> > >>> > > Book>
> >> > >>> > >
> >> > >>> > > T--> Comment{ String comment, String author}, U--> Book{String id, String
> >> > >>> > > lengthiestComment, int noOfComments}
> >> > >>> > >
> >> > >>> > > But I wanted to do some aggregations on the map side based on some logic
> >> > >>> > > instead of doing all aggregations on the reduce side.
> >> > >>> > > Yes, in the worst case the data flow over the n/w will remain the same,
> >> > >>> > > but sorting will be improved.
> >> > >>> > >
> >> > >>> > > Thanks,
> >> > >>> > > Chandan
> >> > >>> > >
> >> > >>> > > On Thu, Oct 17, 2013 at 6:46 PM, Josh Wills <[email protected]> wrote:
> >> > >>> > >
> >> > >>> > > > On Thu, Oct 17, 2013 at 4:41 PM, Chandan Biswas <[email protected]> wrote:
> >> > >>> > > >
> >> > >>> > > > > Yeah, I agree with Micah that it will not eliminate the reduce phase
> >> > >>> > > > > entirely. But the dummy object of U suggested by Josh (or converting to U
> >> > >>> > > > > type in map for every record) will not improve performance because same
> >> > >>> > > > > amounts of records will be sorted and aggregated in the reduce phase.
> >> > >>> > > >
> >> > >>> > > > I don't think that's true-- the records of type U will be combined on the
> >> > >>> > > > map-side, which would reduce the amount of data that is pushed over the
> >> > >>> > > > network and improve performance.
> >> > >>> > > >
> >> > >>> > > > Can you give any additional details about what T and U are in this
> >> > >>> > > > scenario?
> >> > >>> > > > :)
> >> > >>> > > >
> >> > >>> > > > > But
> >> > >>> > > > > my point is, can we improve it by applying a combiner where the combineFn
> >> > >>> > > > > provides output as different type. If we have same type, we can use the
> >> > >>> > > > > combiner to do some aggregation in map side which improves performance.
> >> > >>> > > > > But, can we have some mechanism by which the same advantage can be achieved
> >> > >>> > > > > when combineFn emits different type. I think, emitting same type by
> >> > >>> > > > > CombineFn has restricted its use. Can we have new CombineFn that allows us
> >> > >>> > > > > to output different type not only same type as input?
> >> > >>> > > > >
> >> > >>> > > > > On Thu, Oct 17, 2013 at 5:05 PM, Josh Wills <[email protected]> wrote:
> >> > >>> > > > >
> >> > >>> > > > > > Yeah, my experience in these kinds of situations is that you need to come
> >> > >>> > > > > > up with a "dummy" or singleton version of U for the case where there is
> >> > >>> > > > > > only a single T and do that conversion on the map side of the job, before
> >> > >>> > > > > > the combiner runs. I think Chao had an issue like this awhile ago, where he
> >> > >>> > > > > > had a PTable<String, Double> and wanted to write a combiner that would
> >> > >>> > > > > > return a PTable<String, Collection<Double>>.
> >> > >>> > > > > > The solution was to convert
> >> > >>> > > > > > the map-side object to a PTable<String, Collection<Double>>, where the
> >> > >>> > > > > > value on the map-side was a singleton list containing just that double
> >> > >>> > > > > > value. Does that sort of trick work here?
> >> > >>> > > > > >
> >> > >>> > > > > > On Thu, Oct 17, 2013 at 2:57 PM, Micah Whitacre <[email protected]> wrote:
> >> > >>> > > > > >
> >> > >>> > > > > > > Ok so the feature you are trying to achieve is the proactive combination of
> >> > >>> > > > > > > data before performing the GBK like the javadoc describes. Essentially in
> >> > >>> > > > > > > that situation the CombineFn is being used as a Combiner[1] to combine the
> >> > >>> > > > > > > data local to that mapper before doing the GBK and then further combining
> >> > >>> > > > > > > the data in the reduce operation. It will not necessarily eliminate the
> >> > >>> > > > > > > need for all processing in the reduce.
> >> > >>> > > > > > >
> >> > >>> > > > > > > If you want to use this functionality you will need to do the following:
> >> > >>> > > > > > >
> >> > >>> > > > > > > PTable<S, T> map to PTable<S, U>
> >> > >>> > > > > > > PTable<S, U> gbk to PGT<S, U>
> >> > >>> > > > > > > PGT<S, U> combine PTable<S, U>
> >> > >>> > > > > > >
> >> > >>> > > > > > > This will take advantage of any optimization provided by the CombineFn.
> >> > >>> > > > > > >
> >> > >>> > > > > > > [1] - http://wiki.apache.org/hadoop/HadoopMapReduce
> >> > >>> > > > > > >
> >> > >>> > > > > > > On Thu, Oct 17, 2013 at 4:30 PM, Chandan Biswas <[email protected]> wrote:
> >> > >>> > > > > > >
> >> > >>> > > > > > > > Hello Micah,
> >> > >>> > > > > > > > Yes we are using MapFn now. That aggregation and computation is being done
> >> > >>> > > > > > > > in reduce phase. As CombineFn after GBK runs into map side, then those most
> >> > >>> > > > > > > > computations can be done in map side which are now running in reduce phase.
> >> > >>> > > > > > > > Some smaller aggregations and computations can be done on reduce phase.
> >> > >>> > > > > > > > My point was to do some aggregation (and create a new object) in map phase
> >> > >>> > > > > > > > instead of in reduce phase.
> >> > >>> > > > > > > >
> >> > >>> > > > > > > > Thanks,
> >> > >>> > > > > > > > Chandan
> >> > >>> > > > > > > >
> >> > >>> > > > > > > > On Thu, Oct 17, 2013 at 3:48 PM, Micah Whitacre <[email protected]> wrote:
> >> > >>> > > > > > > >
> >> > >>> > > > > > > > > Chandan,
> >> > >>> > > > > > > > > I think what you are wanting will just be a simple MapFn instead of a
> >> > >>> > > > > > > > > CombineFn.
> >> > >>> > > > > > > > > The doc of the CombineFn[1] sounds like what you want with
> >> > >>> > > > > > > > > the statement "A special
> >> > >>> > > > > > > > > DoFn<http://crunch.apache.org/apidocs/0.7.0/org/apache/crunch/DoFn.html>
> >> > >>> > > > > > > > > implementation that converts an
> >> > >>> > > > > > > > > Iterable<http://download.oracle.com/javase/6/docs/api/java/lang/Iterable.html?is-external=true>
> >> > >>> > > > > > > > > of values into a single value" but it is expecting the value to be of the
> >> > >>> > > > > > > > > same type. Since you are wanting to combine the values into a different
> >> > >>> > > > > > > > > form it should be fairly trivial to write a MapFn that converts the
> >> > >>> > > > > > > > > Iterable<T> -> U.
> >> > >>> > > > > > > > >
> >> > >>> > > > > > > > > [1] - http://crunch.apache.org/apidocs/0.7.0/org/apache/crunch/CombineFn.html
> >> > >>> > > > > > > > >
> >> > >>> > > > > > > > > On Thu, Oct 17, 2013 at 3:30 PM, Chandan Biswas <[email protected]> wrote:
> >> > >>> > > > > > > > >
> >> > >>> > > > > > > > > > I was trying to refactor some stuff and trying to use combineFn.
> >> > >>> > > > > > > > > > But when I went deeper, I found that I can't do it, as Crunch doesn't
> >> > >>> > > > > > > > > > allow the functionality I needed. For example, I have a
> >> > >>> > > > > > > > > > PGroupedTable<S,T>. I wanted to apply CombineFn<S,T> on it and wanted to
> >> > >>> > > > > > > > > > get PCollection<S,U> instead of T. Right now, CombineFn allows only the
> >> > >>> > > > > > > > > > same type as return value. The use case for this is that there will be
> >> > >>> > > > > > > > > > some time saving in sorting. It's natural that aggregating some objects
> >> > >>> > > > > > > > > > at map side can create an object of a new, different type.
> >> > >>> > > > > > > > > >
> >> > >>> > > > > > > > > > Any thoughts on it? Am I missing anything? If this can be written in a
> >> > >>> > > > > > > > > > different way using existing functionality, please let me know.
> >> > >>> > > > > > > > > >
> >> > >>> > > > > > > > > > Thanks
> >> > >>> > > > > > > > > > Chandan
> >> > >>> > > > > >
> >> > >>> > > > > > --
> >> > >>> > > > > > Director of Data Science
> >> > >>> > > > > > Cloudera <http://www.cloudera.com>
> >> > >>> > > > > > Twitter: @josh_wills <http://twitter.com/josh_wills>
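
For anyone reading this thread later, here is a rough plain-Java simulation of the Comment -> Book walkthrough quoted above (map each T to a singleton U, combine per mapper, then combine again in the reducer). It is only a sketch: it does not use the Crunch API at all, and the names CombineSketch, Book, singleton, merge, combine, kv and run are made up for illustration. The only point it tries to show is that the combine step takes Books and returns a Book, so the type stays the same on both sides of the shuffle.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of map-side combining; NOT the Crunch API.
final class CombineSketch {

    // Hypothetical stand-in for the thread's Book type (the "U" type).
    static final class Book {
        final String lengthiestComment;
        final int noOfComments;

        Book(String lengthiestComment, int noOfComments) {
            this.lengthiestComment = lengthiestComment;
            this.noOfComments = noOfComments;
        }

        // "MapFn" step: wrap one comment (type T) as a singleton Book (type U).
        static Book singleton(String comment) {
            return new Book(comment, 1);
        }

        // "CombineFn" step: Book + Book -> Book, so input and output types match.
        Book merge(Book other) {
            String longest = other.lengthiestComment.length() > lengthiestComment.length()
                    ? other.lengthiestComment
                    : lengthiestComment;
            return new Book(longest, noOfComments + other.noOfComments);
        }

        @Override
        public String toString() {
            return "<\"" + lengthiestComment + "\", " + noOfComments + ">";
        }
    }

    // Builds one <id, singleton Book> record, i.e. the output of the map step.
    static Map.Entry<String, Book> kv(String id, String comment) {
        return new AbstractMap.SimpleEntry<>(id, Book.singleton(comment));
    }

    // Merges all Books that share a key. The same logic serves as the
    // per-mapper combiner and as the final reduce-side combine.
    static Map<String, Book> combine(List<Map.Entry<String, Book>> records) {
        Map<String, Book> out = new LinkedHashMap<>();
        for (Map.Entry<String, Book> r : records) {
            out.merge(r.getKey(), r.getValue(), Book::merge);
        }
        return out;
    }

    // Runs the two-mapper example from the thread end to end.
    static Map<String, Book> run() {
        List<Map.Entry<String, Book>> mapper1 = Arrays.asList(
                kv("id1", "Hello"), kv("id2", "World"), kv("id1", "I like turtles"));
        List<Map.Entry<String, Book>> mapper2 = Arrays.asList(kv("id2", "Goodbye"));

        // Map-side combine: each mapper emits at most one record per key.
        List<Map.Entry<String, Book>> shuffled = new ArrayList<>();
        shuffled.addAll(combine(mapper1).entrySet());
        shuffled.addAll(combine(mapper2).entrySet());

        // Reduce-side combine over everything that crossed the shuffle.
        return combine(shuffled);
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Book> e : run().entrySet()) {
            System.out.println("<" + e.getKey() + ", " + e.getValue() + ">");
        }
    }
}
```

In this sketch Mapper 1 hands two records to the shuffle instead of three, which is the saving Micah and Josh describe. If merge were allowed to return a different type, the reduce-side call to combine could no longer consume what the map-side call produced, which is roughly the constraint discussed above.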
