Hi all,

Are there any plans to implement Romain's approach to aggregating multiple 
fields per grouping in the official Storm release?

This is the original discussion on the old Google Groups group: 
https://groups.google.com/d/topic/storm-user/Hgmd9HAXCOY/discussion

Danijel

On Friday, February 22, 2013 11:59:24 PM UTC+1, Romain Lenglet wrote:
> Hi Nathan,
> 
> 
> I got around that limitation. I implemented a variant of 
> MapCombinerAggStateUpdater that accepts multiple fields as input and output, 
> executes a separate combiner aggregator for each input field, and stores all 
> updated fields as a single list in the map.
> Then, I use it similarly to what persistentAggregate() does, e.g. for summing 
> 2 fields:
> 
> 
> stream
>     .groupBy(groupFields)
>     .chainedAgg()
>         .aggregate(new Fields(inputField1), new Sum(), new Fields(outputField1))
>         .aggregate(new Fields(inputField2), new Sum(), new Fields(outputField2))
>     .chainEnd()
>     .partitionPersist(
>         new StateSpec(new MyMapFactory<List<Long>>(...)),
>         TridentUtils.fieldsUnion(groupFields, new Fields(outputField1, outputField2)),
>         new MyMapUpdater(..., new Sum(), new Sum()),
>         TridentUtils.fieldsConcat(groupFields, new Fields(outputField1, outputField2)));
> 
> 
> That works well. All the fields are aggregated for each group key in a single 
> get-update-put cycle.
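
The single get-update-put cycle described above can be illustrated with a minimal, Storm-free sketch. Everything here is an assumption made for illustration: `MultiFieldUpdater` is a hypothetical class (not Romain's `MyMapUpdater` and not Trident's `MapCombinerAggStateUpdater`), a plain `java.util.Map` stands in for the persistent map state, and `BinaryOperator<Long>` stands in for a combiner aggregator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical sketch: keeps one list of aggregates per group key, so all
// fields of a group are updated in a single get-update-put cycle.
final class MultiFieldUpdater {
    private final List<BinaryOperator<Long>> combiners; // one combiner per field
    private final List<Long> zeros;                      // identity value per field

    MultiFieldUpdater(List<BinaryOperator<Long>> combiners, List<Long> zeros) {
        if (combiners.size() != zeros.size()) {
            throw new IllegalArgumentException("need one zero per combiner");
        }
        this.combiners = combiners;
        this.zeros = zeros;
    }

    // Merges the partial aggregates of one batch into the stored list for groupKey.
    void update(Map<List<Object>, List<Long>> state,
                List<Object> groupKey,
                List<Long> partials) {
        if (partials.size() != combiners.size()) {
            throw new IllegalArgumentException("need one partial per combiner");
        }
        List<Long> stored = state.getOrDefault(groupKey, zeros);   // get
        List<Long> merged = new ArrayList<>(combiners.size());
        for (int i = 0; i < combiners.size(); i++) {                // update
            merged.add(combiners.get(i).apply(stored.get(i), partials.get(i)));
        }
        state.put(groupKey, merged);                                // put
    }

    public static void main(String[] args) {
        BinaryOperator<Long> sum = Long::sum;
        MultiFieldUpdater updater =
            new MultiFieldUpdater(Arrays.asList(sum, sum), Arrays.asList(0L, 0L));

        Map<List<Object>, List<Long>> state = new HashMap<>();
        List<Object> key = Arrays.asList("host-a", "GET");

        updater.update(state, key, Arrays.asList(3L, 10L));
        updater.update(state, key, Arrays.asList(4L, 5L));
        System.out.println(state.get(key)); // prints [7, 15]
    }
}
```

The map holds one entry per group key regardless of how many fields are aggregated, which is the property that matters when the keys themselves are large.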
> 
> 
> So I'd suggest doing something similar: implementing a variant of 
> MapCombinerAggStateUpdater that stores lists, and a variant of 
> persistentAggregate() that uses it, like my pseudo-code above.
> 
> 
> I think that would have value e.g. for users storing multiple aggregate 
> fields in a single SQL table, one value per column.
> 
> 
> Regards,
> --
> Romain Lenglet
> On Tuesday, February 19, 2013 9:57:23 PM UTC-8, Nathan Marz wrote:
> That design choice was made because you don't want to force users to have to 
> store lists in the database for every value. I'm open to suggestions on how 
> to tweak the design to be better at the case you described.
> 
> 
> 
> On Tue, Feb 19, 2013 at 6:45 PM, Romain Lenglet <[email protected]> 
> wrote:
> 
> Hi,
> 
> 
> I want to aggregate (sum) multiple fields for each tuple in a group, in a 
> single persistent map.
> I had no problem implementing an aggregator that chains CombinerAggregators, 
> i.e. applies each aggregator to a single field of each input tuple.
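
As a rough illustration of chaining combiners so that each one is applied to a single field of every input tuple, here is a minimal sketch in plain Java. `ChainedCombiner` is a hypothetical name, tuples are modelled as `List<Long>`, and `BinaryOperator<Long>` plus an identity value stand in for Trident's `CombinerAggregator` (its `combine` and `zero` methods); none of this is Romain's actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical sketch: applies combiner i to field i of every tuple and
// returns one aggregate per field.
final class ChainedCombiner {
    private final List<BinaryOperator<Long>> combiners; // one combiner per field
    private final List<Long> zeros;                      // identity value per field

    ChainedCombiner(List<BinaryOperator<Long>> combiners, List<Long> zeros) {
        if (combiners.size() != zeros.size()) {
            throw new IllegalArgumentException("need one zero per combiner");
        }
        this.combiners = combiners;
        this.zeros = zeros;
    }

    // Folds all tuples of a group into a single list of per-field aggregates.
    List<Long> aggregate(List<List<Long>> tuples) {
        List<Long> acc = new ArrayList<>(zeros); // start from each field's identity
        for (List<Long> tuple : tuples) {
            for (int i = 0; i < combiners.size(); i++) {
                acc.set(i, combiners.get(i).apply(acc.get(i), tuple.get(i)));
            }
        }
        return acc;
    }

    public static void main(String[] args) {
        BinaryOperator<Long> sum = Long::sum;
        BinaryOperator<Long> max = Long::max;
        ChainedCombiner chained = new ChainedCombiner(
            Arrays.asList(sum, max), Arrays.asList(0L, Long.MIN_VALUE));

        List<List<Long>> group = Arrays.asList(
            Arrays.asList(1L, 5L),
            Arrays.asList(2L, 9L),
            Arrays.asList(3L, 4L));
        System.out.println(chained.aggregate(group)); // prints [6, 9]
    }
}
```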
> 
> However, I have no way to use it with a persistentAggregate().
> That method can only take aggregators working on a single field.
> There is even an explicit check in MapCombinerAggStateUpdater that prevents 
> using it with multiple input fields.
> 
> What motivated that arbitrary limitation?
> 
> 
> The only way I see to have multiple aggregates for each grouped tuple is to 
> have multiple persistentAggregate(), one per aggregate.
> 
> That's very inefficient since it multiplies the number of keys in my map, and 
> my keys are relatively large (I group on ~30 fields).
> 
> 
> 
> Is there an elegant way for me to use a single persistentAggregate() that 
> chains aggregations of multiple fields?
> Will I have to implement my own variant of MapCombinerAggStateUpdater?
> 
> Thanks in advance!
> Regards,
> --
> 
> Romain Lenglet 
