On Sat, Oct 19, 2013 at 2:34 AM, Josh Wills <[email protected]> wrote:
> I'm certainly not opposed to having something like this. Spark makes this
> distinction via Accumulable vs. Accumulator:
>
> http://spark.incubator.apache.org/docs/0.8.0/api/core/index.html#org.apache.spark.Accumulable
> http://spark.incubator.apache.org/docs/0.8.0/api/core/index.html#org.apache.spark.Accumulator
>
> Maybe we want something like "Aggregatable<R, T>" to go along with our
> Aggregator<T> (which could extend Aggregatable<T, T>)?
>
>

That sounds like a great way of exposing that functionality -- I'll
take a closer look at actually doing it.

- Gabriel
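
To make the Aggregatable idea a bit more concrete, here is a rough sketch in plain Java. Everything in it is hypothetical: the interface names, the zero/add/merge methods, and the SameTypeAggregator stand-in are assumptions for illustration only, not the existing Crunch Aggregator or the Spark Accumulable API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical interface (not Crunch or Spark API): folds inputs of type T
// into a result of type R, and merges partial results of type R.
interface Aggregatable<R, T> {
  R zero();                   // empty result to start from
  R add(R partial, T value);  // fold one input value into a partial result
  R merge(R left, R right);   // combine two partial results
}

// Same-type special case: R and T coincide, so adding a value is just a merge.
interface SameTypeAggregator<T> extends Aggregatable<T, T> {
  @Override
  default T add(T partial, T value) {
    return merge(partial, value);
  }
}

final class AggregatableSketch {

  // Folds all values into a single result, starting from zero().
  static <R, T> R aggregate(Aggregatable<R, T> agg, Iterable<T> values) {
    R result = agg.zero();
    for (T value : values) {
      result = agg.add(result, value);
    }
    return result;
  }

  // Type-changing case: Double values are collected into a List<Double>.
  static final Aggregatable<List<Double>, Double> TO_LIST =
      new Aggregatable<List<Double>, Double>() {
        @Override
        public List<Double> zero() {
          return new ArrayList<>();
        }

        @Override
        public List<Double> add(List<Double> partial, Double value) {
          List<Double> out = new ArrayList<>(partial);
          out.add(value);
          return out;
        }

        @Override
        public List<Double> merge(List<Double> left, List<Double> right) {
          List<Double> out = new ArrayList<>(left);
          out.addAll(right);
          return out;
        }
      };

  // Same-type case: summing Longs only needs zero() and merge().
  static final SameTypeAggregator<Long> SUM = new SameTypeAggregator<Long>() {
    @Override
    public Long zero() {
      return 0L;
    }

    @Override
    public Long merge(Long left, Long right) {
      return left + right;
    }
  };

  public static void main(String[] args) {
    System.out.println(aggregate(TO_LIST, Arrays.asList(1.0, 2.0)));
    System.out.println(aggregate(SUM, Arrays.asList(1L, 2L, 3L)));
  }
}
```

The point of splitting add from merge is that add can run per record and merge can run wherever partial results meet, which is the distinction the two-type-parameter version would expose.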

>
> On Fri, Oct 18, 2013 at 1:36 PM, Gabriel Reid <[email protected]> wrote:
>
>> This use case (map/combine <K,V> to <K,U>) seems to come up
>> repeatedly. The solution (map <K,V> to <K, Collection<V>> and then
>> combine) works but is also pretty unintuitive.
>>
>> Any thoughts on adding a util in Crunch to do this? It would basically
>> just need to be a static util method that takes a MapFn<<K,V><K,U>>
>> and a CombineFn<K,U> and would take care of the singleton collection
>> mapping stuff internally. On the one hand I'm thinking that this could
>> be pretty useful, but I'm not sure if it would make things more
>> intuitive or possibly have the reverse effect.
>>
>> Any opinions? I'm up for putting it together if people think it's worth it.
>>
>> - Gabriel
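
A minimal sketch of the shape such a util could take, written against plain Java collections rather than Crunch so that it runs on its own. The class name, the mapThenCombine method, and the use of Function and BinaryOperator in place of MapFn and CombineFn are all assumptions for illustration; nothing here is existing Crunch API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Plain-Java stand-in for the proposed utility; none of these names are Crunch API.
final class MapThenCombineSketch {

  // Lifts each V into a "singleton" U, then merges the U values per key with
  // an associative combiner (the part a combiner could also run map-side).
  static <K, V, U> Map<K, U> mapThenCombine(
      List<Map.Entry<K, V>> records,
      Function<V, U> toSingleton,
      BinaryOperator<U> combiner) {
    Map<K, U> combined = new LinkedHashMap<>();
    for (Map.Entry<K, V> record : records) {
      combined.merge(record.getKey(), toSingleton.apply(record.getValue()), combiner);
    }
    return combined;
  }

  // Small helper to build key/value records.
  static <K, V> Map.Entry<K, V> entry(K key, V value) {
    return new SimpleEntry<>(key, value);
  }

  // Concatenates two lists without mutating either input.
  static List<Double> concat(List<Double> left, List<Double> right) {
    List<Double> out = new ArrayList<>(left);
    out.addAll(right);
    return out;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Double>> records =
        Arrays.asList(entry("a", 1.0), entry("b", 2.0), entry("a", 3.0));
    // <K, Double> becomes <K, List<Double>>; the caller never sees the
    // singleton-list step.
    Map<String, List<Double>> result =
        mapThenCombine(records, v -> Collections.singletonList(v), MapThenCombineSketch::concat);
    System.out.println(result);
  }
}
```

Whether this is more intuitive than doing the singleton mapping by hand probably depends on how obvious the toSingleton argument is to callers.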
>>
>>
>> On Fri, Oct 18, 2013 at 4:14 PM, Micah Whitacre <[email protected]> wrote:
>> > Thinking about the technical issues, at first glance you could say the
>> > restriction is just the way the Java generics are written for the
>> > CombineFn class, but if you think about what is actually happening, it
>> > would be awkward to support changing types in the CombineFn, especially
>> > when it is paired with a GroupByKey.  As I showed in the example, the
>> > CombineFn essentially bookends the GBK operation by performing processing
>> > on the types before and after the sorting.  The GBK's types describe the
>> > output of the map phase and the input to the reduce.  If the CombineFn
>> > changed the types, then the output wouldn't match the types described by
>> > the GBK.  I'm guessing this could lead to a number of problems trying to
>> > compute the types and plan for the job.
>> >
>> >
>> >
>> >
>> > On Fri, Oct 18, 2013 at 8:55 AM, Micah Whitacre <[email protected]> wrote:
>> >
>> >> I'm not sure I follow how there is extra effort involved.  Are you
>> >> talking about development effort or processing effort?  In terms of
>> >> development effort, in both cases you need to write code that translates
>> >> T to U and combines the values.  The difference is whether that logic
>> >> exists inside a single DoFn or is split into a MapFn + CombineFn.  So the
>> >> development effort should be the same.
>> >>
>> >>
>> >> On Fri, Oct 18, 2013 at 8:11 AM, Chandan Biswas <[email protected]> wrote:
>> >>
>> >>> Yeah, I see what you are talking about. But it will take extra effort
>> >>> to convert to type U. So, is there any specific reason that CombineFn
>> >>> was initially designed not to allow a different return type? Were
>> >>> there any constraints (design / complexity) behind this restriction?
>> >>> Thanks,
>> >>>
>> >>>
>> >>> On Thu, Oct 17, 2013 at 8:47 PM, Micah Whitacre <[email protected]> wrote:
>> >>>
>> >>> > Chandan,
>> >>> >    So let's apply your situation to the types and conversion that is
>> >>> > proposed and break down where the logic will be applied.  Say we have a
>> >>> > PCollection like the following:
>> >>> >
>> >>> > Mapper 1:
>> >>> > <id1, "Hello">
>> >>> > <id2, "World">
>> >>> > <id1, "I like turtles">
>> >>> >
>> >>> > Mapper 2
>> >>> > <id2, "Goodbye">
>> >>> >
>> >>> > This will be represented by the PTable<String, Comment>.  We then
>> >>> > apply a MapFn to transform it into PTable<String, Book> and we'd get
>> >>> > the following in our PCollection:
>> >>> >
>> >>> > Mapper 1
>> >>> > <id1, <"Hello", 1>>
>> >>> > <id2, <"World", 1>>
>> >>> > <id1, <"I like turtles", 1>>
>> >>> >
>> >>> > Mapper 2
>> >>> > <id2, <"Goodbye", 1>>
>> >>> >
>> >>> > Then if we were to use the GBK + CombineFn, the output of the map
>> >>> > phase would be:
>> >>> >
>> >>> > Mapper 1
>> >>> > <id2, <"World", 1>>
>> >>> > <id1, <"I like turtles", 2>>
>> >>> >
>> >>> > Mapper 2
>> >>> > <id2, <"Goodbye", 1>>
>> >>> >
>> >>> > Notice Mapper 1 would only be emitting 2 items instead of 3, and
>> >>> > therefore less data is sent over the wire and has to be sorted.  Also,
>> >>> > in the reducer, after the GBK is completed, the CombineFn would finish
>> >>> > its work and you'd get the following:
>> >>> >
>> >>> > Reducer 1
>> >>> > <id2, <"Goodbye", 2>>
>> >>> > <id1, <"I like turtles", 2>>
>> >>> >
>> >>> > The only cases where this would not improve performance are if you
>> >>> > never emit data for the same key from the same mapper, or if your
>> >>> > mapper doesn't reduce the size of the data.
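
The walkthrough above can be replayed in plain Java as a sanity check. This is only a simulation under stated assumptions: no Crunch or Hadoop classes are involved, the Book class is a simplified stand-in (the id travels as the key), and the combine rule (keep the longest comment, add the counts) is inferred from the example data rather than taken from real code.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of the example; no Crunch or Hadoop classes are used.
final class MapSideCombineSketch {

  // Simplified Book: the id is carried by the key, so only the aggregated
  // fields are kept here.
  static final class Book {
    final String lengthiestComment;
    final int noOfComments;

    Book(String lengthiestComment, int noOfComments) {
      this.lengthiestComment = lengthiestComment;
      this.noOfComments = noOfComments;
    }

    // Associative merge: keep the longer comment and add the counts.
    Book combine(Book other) {
      String longest = other.lengthiestComment.length() > lengthiestComment.length()
          ? other.lengthiestComment
          : lengthiestComment;
      return new Book(longest, noOfComments + other.noOfComments);
    }

    @Override
    public String toString() {
      return "<\"" + lengthiestComment + "\", " + noOfComments + ">";
    }
  }

  // The MapFn step: one comment becomes a "singleton" Book with a count of 1.
  static Map.Entry<String, Book> record(String id, String comment) {
    return new SimpleEntry<>(id, new Book(comment, 1));
  }

  // The combine step, usable both per mapper and again in the reducer.
  static Map<String, Book> combine(Iterable<Map.Entry<String, Book>> records) {
    Map<String, Book> out = new LinkedHashMap<>();
    for (Map.Entry<String, Book> r : records) {
      out.merge(r.getKey(), r.getValue(), Book::combine);
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, Book> mapper1 = combine(Arrays.asList(
        record("id1", "Hello"), record("id2", "World"), record("id1", "I like turtles")));
    Map<String, Book> mapper2 = combine(Arrays.asList(record("id2", "Goodbye")));

    // "Shuffle": everything the mappers emitted arrives at the reducer.
    List<Map.Entry<String, Book>> shuffled = new ArrayList<>(mapper1.entrySet());
    shuffled.addAll(mapper2.entrySet());

    System.out.println("Mapper 1 emits " + mapper1.size() + " records: " + mapper1);
    System.out.println("Reducer output: " + combine(shuffled));
  }
}
```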
>> >>> >
>> >>> >
>> >>> > On Thu, Oct 17, 2013 at 8:18 PM, Chandan Biswas <[email protected]> wrote:
>> >>> >
>> >>> > > I have a PTable<String, Comment> and get a PTable<String, Book>
>> >>> > > after the reduce.
>> >>> > >
>> >>> > > T --> Comment{String comment, String author}, U --> Book{String id,
>> >>> > > String lengthiestComment, int noOfComments}
>> >>> > >
>> >>> > > But I wanted to do some aggregations on the map side based on some
>> >>> > > logic, instead of doing all aggregations on the reduce side.
>> >>> > > Yes, in the worst case the data flow over the network will remain
>> >>> > > the same, but sorting will be improved.
>> >>> > >
>> >>> > > Thanks,
>> >>> > > Chandan
>> >>> > >
>> >>> > >
>> >>> > > On Thu, Oct 17, 2013 at 6:46 PM, Josh Wills <[email protected]> wrote:
>> >>> > >
>> >>> > > > On Thu, Oct 17, 2013 at 4:41 PM, Chandan Biswas <[email protected]> wrote:
>> >>> > > >
>> >>> > > > > Yeah, I agree with Micah that it will not eliminate the reduce
>> >>> > > > > phase entirely. But the dummy object of U suggested by Josh (or
>> >>> > > > > converting to type U in the map for every record) will not
>> >>> > > > > improve performance, because the same number of records will be
>> >>> > > > > sorted and aggregated in the reduce phase.
>> >>> > > >
>> >>> > > >
>> >>> > > > I don't think that's true -- the records of type U will be
>> >>> > > > combined on the map side, which would reduce the amount of data
>> >>> > > > that is pushed over the network and improve performance.
>> >>> > > >
>> >>> > > > Can you give any additional details about what T and U are in
>> >>> > > > this scenario? :)
>> >>> > > >
>> >>> > > >
>> >>> > > >
>> >>> > > > > But my point is, can we improve it by applying a combiner where
>> >>> > > > > the CombineFn produces output of a different type? If we have
>> >>> > > > > the same type, we can use the combiner to do some aggregation on
>> >>> > > > > the map side, which improves performance. But can we have some
>> >>> > > > > mechanism by which the same advantage can be achieved when the
>> >>> > > > > CombineFn emits a different type? I think that requiring the
>> >>> > > > > CombineFn to emit the same type restricts its use. Can we have a
>> >>> > > > > new CombineFn that allows us to output a different type, not only
>> >>> > > > > the same type as the input?
>> >>> > > > >
>> >>> > > > >
>> >>> > > > > On Thu, Oct 17, 2013 at 5:05 PM, Josh Wills <[email protected]> wrote:
>> >>> > > > >
>> >>> > > > > > Yeah, my experience in these kinds of situations is that you
>> >>> > > > > > need to come up with a "dummy" or singleton version of U for
>> >>> > > > > > the case where there is only a single T and do that conversion
>> >>> > > > > > on the map side of the job, before the combiner runs. I think
>> >>> > > > > > Chao had an issue like this a while ago, where he had a
>> >>> > > > > > PTable<String, Double> and wanted to write a combiner that
>> >>> > > > > > would return a PTable<String, Collection<Double>>. The solution
>> >>> > > > > > was to convert the map-side object to a
>> >>> > > > > > PTable<String, Collection<Double>>, where the value on the map
>> >>> > > > > > side was a singleton list containing just that double value.
>> >>> > > > > > Does that sort of trick work here?
>> >>> > > > > >
>> >>> > > > > >
>> >>> > > > > > On Thu, Oct 17, 2013 at 2:57 PM, Micah Whitacre <[email protected]> wrote:
>> >>> > > > > >
>> >>> > > > > > > Ok, so the feature you are trying to achieve is the proactive
>> >>> > > > > > > combination of data before performing the GBK, like the
>> >>> > > > > > > javadoc describes.  Essentially in that situation the
>> >>> > > > > > > CombineFn is being used as a Combiner[1] to combine the data
>> >>> > > > > > > local to that mapper before doing the GBK and then further
>> >>> > > > > > > combining the data in the reduce operation.  It will not
>> >>> > > > > > > necessarily eliminate the need for all processing in the
>> >>> > > > > > > reduce.
>> >>> > > > > > >
>> >>> > > > > > > If you want to use this functionality you will need to do the
>> >>> > > > > > > following:
>> >>> > > > > > >
>> >>> > > > > > > PTable<S, T> map to PTable<S, U>
>> >>> > > > > > > PTable<S, U> gbk to PGT<S, U>
>> >>> > > > > > > PGT<S, U> combine to PTable<S, U>
>> >>> > > > > > >
>> >>> > > > > > > This will take advantage of any optimization provided by the
>> >>> > > > > > > CombineFn.
>> >>> > > > > > >
>> >>> > > > > > > [1] - http://wiki.apache.org/hadoop/HadoopMapReduce
>> >>> > > > > > >
>> >>> > > > > > >
>> >>> > > > > > >
>> >>> > > > > > > On Thu, Oct 17, 2013 at 4:30 PM, Chandan Biswas <[email protected]> wrote:
>> >>> > > > > > >
>> >>> > > > > > > > Hello Micah,
>> >>> > > > > > > > Yes, we are using a MapFn now. The aggregation and
>> >>> > > > > > > > computation are being done in the reduce phase. Since a
>> >>> > > > > > > > CombineFn applied after the GBK also runs on the map side,
>> >>> > > > > > > > most of the computations that are now running in the reduce
>> >>> > > > > > > > phase could be done on the map side. Some smaller
>> >>> > > > > > > > aggregations and computations could still be done in the
>> >>> > > > > > > > reduce phase.
>> >>> > > > > > > > My point was to do some aggregation (and create a new
>> >>> > > > > > > > object) in the map phase instead of in the reduce phase.
>> >>> > > > > > > >
>> >>> > > > > > > > Thanks,
>> >>> > > > > > > > Chandan
>> >>> > > > > > > >
>> >>> > > > > > > >
>> >>> > > > > > > > On Thu, Oct 17, 2013 at 3:48 PM, Micah Whitacre <[email protected]> wrote:
>> >>> > > > > > > >
>> >>> > > > > > > > > Chandan,
>> >>> > > > > > > > >    I think what you want is just a simple MapFn instead
>> >>> > > > > > > > > of a CombineFn.  The doc of the CombineFn[1] sounds like
>> >>> > > > > > > > > what you want with the statement "A special DoFn
>> >>> > > > > > > > > <http://crunch.apache.org/apidocs/0.7.0/org/apache/crunch/DoFn.html>
>> >>> > > > > > > > > implementation that converts an Iterable
>> >>> > > > > > > > > <http://download.oracle.com/javase/6/docs/api/java/lang/Iterable.html?is-external=true>
>> >>> > > > > > > > > of values into a single value", but it expects the value
>> >>> > > > > > > > > to be of the same type.  Since you want to combine the
>> >>> > > > > > > > > values into a different form, it should be fairly trivial
>> >>> > > > > > > > > to write a MapFn that converts the Iterable<T> -> U.
>> >>> > > > > > > > >
>> >>> > > > > > > > > [1] - http://crunch.apache.org/apidocs/0.7.0/org/apache/crunch/CombineFn.html
>> >>> > > > > > > > >
>> >>> > > > > > > > >
>> >>> > > > > > > > > On Thu, Oct 17, 2013 at 3:30 PM, Chandan Biswas <[email protected]> wrote:
>> >>> > > > > > > > >
>> >>> > > > > > > > > > I was trying to refactor some things and wanted to use a
>> >>> > > > > > > > > > CombineFn. But when I dug deeper, I found that I can't,
>> >>> > > > > > > > > > as Crunch doesn't provide the functionality I need. For
>> >>> > > > > > > > > > example, I have a PGroupedTable<S,T>. I wanted to apply a
>> >>> > > > > > > > > > CombineFn<S,T> to it and get a PCollection<S,U> instead
>> >>> > > > > > > > > > of T. Right now, CombineFn only allows the same type as
>> >>> > > > > > > > > > the return value. The use case for this is that there
>> >>> > > > > > > > > > would be some time saved in sorting. It's natural that
>> >>> > > > > > > > > > aggregating some objects on the map side can create an
>> >>> > > > > > > > > > object of a new, different type.
>> >>> > > > > > > > > >
>> >>> > > > > > > > > > Any thoughts on this? Am I missing anything? If this can
>> >>> > > > > > > > > > be written in a different way using the existing API,
>> >>> > > > > > > > > > please let me know.
>> >>> > > > > > > > > >
>> >>> > > > > > > > > > Thanks
>> >>> > > > > > > > > > Chandan
>> >>> > > > > > > > > >
>> >>> > > > > > > > >
>> >>> > > > > > > >
>> >>> > > > > > >
>> >>> > > > > >
>> >>> > > > > >
>> >>> > > > > >
>> >>> > > > > > --
>> >>> > > > > > Director of Data Science
>> >>> > > > > > Cloudera <http://www.cloudera.com>
>> >>> > > > > > Twitter: @josh_wills <http://twitter.com/josh_wills>
>> >>> > > > > >
>> >>> > > > >
>> >>> > > >
>> >>> > > >
>> >>> > > >
>> >>> > > > --
>> >>> > > > Director of Data Science
>> >>> > > > Cloudera <http://www.cloudera.com>
>> >>> > > > Twitter: @josh_wills <http://twitter.com/josh_wills>
>> >>> > > >
>> >>> > >
>> >>> >
>> >>>
>> >>
>> >>
>>
>
>
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
