I'm certainly not opposed to having something like this. Spark makes this distinction via Accumulable vs. Accumulator:
http://spark.incubator.apache.org/docs/0.8.0/api/core/index.html#org.apache.spark.Accumulable
http://spark.incubator.apache.org/docs/0.8.0/api/core/index.html#org.apache.spark.Accumulator

Maybe we want something like "Aggregatable<R, T>" to go along with our Aggregator<T> (which could extend Aggregatable<T, T>)?

On Fri, Oct 18, 2013 at 1:36 PM, Gabriel Reid <[email protected]> wrote:
> This use case (map/combine <K,V> to <K,U>) seems to come up
> repeatedly. The solution (map <K,V> to <K, Collection<V>> and then
> combine) works but is also pretty unintuitive.
>
> Any thoughts on adding a util in Crunch to do this? It would basically
> just need to be a static util method that takes a MapFn<<K,V>,<K,U>>
> and a CombineFn<K,U> and would take care of the singleton collection
> mapping stuff internally. On the one hand I'm thinking that this could
> be pretty useful, but on the other I'm not sure if it would make things
> more intuitive or possibly have the reverse effect.
>
> Any opinions? I'm up for putting it together if people think it's worth it.
>
> - Gabriel
>
> On Fri, Oct 18, 2013 at 4:14 PM, Micah Whitacre <[email protected]> wrote:
> Thinking about the technical issues, at first glance you could say the
> restriction is just the way the Java generics are written for the CombineFn
> class, but if you think about what is actually happening, it would be
> awkward to support changing types in the CombineFn, especially when it is
> paired with a GroupByKey. As I showed in the example, the CombineFn
> essentially bookends the GBK operation by performing processing on the
> types before and after the sorting. The GBK's types describe the output of
> the map phase and the input to the reduce. If the CombineFn changed the
> types, then the output wouldn't match the types described by the GBK. I'm
> guessing this could lead to a number of problems trying to compute the
> types and plan the job.
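Below is a rough plain-Java sketch of the "Aggregatable<R, T>" idea from the top of this thread. It loosely follows the spirit of Spark's Accumulable, where the accumulated type R can differ from the element type T. Every name in it is hypothetical and used only for illustration; none of them are Crunch or Spark APIs: `Aggregatable`, `SameTypeAggregator`, `SumDoubles`, `CountValues`, and `AggregatableDemo.aggregate`. The part worth noticing is `merge`. A combiner runs once per mapper and again after the shuffle, so partial R values have to be mergeable with each other. That is also why, as Micah points out, the type crossing the GBK would have to be the combined type rather than T.

```java
import java.util.List;

// HYPOTHETICAL: not a Crunch or Spark API. The accumulated type R may differ
// from the element type T, in the spirit of Spark's Accumulable.
interface Aggregatable<R, T> {
  R zero();                  // an empty accumulator
  R add(R acc, T value);     // fold one element into a partial result
  R merge(R left, R right);  // combine two partial results (e.g. from two mappers)
}

// HYPOTHETICAL: the same-type case, where R == T, mirroring the idea that
// an Aggregator<T> could extend Aggregatable<T, T>.
interface SameTypeAggregator<T> extends Aggregatable<T, T> {}

// Same-type example: summing doubles.
class SumDoubles implements SameTypeAggregator<Double> {
  public Double zero() { return 0.0; }
  public Double add(Double acc, Double value) { return acc + value; }
  public Double merge(Double left, Double right) { return left + right; }
}

// Different-type example: counting elements of any type T into a Long.
// Here add (acc + 1) and merge (left + right) are different operations.
class CountValues<T> implements Aggregatable<Long, T> {
  public Long zero() { return 0L; }
  public Long add(Long acc, T value) { return acc + 1; }
  public Long merge(Long left, Long right) { return left + right; }
}

class AggregatableDemo {
  // Folds each partition on its own (like a map-side combiner), then merges
  // the partial results (like the reduce side after the shuffle).
  static <R, T> R aggregate(Aggregatable<R, T> agg, List<List<T>> partitions) {
    R total = agg.zero();
    for (List<T> partition : partitions) {
      R partial = agg.zero();
      for (T value : partition) {
        partial = agg.add(partial, value);
      }
      total = agg.merge(total, partial);
    }
    return total;
  }
}
```

Take two partitions, [1.0, 2.0] and [3.0]. On them, `aggregate(new SumDoubles(), ...)` returns 6.0 and `aggregate(new CountValues<Double>(), ...)` returns 3. The sketch does not attempt to reproduce Crunch's actual Aggregator interface.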
>
> On Fri, Oct 18, 2013 at 8:55 AM, Micah Whitacre <[email protected]> wrote:
> I'm not sure I follow how there is extra effort involved. Are you talking
> about development effort or processing effort? In terms of development
> effort, in both cases you need to write code that translates T to U and
> combines the values. The difference is whether that logic exists inside of
> a single DoFn or is split into a MapFn + CombineFn, so the development
> effort should be the same.
>
> On Fri, Oct 18, 2013 at 8:11 AM, Chandan Biswas <[email protected]> wrote:
> Yeah, I see what you are talking about. But it will take extra effort to
> convert to the U type. So, is there any specific reason CombineFn was
> originally designed not to allow a different return type? Were there any
> constraints (design or complexity) that led to this restriction?
> Thanks,
>
> On Thu, Oct 17, 2013 at 8:47 PM, Micah Whitacre <[email protected]> wrote:
> Chandan,
> So let's apply your situation to the types and conversion that are
> proposed and break down where the logic will be applied. Say we have a
> PCollection like the following:
>
> Mapper 1:
> <id1, "Hello">
> <id2, "World">
> <id1, "I like turtles">
>
> Mapper 2:
> <id2, "Goodbye">
>
> This will be represented by the PTable<String, Comment>. We then apply a
> MapFn to transform it into PTable<String, Book> and we'd get the following
> in our PCollection:
>
> Mapper 1:
> <id1, <"Hello", 1>>
> <id2, <"World", 1>>
> <id1, <"I like turtles", 1>>
>
> Mapper 2:
> <id2, <"Goodbye", 1>>
>
> Then if we were to use the GBK + CombineFn, the output of the map phase
> would be:
>
> Mapper 1:
> <id2, <"World", 1>>
> <id1, <"I like turtles", 2>>
>
> Mapper 2:
> <id2, <"Goodbye", 1>>
>
> Notice Mapper 1 would only be emitting 2 items instead of 3, and therefore
> less data is sent over the wire and has to be sorted. Also, in the reducer,
> after the GBK is completed the CombineFn would finish its work and you'd
> get the following:
>
> Reducer 1:
> <id2, <"Goodbye", 2>>
> <id1, <"I like turtles", 2>>
>
> The only case where this would not improve performance is if you never
> emit data for the same key from the same mapper or your mapper doesn't
> reduce the size of the data.
>
> On Thu, Oct 17, 2013 at 8:18 PM, Chandan Biswas <[email protected]> wrote:
> I have a PTable<String, Comment>, and after the reduce I get a
> PTable<String, Book>.
>
> T --> Comment{String comment, String author}, U --> Book{String id, String
> lengthiestComment, int noOfComments}
>
> But I wanted to do some of the aggregation on the map side, based on some
> logic, instead of doing all of it on the reduce side.
> Yes, in the worst case the data flowing over the network will remain the
> same, but sorting will be improved.
>
> Thanks,
> Chandan
>
> On Thu, Oct 17, 2013 at 6:46 PM, Josh Wills <[email protected]> wrote:
> On Thu, Oct 17, 2013 at 4:41 PM, Chandan Biswas <[email protected]> wrote:
> > Yeah, I agree with Micah that it will not eliminate the reduce phase
> > entirely.
> > But the dummy object of U suggested by Josh (or converting to the U
> > type in the map for every record) will not improve performance, because
> > the same number of records will be sorted and aggregated in the reduce
> > phase.
>
> I don't think that's true-- the records of type U will be combined on the
> map side, which would reduce the amount of data that is pushed over the
> network and improve performance.
>
> Can you give any additional details about what T and U are in this
> scenario? :)
>
> > But my point is, can we improve it by applying a combiner where the
> > CombineFn provides output of a different type? If we have the same type,
> > we can use the combiner to do some aggregation on the map side, which
> > improves performance. But can we have some mechanism by which the same
> > advantage can be achieved when the CombineFn emits a different type? I
> > think emitting the same type from CombineFn has restricted its use. Can
> > we have a new CombineFn that allows us to output a different type, not
> > only the same type as the input?
>
> On Thu, Oct 17, 2013 at 5:05 PM, Josh Wills <[email protected]> wrote:
> Yeah, my experience in these kinds of situations is that you need to come
> up with a "dummy" or singleton version of U for the case where there is
> only a single T and do that conversion on the map side of the job, before
> the combiner runs.
> I think Chao had an issue like this a while ago, where he had a
> PTable<String, Double> and wanted to write a combiner that would return a
> PTable<String, Collection<Double>>. The solution was to convert the
> map-side object to a PTable<String, Collection<Double>>, where the value on
> the map side was a singleton list containing just that double value. Does
> that sort of trick work here?
>
> On Thu, Oct 17, 2013 at 2:57 PM, Micah Whitacre <[email protected]> wrote:
> Ok, so the feature you are trying to achieve is the proactive combination
> of data before performing the GBK, like the javadoc describes. Essentially,
> in that situation the CombineFn is being used as a Combiner[1] to combine
> the data local to that mapper before doing the GBK, and then further
> combining the data in the reduce operation. It will not necessarily
> eliminate the need for all processing in the reduce.
>
> If you want to use this functionality you will need to do the following:
>
> PTable<S, T> map to PTable<S, U>
> PTable<S, U> gbk to PGT<S, U>
> PGT<S, U> combine to PTable<S, U>
>
> This will take advantage of any optimization provided by the CombineFn.
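The three-step recipe above (map T to U, group by key, then combine) can be simulated in plain Java, without Crunch, to show where the combining happens. This is a sketch under stated assumptions:

- `Comment` and `Book` are trimmed-down, hypothetical versions of the types Chandan describes. The author field and the Book id are omitted, and the id is carried as the key instead.
- `CommentToBook.mapSide` and `CommentToBook.reduceSide` are hypothetical helpers. They stand in for the map-side combiner and the reducer.

Turning each Comment into a Book with a count of 1 is the "singleton version of U" trick Josh describes. The combine function keeps the longer comment and adds the counts.

```java
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// HYPOTHETICAL trimmed-down T from the thread (author field omitted for brevity).
class Comment {
  final String comment;
  Comment(String comment) { this.comment = comment; }
}

// HYPOTHETICAL trimmed-down U from the thread; the id is carried as the key instead.
class Book {
  final String lengthiestComment;
  final int noOfComments;
  Book(String lengthiestComment, int noOfComments) {
    this.lengthiestComment = lengthiestComment;
    this.noOfComments = noOfComments;
  }
}

class CommentToBook {
  // Step 1 (the MapFn): each Comment becomes a "singleton" Book with a count of 1.
  static Book toBook(Comment c) {
    return new Book(c.comment, 1);
  }

  // The combine function: keep the longer comment and add the counts. Because it
  // takes and returns U values only, it can run both before and after the shuffle.
  static Book combine(Book a, Book b) {
    String longer = b.lengthiestComment.length() > a.lengthiestComment.length()
        ? b.lengthiestComment
        : a.lengthiestComment;
    return new Book(longer, a.noOfComments + b.noOfComments);
  }

  // Map side: convert every record, then combine records that share a key
  // within this one mapper (what a combiner would do before the GBK).
  static Map<String, Book> mapSide(List<Map.Entry<String, Comment>> records) {
    Map<String, Book> combined = new LinkedHashMap<>();
    for (Map.Entry<String, Comment> record : records) {
      combined.merge(record.getKey(), toBook(record.getValue()), CommentToBook::combine);
    }
    return combined;
  }

  // Reduce side: after the shuffle, combine the partial Books from every mapper.
  static Map<String, Book> reduceSide(List<Map<String, Book>> mapperOutputs) {
    Map<String, Book> result = new LinkedHashMap<>();
    for (Map<String, Book> output : mapperOutputs) {
      for (Map.Entry<String, Book> entry : output.entrySet()) {
        result.merge(entry.getKey(), entry.getValue(), CommentToBook::combine);
      }
    }
    return result;
  }

  // Small helper for building (key, value) records.
  static <K, V> Map.Entry<K, V> kv(K key, V value) {
    return new AbstractMap.SimpleEntry<>(key, value);
  }
}
```

Here is what happens on the records from Micah's example. `mapSide` for Mapper 1 emits two combined records instead of three. `reduceSide` then ends with <id1, <"I like turtles", 2>> and <id2, <"Goodbye", 2>>, matching his walkthrough.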
>
> [1] - http://wiki.apache.org/hadoop/HadoopMapReduce
>
> On Thu, Oct 17, 2013 at 4:30 PM, Chandan Biswas <[email protected]> wrote:
> Hello Micah,
> Yes, we are using a MapFn now. That aggregation and computation is being
> done in the reduce phase. Since a CombineFn applied after a GBK also runs on
> the map side, most of the computations that currently run in the reduce
> phase could be done on the map side, and some smaller aggregations and
> computations could be done in the reduce phase.
> My point was to do some aggregation (and create a new object) in the map
> phase instead of in the reduce phase.
>
> Thanks,
> Chandan
>
> On Thu, Oct 17, 2013 at 3:48 PM, Micah Whitacre <[email protected]> wrote:
> Chandan,
> I think what you want will just be a simple MapFn instead of a
> CombineFn.
> The doc of the CombineFn[1] sounds like what you want, with the statement
> "A special DoFn implementation that converts an Iterable of values into a
> single value", but it expects the value to be of the same type. Since you
> want to combine the values into a different form, it should be fairly
> trivial to write a MapFn that converts the Iterable<T> -> U.
>
> [1] - http://crunch.apache.org/apidocs/0.7.0/org/apache/crunch/CombineFn.html
>
> On Thu, Oct 17, 2013 at 3:30 PM, Chandan Biswas <[email protected]> wrote:
> I was trying to refactor some code and use a CombineFn. But when I went
> deeper, I found that I can't do it, as Crunch doesn't provide the
> functionality I needed. For example, I have a PGroupedTable<S,T>.
> I wanted to apply a CombineFn<S,T> on it and get a PTable<S,U> instead of
> T. Right now, CombineFn allows only the same type as the return value. The
> use case for this is that there would be some time saved in sorting. When
> aggregating objects on the map side, it's natural to create a new object
> of a different type.
>
> Any thoughts on it? Am I missing anything? If this can be written a
> different way using existing functionality, please let me know.
>
> Thanks
> Chandan

--
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
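Josh's singleton-collection trick for Chao's case can be sketched the same way, again in plain Java rather than Crunch. Chao's case goes from PTable<String, Double> to PTable<String, Collection<Double>>. The names `SingletonTrick`, `mapAndCombine`, `reduce`, `concat`, and `kv` are hypothetical. The point is narrow: once each Double is wrapped in a one-element list on the map side, the combine function's input and output types are both List<Double>. It therefore fits the same-type restriction Chandan ran into.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SingletonTrick {
  // Map side: wrap each value in a singleton list so the combine step sees
  // List<V> on both its input and output, then combine within this mapper.
  static <K, V> Map<K, List<V>> mapAndCombine(List<Map.Entry<K, V>> records) {
    Map<K, List<V>> combined = new LinkedHashMap<>();
    for (Map.Entry<K, V> record : records) {
      List<V> singleton = Collections.singletonList(record.getValue());
      combined.merge(record.getKey(), singleton, SingletonTrick::concat);
    }
    return combined;
  }

  // Reduce side: merge the per-mapper partial lists for each key with the
  // same combine function used on the map side.
  static <K, V> Map<K, List<V>> reduce(List<Map<K, List<V>>> mapperOutputs) {
    Map<K, List<V>> result = new LinkedHashMap<>();
    for (Map<K, List<V>> output : mapperOutputs) {
      for (Map.Entry<K, List<V>> entry : output.entrySet()) {
        result.merge(entry.getKey(), entry.getValue(), SingletonTrick::concat);
      }
    }
    return result;
  }

  // The combine function: concatenate two lists without mutating either input.
  static <E> List<E> concat(List<E> left, List<E> right) {
    List<E> out = new ArrayList<>(left);
    out.addAll(right);
    return out;
  }

  // Small helper for building (key, value) records.
  static <K, V> Map.Entry<K, V> kv(K key, V value) {
    return new AbstractMap.SimpleEntry<>(key, value);
  }
}
```

Consider a mapper that sees ("a", 1.0), ("b", 2.0), ("a", 3.0). It emits two combined records: a -> [1.0, 3.0] and b -> [2.0]. Merging those with another mapper's b -> [4.0] gives a -> [1.0, 3.0] and b -> [2.0, 4.0].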
