On Sat, Oct 19, 2013 at 2:34 AM, Josh Wills <[email protected]> wrote:

> I'm certainly not opposed to having something like this. Spark makes this distinction via Accumulable vs. Accumulator:
>
> http://spark.incubator.apache.org/docs/0.8.0/api/core/index.html#org.apache.spark.Accumulable
> http://spark.incubator.apache.org/docs/0.8.0/api/core/index.html#org.apache.spark.Accumulator
>
> Maybe we want something like "Aggregatable<R, T>" to go along with our Aggregator<T> (which could extend Aggregatable<T, T>)?
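Josh's suggestion can be sketched as a pair of interfaces. This is a hypothetical plain-Java sketch, not Crunch or Spark code. Aggregatable, SimpleAggregator, LongSum, CommentStats and CommentStatsAggregatable are illustrative names. The type-parameter order is an assumption borrowed from Spark's Accumulable<R, T>: R is the accumulated result and T is each value added in. SimpleAggregator<T> only stands in for an Aggregator<T> that extends Aggregatable<T, T>; it is not Crunch's actual Aggregator interface.

```java
// Hypothetical sketch of the proposed interface; these are NOT existing Crunch or Spark types.
// Assumed convention (borrowed from Spark's Accumulable<R, T>): R is the accumulated
// result type and T is the type of the individual values that get added in.
interface Aggregatable<R, T> {
    void add(T value);      // fold one input value into the running result
    void merge(R partial);  // fold in a partial result computed elsewhere, e.g. on another mapper
    R result();             // the current accumulated result
}

// The same-typed special case, playing the role of "Aggregator<T> extends Aggregatable<T, T>".
interface SimpleAggregator<T> extends Aggregatable<T, T> {}

// Same-typed example: input values and result are both Longs.
final class LongSum implements SimpleAggregator<Long> {
    private long total = 0L;

    public void add(Long value) { total += value; }

    public void merge(Long partial) { total += partial; }

    public Long result() { return total; }
}

// Simplified result type modeled on the Comment -> Book example later in this thread.
final class CommentStats {
    final String lengthiestComment; // null when no comments have been seen
    final int count;

    CommentStats(String lengthiestComment, int count) {
        this.lengthiestComment = lengthiestComment;
        this.count = count;
    }
}

// Input type (a comment String) differs from the result type (CommentStats).
final class CommentStatsAggregatable implements Aggregatable<CommentStats, String> {
    private String longest = null;
    private int count = 0;

    public void add(String comment) {
        // A single input is treated as a partial result of size one.
        merge(new CommentStats(comment, 1));
    }

    public void merge(CommentStats partial) {
        if (partial.count == 0) {
            return; // nothing to fold in from an empty partial result
        }
        if (longest == null || partial.lengthiestComment.length() > longest.length()) {
            longest = partial.lengthiestComment;
        }
        count += partial.count;
    }

    public CommentStats result() {
        return new CommentStats(longest, count);
    }
}
```

The merge method is the piece a map-side combiner relies on. A partial result computed on one mapper can be folded into another without ever converting back to the input type.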

That sounds like a great way of exposing that functionality -- I'll take a closer look at actually doing it.

- Gabriel

>
> On Fri, Oct 18, 2013 at 1:36 PM, Gabriel Reid <[email protected]> wrote:
>
>> This use case (map/combine <K,V> to <K,U>) seems to come up repeatedly. The solution (map <K,V> to <K, Collection<V>> and then combine) works but is also pretty unintuitive.
>>
>> Any thoughts on adding a util in Crunch to do this? It would basically just need to be a static util method that takes a MapFn<Pair<K,V>, Pair<K,U>> and a CombineFn<K,U> and would take care of the singleton collection mapping stuff internally. On the one hand I'm thinking that this could be pretty useful, but I'm not sure if it would make things more intuitive or possibly have the reverse effect.
>>
>> Any opinions? I'm up for putting it together if people think it's worth it.
>>
>> - Gabriel
>>
>>
>> On Fri, Oct 18, 2013 at 4:14 PM, Micah Whitacre <[email protected]> wrote:
>> > Thinking about the technical issues, at first glance you could say the restriction is just the way the Java generics are written for the CombineFn class, but if you think about what is actually happening, it would be awkward to support changing types in the CombineFn, especially when it is paired with a GroupByKey. As I showed in the example, the CombineFn essentially bookends the GBK operation by performing processing on the types before and after the sorting. The GBK's types describe the output of the map phase and the input to the reduce. If the CombineFn changed the types, then the output wouldn't match the types described by the GBK. I'm guessing this could lead to a number of problems trying to compute the types and plan for the job.
>> >
>> >
>> > On Fri, Oct 18, 2013 at 8:55 AM, Micah Whitacre <[email protected]> wrote:
>> >
>> >> I'm not sure I follow how there is extra effort involved.
>> >> Are you talking development effort or processing effort? In terms of development effort, in both cases you need to write code that translates T to U and combines the values. The difference is whether that logic exists inside of a single DoFn or is split into a MapFn + CombineFn. So the development effort should be the same.
>> >>
>> >>
>> >> On Fri, Oct 18, 2013 at 8:11 AM, Chandan Biswas <[email protected]> wrote:
>> >>
>> >>> Yeah, I see what you are talking about. But it will take extra effort to convert to the U type. So, is there any specific reason CombineFn was originally designed so that it does not allow a different return type? Were there any constraints (design / complexity) that led to this restriction?
>> >>> Thanks,
>> >>>
>> >>>
>> >>> On Thu, Oct 17, 2013 at 8:47 PM, Micah Whitacre <[email protected]> wrote:
>> >>>
>> >>> > Chandan,
>> >>> > So let's apply your situation to the types and conversion that are proposed and break down where the logic will be applied. Say we have a PCollection like the following:
>> >>> >
>> >>> > Mapper 1:
>> >>> > <id1, "Hello">
>> >>> > <id2, "World">
>> >>> > <id1, "I like turtles">
>> >>> >
>> >>> > Mapper 2:
>> >>> > <id2, "Goodbye">
>> >>> >
>> >>> > This will be represented by the PTable<String, Comment>. We then apply a MapFn to transform it into a PTable<String, Book>, and we'd get the following in our PCollection:
>> >>> >
>> >>> > Mapper 1:
>> >>> > <id1, <"Hello", 1>>
>> >>> > <id2, <"World", 1>>
>> >>> > <id1, <"I like turtles", 1>>
>> >>> >
>> >>> > Mapper 2:
>> >>> > <id2, <"Goodbye", 1>>
>> >>> >
>> >>> > Then if we were to use the GBK + CombineFn, the output of the map phase would be:
>> >>> >
>> >>> > Mapper 1:
>> >>> > <id2, <"World", 1>>
>> >>> > <id1, <"I like turtles", 2>>
>> >>> >
>> >>> > Mapper 2:
>> >>> > <id2, <"Goodbye", 1>>
>> >>> >
>> >>> > Notice that Mapper 1 would only be emitting 2 items instead of 3, and therefore less data is sent over the wire and has to be sorted. Also, in the reducer, after the GBK is completed, the CombineFn would finish its work and you'd get the following:
>> >>> >
>> >>> > Reducer 1:
>> >>> > <id2, <"Goodbye", 2>>
>> >>> > <id1, <"I like turtles", 2>>
>> >>> >
>> >>> > The only case where this would not improve performance is if you never emit data for the same key from the same mapper, or your mapper doesn't reduce the size of the data.
>> >>> >
>> >>> >
>> >>> > On Thu, Oct 17, 2013 at 8:18 PM, Chandan Biswas <[email protected]> wrote:
>> >>> >
>> >>> > > I have a PTable<String, Comment>, and after the reduce I get a PTable<String, Book>.
>> >>> > >
>> >>> > > T --> Comment{String comment, String author}, U --> Book{String id, String lengthiestComment, int noOfComments}
>> >>> > >
>> >>> > > But I wanted to do some aggregations on the map side based on some logic, instead of doing all aggregations on the reduce side. Yes, in the worst case the data flow over the network will remain the same, but sorting will be improved.
>> >>> > >
>> >>> > > Thanks,
>> >>> > > Chandan
>> >>> > >
>> >>> > >
>> >>> > > On Thu, Oct 17, 2013 at 6:46 PM, Josh Wills <[email protected]> wrote:
>> >>> > >
>> >>> > > > On Thu, Oct 17, 2013 at 4:41 PM, Chandan Biswas <[email protected]> wrote:
>> >>> > > >
>> >>> > > > > Yeah, I agree with Micah that it will not eliminate the reduce phase entirely.
>> >>> > > > > But the dummy object of U suggested by Josh (or converting to the U type in the map for every record) will not improve performance, because the same number of records will be sorted and aggregated in the reduce phase.
>> >>> > > >
>> >>> > > >
>> >>> > > > I don't think that's true -- the records of type U will be combined on the map side, which would reduce the amount of data that is pushed over the network and improve performance.
>> >>> > > >
>> >>> > > > Can you give any additional details about what T and U are in this scenario? :)
>> >>> > > >
>> >>> > > >
>> >>> > > >
>> >>> > > > > But my point is, can we improve it by applying a combiner where the CombineFn provides output of a different type? If we have the same type, we can use the combiner to do some aggregation on the map side, which improves performance. But can we have some mechanism by which the same advantage can be achieved when the CombineFn emits a different type? I think requiring CombineFn to emit the same type has restricted its use. Can we have a new CombineFn that allows us to output a different type, not only the same type as the input?
>> >>> > > > >
>> >>> > > > >
>> >>> > > > > On Thu, Oct 17, 2013 at 5:05 PM, Josh Wills <[email protected]> wrote:
>> >>> > > > >
>> >>> > > > > > Yeah, my experience in these kinds of situations is that you need to come up with a "dummy" or singleton version of U for the case where there is only a single T and do that conversion on the map side of the job, before the combiner runs.
>> >>> > > > > > I think Chao had an issue like this a while ago, where he had a PTable<String, Double> and wanted to write a combiner that would return a PTable<String, Collection<Double>>. The solution was to convert the map-side object to a PTable<String, Collection<Double>>, where the value on the map side was a singleton list containing just that double value. Does that sort of trick work here?
>> >>> > > > > >
>> >>> > > > > >
>> >>> > > > > > On Thu, Oct 17, 2013 at 2:57 PM, Micah Whitacre <[email protected]> wrote:
>> >>> > > > > >
>> >>> > > > > > > Ok, so the feature you are trying to achieve is the proactive combination of data before performing the GBK, as the javadoc describes. Essentially, in that situation the CombineFn is being used as a Combiner[1] to combine the data local to that mapper before doing the GBK, and then further combining the data in the reduce operation. It will not necessarily eliminate the need for all processing in the reduce.
>> >>> > > > > > >
>> >>> > > > > > > If you want to use this functionality you will need to do the following:
>> >>> > > > > > >
>> >>> > > > > > > PTable<S, T> map to PTable<S, U>
>> >>> > > > > > > PTable<S, U> gbk to PGroupedTable<S, U>
>> >>> > > > > > > PGroupedTable<S, U> combine to PTable<S, U>
>> >>> > > > > > >
>> >>> > > > > > > This will take advantage of any optimization provided by the CombineFn.
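Micah's map, gbk, combine recipe, with Josh's singleton trick as the map step, can be modeled in plain Java to show where the saving comes from. This is a sketch under stated assumptions. It uses java.util maps to mimic one map-side combine per mapper followed by a reduce-side combine, and it does not call Crunch or Hadoop. Stats, CombinerSimulation, toStats, mapSideCombine, combine and reduce are hypothetical names. The input data is the Mapper 1 / Mapper 2 example from Micah's Oct 17 message.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical U type: lengthiest comment plus number of comments (a stand-in for Book).
final class Stats {
    final String lengthiest;
    final int count;

    Stats(String lengthiest, int count) {
        this.lengthiest = lengthiest;
        this.count = count;
    }

    @Override
    public String toString() {
        return "<\"" + lengthiest + "\", " + count + ">";
    }
}

// Plain-Java model of the phases; no Crunch or Hadoop classes are involved.
final class CombinerSimulation {
    // The MapFn step: turn each T (a comment) into a singleton U.
    static Stats toStats(String comment) {
        return new Stats(comment, 1);
    }

    // The CombineFn step: same input and output type (U, U) -> U, and associative.
    static Stats combine(Stats a, Stats b) {
        String longest = b.lengthiest.length() > a.lengthiest.length() ? b.lengthiest : a.lengthiest;
        return new Stats(longest, a.count + b.count);
    }

    // Map-side combine: fold one mapper's <key, comment> records by key before the shuffle.
    static Map<String, Stats> mapSideCombine(List<String[]> records) {
        Map<String, Stats> out = new LinkedHashMap<>();
        for (String[] kv : records) {
            out.merge(kv[0], toStats(kv[1]), CombinerSimulation::combine);
        }
        return out;
    }

    // Shuffle plus reduce-side combine: group all mappers' partial results by key and combine again.
    static Map<String, Stats> reduce(List<Map<String, Stats>> mapperOutputs) {
        Map<String, Stats> out = new TreeMap<>();
        for (Map<String, Stats> partial : mapperOutputs) {
            for (Map.Entry<String, Stats> e : partial.entrySet()) {
                out.merge(e.getKey(), e.getValue(), CombinerSimulation::combine);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The example data from this thread.
        List<String[]> mapper1 = Arrays.asList(
                new String[] {"id1", "Hello"},
                new String[] {"id2", "World"},
                new String[] {"id1", "I like turtles"});
        List<String[]> mapper2 = Collections.singletonList(new String[] {"id2", "Goodbye"});

        Map<String, Stats> partial1 = mapSideCombine(mapper1);
        Map<String, Stats> partial2 = mapSideCombine(mapper2);
        // prints "mapper 1 emits 2 records instead of 3"
        System.out.println("mapper 1 emits " + partial1.size() + " records instead of " + mapper1.size());
        // prints {id1=<"I like turtles", 2>, id2=<"Goodbye", 2>}
        System.out.println(reduce(Arrays.asList(partial1, partial2)));
    }
}
```

The design point is that combine has type (U, U) -> U, so the same function can run both before and after the shuffle. The T -> U conversion happens once, in toStats, before any combining, which is why the combine step itself never needs to change types.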
>> >>> > > > > > >
>> >>> > > > > > > [1] - http://wiki.apache.org/hadoop/HadoopMapReduce
>> >>> > > > > > >
>> >>> > > > > > >
>> >>> > > > > > > On Thu, Oct 17, 2013 at 4:30 PM, Chandan Biswas <[email protected]> wrote:
>> >>> > > > > > >
>> >>> > > > > > > > Hello Micah,
>> >>> > > > > > > > Yes, we are using a MapFn now. That aggregation and computation is being done in the reduce phase. Since a CombineFn applied after a GBK also runs on the map side, most of the computations that now run in the reduce phase could be done on the map side. Some smaller aggregations and computations can be done in the reduce phase.
>> >>> > > > > > > > My point was to do some aggregation (and create a new object) in the map phase instead of in the reduce phase.
>> >>> > > > > > > >
>> >>> > > > > > > > Thanks,
>> >>> > > > > > > > Chandan
>> >>> > > > > > > >
>> >>> > > > > > > >
>> >>> > > > > > > > On Thu, Oct 17, 2013 at 3:48 PM, Micah Whitacre <[email protected]> wrote:
>> >>> > > > > > > >
>> >>> > > > > > > > > Chandan,
>> >>> > > > > > > > > I think what you are wanting will just be a simple MapFn instead of a CombineFn.
>> >>> > > > > > > > > The doc of the CombineFn[1] sounds like what you want, with the statement "A special DoFn<http://crunch.apache.org/apidocs/0.7.0/org/apache/crunch/DoFn.html> implementation that converts an Iterable<http://download.oracle.com/javase/6/docs/api/java/lang/Iterable.html?is-external=true> of values into a single value", but it is expecting the value to be of the same type. Since you are wanting to combine the values into a different form, it should be fairly trivial to write a MapFn that converts the Iterable<T> -> U.
>> >>> > > > > > > > >
>> >>> > > > > > > > > [1] - http://crunch.apache.org/apidocs/0.7.0/org/apache/crunch/CombineFn.html
>> >>> > > > > > > > >
>> >>> > > > > > > > >
>> >>> > > > > > > > > On Thu, Oct 17, 2013 at 3:30 PM, Chandan Biswas <[email protected]> wrote:
>> >>> > > > > > > > >
>> >>> > > > > > > > > > I was trying to refactor some stuff and use CombineFn. But when I went deeper, I found that I can't do it, as Crunch doesn't allow the functionality I needed. For example, I have a PGroupedTable<S,T>.
>> >>> > > > > > > > > > I wanted to apply CombineFn<S,T> on it and get a PTable<S,U> instead of a PTable<S,T>. Right now, CombineFn allows only the same type as the return value. The motivation is that there will be some time saved in sorting. It's natural that aggregating some objects on the map side can create a new object of a different type.
>> >>> > > > > > > > > >
>> >>> > > > > > > > > > Any thoughts on it? Am I missing anything? If this can be written in a different way using existing functionality, please let me know.
>> >>> > > > > > > > > >
>> >>> > > > > > > > > > Thanks
>> >>> > > > > > > > > > Chandan
>> >>> > > > > >
>> >>> > > > > > --
>> >>> > > > > > Director of Data Science
>> >>> > > > > > Cloudera <http://www.cloudera.com>
>> >>> > > > > > Twitter: @josh_wills <http://twitter.com/josh_wills>
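Chandan's original request, and the static util Gabriel proposes at the top of the thread, can both be modeled as a lift function T -> U paired with a same-typed combine function U x U -> U. The sketch below is hypothetical plain Java, not Crunch. LiftAndCombine, combineLocally, combineAcross, record and ChaoExample are illustrative names, and java.util maps stand in for mapper partitions and the shuffle. The usage example is the Chao case Josh describes (PTable<String, Double> to PTable<String, Collection<Double>>), with a singleton list as the lift.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical helper, NOT a Crunch API: a plain-Java model of pairing a
// T -> U "lift" function (the MapFn part) with a same-typed U x U -> U
// combine function (the CombineFn part), so that an aggregation from T to U
// can run once per mapper and again after the shuffle.
final class LiftAndCombine<K, T, U> {
    private final Function<T, U> lift;       // wraps one T as a U; must not return null
    private final BinaryOperator<U> combine; // should be associative, since it runs in two phases

    LiftAndCombine(Function<T, U> lift, BinaryOperator<U> combine) {
        this.lift = lift;
        this.combine = combine;
    }

    // Map side: lift every value, then combine values that share a key within one partition.
    Map<K, U> combineLocally(List<Map.Entry<K, T>> partition) {
        Map<K, U> out = new LinkedHashMap<>();
        for (Map.Entry<K, T> e : partition) {
            out.merge(e.getKey(), lift.apply(e.getValue()), combine);
        }
        return out;
    }

    // Reduce side: combine the partial U values each partition produced for the same key.
    Map<K, U> combineAcross(List<Map<K, U>> partials) {
        Map<K, U> out = new LinkedHashMap<>();
        for (Map<K, U> partial : partials) {
            for (Map.Entry<K, U> e : partial.entrySet()) {
                out.merge(e.getKey(), e.getValue(), combine);
            }
        }
        return out;
    }

    // Convenience factory for <key, value> records in the usage example.
    static <A, B> Map.Entry<A, B> record(A key, B value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }
}

final class ChaoExample {
    // Lift: a Double becomes a singleton list. Combine: concatenate two lists into a new one.
    static LiftAndCombine<String, Double, List<Double>> collectDoubles() {
        return new LiftAndCombine<>(
                d -> Collections.singletonList(d),
                (a, b) -> {
                    List<Double> both = new ArrayList<>(a);
                    both.addAll(b);
                    return both;
                });
    }

    public static void main(String[] args) {
        LiftAndCombine<String, Double, List<Double>> agg = collectDoubles();
        Map<String, List<Double>> mapper1 = agg.combineLocally(Arrays.asList(
                LiftAndCombine.record("a", 1.0),
                LiftAndCombine.record("b", 2.0),
                LiftAndCombine.record("a", 3.0)));
        Map<String, List<Double>> mapper2 = agg.combineLocally(
                Collections.singletonList(LiftAndCombine.record("a", 4.0)));
        // prints {a=[1.0, 3.0, 4.0], b=[2.0]}
        System.out.println(agg.combineAcross(Arrays.asList(mapper1, mapper2)));
    }
}
```

Keeping the lift separate is what lets the combine step stay type-preserving, so the types declared for the GBK remain consistent; that is the planning constraint Micah raises earlier in the thread.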
