As stated before, I'm clueless about Spark, so I'll have to start with a basic question: is all of this running within a single thread, or is thread safety something to worry about?
If the aggregation is being done in a single thread, then I've got another idea (though no promises on how good it is). There could be a single Crunch Aggregator (or at least a single one per thread), and the combiner objects used by createCombiner, mergeValue, and mergeCombiners would all hold a reference to that single Aggregator. A call to mergeValue would then look like this:

    mergeValue(combiner, value) {
      combiner.sharedCrunchCombiner.reset();
      combiner.sharedCrunchCombiner.add(combiner.currentValue);
      combiner.sharedCrunchCombiner.add(value);
      combiner.currentValue = combiner.sharedCrunchCombiner.result();
      return combiner;
    }

(A fuller sketch of this idea follows the quoted message below.)

On Thu, Dec 5, 2013 at 7:31 PM, Josh Wills <jwi...@cloudera.com> wrote:

> Hey all,
>
> So I'm working away on CRUNCH-296, the Crunch-on-Spark patch, and I've run
> into a place where there's a mismatch between the frameworks: combiners, and
> how Crunch uses Aggregators to express combine operations.
>
> The current Crunch Aggregator<T> assumes that it will see all of the values
> for a given key at once, because that's how things work in MapReduce
> Combiner and Reducer operations. That isn't true in Spark; all of Spark's
> aggregations are hash based, so when you do a combineByKey() operation in
> Spark, Spark creates a HashMap and then creates and updates combiner
> instances by defining three functions:
>
> createCombiner: V => C (takes in a value, returns a combiner object)
> mergeValue: (C, V) => C (takes in an existing combiner and a new value and
> updates the combiner)
> mergeCombiners: (C, C) => C (takes in two combiners and merges them together)
>
> I could do a hack that would make Aggregators usable in the way that Spark
> expects them to be used -- after all, Aggregator<T> implements Serializable,
> so there's no issue with serializing Aggregators across the wire in either
> Spark or MapReduce by using PTypes.serializables() with the PTypeFamily of
> the key. The requirement would be that Aggregators would need to be
> Cloneable-ish (although not using Cloneable, because Josh Bloch taught me
> that was evil), since certain Aggregators have state associated with them
> (e.g., string concat) that would need to be passed along (this vaguely
> recalls the AggregatorFactory error I made many moons ago). What I would
> probably end up with is a default impl that did a serialize/deserialize to
> create a new instance of the Aggregator, which subclasses that knew better
> could override to work optimally.
>
> That said, that's not the greatest thing ever, and so I'm wondering if
> anyone has thought about what a generalization of Aggregator would look
> like. I am even open to the use of terms like "monoid" if you feel like
> there's no other way to express your ideas. ;-)
>
> J
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
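To make the shared-aggregator idea above more concrete, here's a minimal Java sketch. SparkCombiner is a name invented here for illustration (not Crunch or Spark API), and it assumes Crunch's Aggregator<T> contract of reset()/update(T)/results(); it also assumes a single-result aggregator, so top-K-style aggregators would need more care:

    // Sketch only: SparkCombiner is a hypothetical name, not Crunch or Spark API.
    // Assumes the Crunch Aggregator<T> contract: reset(), update(T), results().
    import org.apache.crunch.Aggregator;

    class SparkCombiner<T> {
      // The single Aggregator shared by every combiner created on this thread.
      private final Aggregator<T> shared;
      private T current;

      SparkCombiner(Aggregator<T> shared, T initial) {  // createCombiner: V => C
        this.shared = shared;
        this.current = initial;
      }

      SparkCombiner<T> mergeValue(T value) {            // mergeValue: (C, V) => C
        shared.reset();
        shared.update(current);
        shared.update(value);
        // Assumes a single-result Aggregator; top-K-style ones need more care.
        current = shared.results().iterator().next();
        return this;
      }

      SparkCombiner<T> mergeCombiners(SparkCombiner<T> other) {  // (C, C) => C
        return mergeValue(other.current);
      }
    }

Since all three combineByKey functions only ever touch the one shared Aggregator, this only holds together if Spark really does drive the whole hash-based aggregation from a single thread per partition -- hence the question at the top.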
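And for the serialize/deserialize default impl mentioned in the quoted message, a rough sketch of what that copy step might look like (AggregatorCopies is a hypothetical helper name, not anything in Crunch):

    // Sketch only: a default "copy" for Aggregators done by round-tripping
    // through Java serialization, which works because Aggregator<T>
    // implements Serializable. Subclasses could override with something faster.
    import java.io.*;

    final class AggregatorCopies {
      @SuppressWarnings("unchecked")
      static <A extends Serializable> A copy(A aggregator) {
        try {
          ByteArrayOutputStream bytes = new ByteArrayOutputStream();
          try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(aggregator);
          }
          try (ObjectInputStream in = new ObjectInputStream(
              new ByteArrayInputStream(bytes.toByteArray()))) {
            return (A) in.readObject();
          }
        } catch (IOException | ClassNotFoundException e) {
          throw new IllegalStateException("Could not copy Aggregator", e);
        }
      }
    }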