Spark is not multithreaded, AFAIK. I like this idea, and I don't see a reason why it wouldn't work. Let me whip something up and see how it looks-- thanks!
On Thu, Dec 5, 2013 at 11:33 AM, Gabriel Reid <gabriel.r...@gmail.com> wrote:

> Ugh, hit send too soon, sorry about that. Restarting...
>
> As stated before, I'm clueless about Spark, so I'll have to start with
> a basic question: is all of this stuff being run within one thread? Or
> is there thread-safety to be worried about?
>
> If the aggregation is being done in a single thread, then I've got
> another idea (although no promises on how good it is). There could be
> a single Crunch Aggregator (at least a single one per thread), and the
> aggregators used by createCombiner, mergeValue, and mergeCombiners
> would all have a reference to the single combiner.
>
> A call to mergeValue would then look like this (ignore the bad
> syntax and probably wrong API):
>
> mergeValue(combiner, value) {
>   combiner.sharedCrunchCombiner.reset();
>   combiner.sharedCrunchCombiner.add(combiner.currentValue);
>   combiner.sharedCrunchCombiner.add(value);
>   combiner.currentValue = combiner.sharedCrunchCombiner.getResults()[0];
> }
>
> That could also be done by building up lists of values in each of the
> combiners, and only using the shared Crunch combiner to merge them
> once they get to a certain size or something.
>
> On the other hand, if this stuff isn't all local to a single thread:
> are there not a lot of other things that might need to be worried
> about, even outside the realm of Aggregators? Currently nothing in
> Crunch DoFns needs to be thread-safe, for obvious reasons. Although
> this probably wouldn't be an issue in 99% of the cases, I would think
> that there will be things that break if DoFns or other Crunch
> primitives are called simultaneously by multiple threads.
>
> - Gabriel
>
> On Thu, Dec 5, 2013 at 8:27 PM, Gabriel Reid <gabriel.r...@gmail.com> wrote:
> > As stated before, I'm clueless about Spark, so I'll have to start with
> > a basic question: is all of this stuff being run within one thread? Or
> > is there thread-safety to be worried about?
> >
> > If the aggregation is being done in a single thread, then I've got
> > another idea (although no promises on how good it is). There could be
> > a single Crunch Aggregator (at least a single one per thread), and the
> > aggregators used by createCombiner, mergeValue, and mergeCombiners
> > all have a reference to the single combiner.
> >
> > A call to mergeValue would then look like this:
> >
> > mergeValue(combiner, value) {
> >   combiner.sharedCrunchCombiner.reset();
> >   combiner.sharedCrunchCombiner.add(combiner.sharedCrunchCombiner.currentValue)
> > }
> >
> > On Thu, Dec 5, 2013 at 7:31 PM, Josh Wills <jwi...@cloudera.com> wrote:
> >> Hey all,
> >>
> >> So I'm working away on CRUNCH-296, the Crunch-on-Spark patch, and I've
> >> run into a place where there's a mismatch between the frameworks:
> >> combiners, and how Crunch uses Aggregators to express combine operations.
> >>
> >> The current Crunch Aggregator<T> assumes that it will see all of the
> >> values for a given key all at once, because that's how things work in
> >> MapReduce Combiner and Reducer operations. That isn't true in Spark; all
> >> of Spark's aggregations are hash-based, so when you do a combineByKey()
> >> operation in Spark, Spark creates a HashMap and then creates and updates
> >> combiner instances by defining three functions:
> >>
> >> createCombiner: V => C (takes in a value, returns a combiner object)
> >> mergeValue: (V, C) => C (takes in a new value and an existing combiner
> >> and updates the combiner)
> >> mergeCombiners: (C, C) => C (takes in two combiners and merges them
> >> together)
> >>
> >> I could do a hack that would make Aggregators usable in the way that
> >> Spark expects them to be used -- after all, Aggregator<T> implements
> >> Serializable, so there's no issue with serializing Aggregators across
> >> the wire in either Spark or MapReduce by using a PTypes.serializables()
> >> with the PTypeFamily of the key.
> >> The requirement would be that Aggregators would need to be
> >> Cloneable-ish (although not using Cloneable, b/c Josh Bloch taught me
> >> that was evil), because certain Aggregators have state associated with
> >> them (e.g., string concat) that would need to be passed along (this
> >> sort of vaguely recalls the AggregatorFactory error I made many moons
> >> ago). What I would probably end up with is a default impl that did a
> >> serialize/deserialize to create a new instance of the Aggregator, which
> >> subclasses that knew better could override to work optimally.
> >>
> >> That said, that's not the greatest thing ever, and so I'm wondering if
> >> anyone has thought about what a generalization of Aggregator would look
> >> like. I am even open to the use of terms like "monoid" if you feel like
> >> there's no other way to express your ideas. ;-)
> >>
> >> J
> >> --
> >> Director of Data Science
> >> Cloudera <http://www.cloudera.com>
> >> Twitter: @josh_wills <http://twitter.com/josh_wills>

--
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
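For readers following along: the createCombiner/mergeValue/mergeCombiners contract Josh describes can be illustrated with a plain-Java sketch of the hash-based mechanism. This is not Spark's actual implementation -- the helper names (combinePartition, mergePartitions) are made up for illustration, and only the three-function shape mirrors Spark's combineByKey().

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

public class CombineByKeyDemo {

    // Hash-based aggregation within one partition, in the shape Spark's
    // combineByKey() expects: createCombiner turns the first value for a key
    // into a combiner C, and mergeValue folds each subsequent value into it.
    static <K, V, C> Map<K, C> combinePartition(
            Iterable<Map.Entry<K, V>> pairs,
            Function<V, C> createCombiner,
            BiFunction<C, V, C> mergeValue) {
        Map<K, C> combiners = new HashMap<>();
        for (Map.Entry<K, V> pair : pairs) {
            C existing = combiners.get(pair.getKey());
            combiners.put(pair.getKey(), existing == null
                    ? createCombiner.apply(pair.getValue()) // first value for key
                    : mergeValue.apply(existing, pair.getValue()));
        }
        return combiners;
    }

    // When the hash maps from two partitions meet, mergeCombiners combines
    // the per-key combiners.
    static <K, C> Map<K, C> mergePartitions(
            Map<K, C> left, Map<K, C> right, BiFunction<C, C, C> mergeCombiners) {
        Map<K, C> merged = new HashMap<>(left);
        right.forEach((key, combiner) -> merged.merge(key, combiner, mergeCombiners));
        return merged;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> part1 = List.of(Map.entry("a", 1), Map.entry("b", 2));
        List<Map.Entry<String, Integer>> part2 = List.of(Map.entry("a", 3));
        Map<String, Integer> m1 = combinePartition(part1, v -> v, Integer::sum);
        Map<String, Integer> m2 = combinePartition(part2, v -> v, Integer::sum);
        Map<String, Integer> total = mergePartitions(m1, m2, Integer::sum);
        System.out.println(total.get("a") + " " + total.get("b")); // prints "4 2"
    }
}
```

The mismatch with Crunch is visible here: the combiner for a key is touched once per value as values stream through the hash map, rather than being handed all values for the key at once.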
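Gabriel's shared-aggregator idea can likewise be sketched in Java. The SimpleAggregator interface, SumAggregator, and SharedCombiner below are hypothetical stand-ins, not Crunch's actual Aggregator API; the sketch assumes, as Gabriel's proposal does, that all aggregation for a partition runs on a single thread, so one shared aggregator can be reset and reused for every merge.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for Crunch's Aggregator<T>.
interface SimpleAggregator<T> {
    void reset();
    void update(T value);
    Iterable<T> results();
}

// Example aggregator with per-call state: a running sum.
class SumAggregator implements SimpleAggregator<Long> {
    private long sum;
    public void reset() { sum = 0; }
    public void update(Long value) { sum += value; }
    public Iterable<Long> results() {
        List<Long> r = new ArrayList<>();
        r.add(sum);
        return r;
    }
}

// One lightweight combiner per key; every instance shares one aggregator.
class SharedCombiner<T> {
    private final SimpleAggregator<T> shared;
    private T currentValue;

    SharedCombiner(SimpleAggregator<T> shared, T initial) {
        this.shared = shared;
        this.currentValue = initial;
    }

    // Gabriel's proposed mergeValue: reset the shared aggregator, feed it the
    // running value plus the new value, and keep the single merged result.
    void mergeValue(T value) {
        shared.reset();
        shared.update(currentValue);
        shared.update(value);
        currentValue = shared.results().iterator().next();
    }

    T current() { return currentValue; }
}

public class SharedCombinerDemo {
    public static void main(String[] args) {
        SimpleAggregator<Long> shared = new SumAggregator();
        SharedCombiner<Long> c = new SharedCombiner<>(shared, 1L);
        c.mergeValue(2L);
        c.mergeValue(3L);
        System.out.println(c.current()); // prints 6
    }
}
```

Batching, as Gabriel also suggests, would replace currentValue with a list of pending values and only run them through the shared aggregator once the list reaches some threshold.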
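Josh's proposed default impl -- cloning an Aggregator by a serialize/deserialize round-trip -- might look roughly like the sketch below. The ConcatAggregator here is a toy stand-in (its separator field models the kind of state, e.g. for string concat, that must survive the copy); the round-trip itself uses only standard java.io object serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerDeClone {

    // Default "clone" via a serialization round-trip: works for anything
    // Serializable (as Aggregator<T> is), at the cost of a byte-array copy
    // per clone. Subclasses that know a cheaper way could override this.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T cloneViaSerialization(T original)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    // Toy stateful aggregator: the separator is configuration state that a
    // fresh instance must carry over, which is why plain construction of a
    // new instance isn't enough.
    static class ConcatAggregator implements Serializable {
        final String separator;
        ConcatAggregator(String separator) { this.separator = separator; }
    }

    public static void main(String[] args) throws Exception {
        ConcatAggregator proto = new ConcatAggregator("|");
        ConcatAggregator copy = cloneViaSerialization(proto);
        System.out.println(copy.separator);  // prints "|"
        System.out.println(copy != proto);   // prints "true"
    }
}
```

This matches the constraint in the thread: no dependence on Cloneable, and each Spark task can mint its own independent Aggregator instance from a serialized prototype.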