Thanks Alan for updating the FLIP! IMO, it looks good.
Just a few nits that would be great to fix for consistency:

* BundledKeySegement (typo: should be BundledKeySegment)
* should it be accumulatorS instead of accumulator?
* should accumulators be null or an empty list if no accumulator is present?
* update the Group By Example to use a list of accumulators instead of a single value
* fix the `AvgAggregate` example:
  * add the missing `canRetract()` and `canMerge()` methods
  * the example uses a `batch()` method instead of `bundledAccumulateRetract()`
  * the example works with a single accumulator, not a list of accumulators
* in general, check the code examples for compliance with the proposed API:
  * some use `bundle()` instead of `bundledAccumulateRetract()`
  * there might be other mistakes that crept in as the API evolved

Thank you,
Fabian

On Tue, Jan 28, 2025 at 12:54 AM Alan Sheinberg <asheinb...@confluent.io.invalid> wrote:

> Hi everyone,
>
> Sorry for the delayed response! I appreciate the comments.
>
> Addressing Timo's comments:
>
> > Correct me if I'm wrong, but for AggregateFunctions, implementing retract() and merge() is optional. How can a BundledAggregateFunction communicate whether or not this is supported to the planner? Enforcing the retract() feature in the interface specification could be an option, but especially for window aggregations there might not be a retract required.
>
> This is a good catch. Much of my experimenting with a POC was done with a retract call, and it slipped my mind that it's optional. I think this will have to be added to the BundledAggregateFunction interface.
>
> > Also, how do you plan to support merge() in this design? I couldn't find any mention of it in the FLIP.
>
> Searching through the operators that use merge, it wasn't clear to me that the implementation would need it, so I didn't think it required support. I now see it's used in windows and maybe elsewhere.
> I'll add a list of accumulators rather than a single one: the first step will be to merge the accumulators before applying any of the accumulate or retract calls. I need to look more closely at the operators that will use them, but I think it may make sense to do it this way. Tell me if you feel strongly that they should be separate method calls.
>
> Addressing Fabian's comments:
>
> > * Why do we need the `canBundle()` function? We can use the interface itself as a marker. A function that can't bundle shouldn't implement the interface.
> >   * the interface could just contain the `bundledAccumulateRetract(List<BKS> bundle)` method?
>
> I originally had it just as you're recommending, but that removes a layer of indirection and some flexibility, because a class must then statically decide whether or not it's a bundled aggregate function. FunctionDefinition doesn't work this way: at planning time, a function can determine what kind it is, rather than this being fixed by directly extending ScalarFunction, for example. This flexibility allows us to implement a meta UDF that can act as any kind of UDF and then specialize at runtime. For example, it could return true from canBundle() only when it's an aggregate that wants to implement the bundled interface.
>
> > * why does a function call need to handle multiple keys?
> >   * It feels that the key-loop is something that 90% of the functions would implement the same way.
> >   * Couldn't we have a default implementation for the key loop and another method that just handles the bundle of a single key?
>
> I think so. So you mean there could be two overloaded methods, one for a single key and another for multiple keys, where the default implementation of the latter just calls the former N times? That would be fine. In some cases you would want to override the multi-key version, for example to minimize external calls, but in others you might not.
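A minimal plain-Java sketch of the points settled so far: capability flags (`canBundle()`, `canRetract()`, `canMerge()`), a list of accumulators per key that is merged before any accumulate/retract is applied, and a default multi-key loop that delegates to a single-key method. It does not depend on Flink. `Row`, `KeySegment`, `KeyResult`, `BundledFunction`, and `BundledAvg` are hypothetical stand-ins, and the overloaded `bundledAccumulateRetract` signatures and the empty-list (rather than null) convention for missing accumulators are assumptions, not the FLIP-491 API.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the ideas in this thread. Every type and signature here
// is hypothetical and is NOT the actual FLIP-491 API.
final class BundledAggSketch {

    // One input row for a key: accumulate (retract == false) or retract a value.
    static final class Row {
        final boolean retract;
        final long value;
        Row(boolean retract, long value) { this.retract = retract; this.value = value; }
    }

    // Hypothetical per-key segment. `accumulators` is an empty list (assumed,
    // not null) when the key has no state yet; each accumulator is {sum, count}.
    static final class KeySegment {
        final String key;
        final List<long[]> accumulators;
        final List<Row> rows;
        KeySegment(String key, List<long[]> accumulators, List<Row> rows) {
            this.key = key; this.accumulators = accumulators; this.rows = rows;
        }
    }

    // Hypothetical per-key result: the new accumulator and the final AVG value.
    static final class KeyResult {
        final String key;
        final long[] accumulator;
        final Double finalValue; // null when count == 0
        KeyResult(String key, long[] accumulator, Double finalValue) {
            this.key = key; this.accumulator = accumulator; this.finalValue = finalValue;
        }
    }

    interface BundledFunction {
        // Answered per instance, so a "meta" UDF could opt in or out at planning time.
        boolean canBundle();
        boolean canRetract();
        boolean canMerge();

        // Single-key method that most implementations would write.
        KeyResult bundledAccumulateRetract(KeySegment segment);

        // Default multi-key loop; override it to batch external calls across keys.
        default List<KeyResult> bundledAccumulateRetract(List<KeySegment> bundle) {
            List<KeyResult> results = new ArrayList<>(bundle.size());
            for (KeySegment segment : bundle) {
                results.add(bundledAccumulateRetract(segment));
            }
            return results;
        }
    }

    // AVG over longs that supports both retraction and merging.
    static final class BundledAvg implements BundledFunction {
        public boolean canBundle() { return true; }
        public boolean canRetract() { return true; }
        public boolean canMerge() { return true; }

        public KeyResult bundledAccumulateRetract(KeySegment segment) {
            long sum = 0;
            long count = 0;
            // Step 1: merge all incoming accumulators before applying any row.
            for (long[] acc : segment.accumulators) {
                sum += acc[0];
                count += acc[1];
            }
            // Step 2: apply accumulate/retract calls in arrival order.
            for (Row row : segment.rows) {
                if (row.retract) { sum -= row.value; count--; }
                else { sum += row.value; count++; }
            }
            Double avg = count == 0 ? null : (double) sum / count;
            return new KeyResult(segment.key, new long[] {sum, count}, avg);
        }
    }
}
```

A function that calls an external service would override the `List<KeySegment>` overload instead, so that it can issue one request covering every key in the bundle.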
>
> > * does the list of updatedValuesAfterEachRow also include the last row?
> >   * it would be good to clarify that in the API documentation
>
> It includes the first and last values (and everything in between), so it duplicates the values that are also set in startingValue and finalValue. I can be more explicit about this field.
>
> > * Could you clarify what the table.exec.agg-bundled.size parameter refers to?
> >   * Number of keys? Total number of rows? Number of rows per key?
>
> It's the total number of input rows, which may belong to one or more keys. Let me clarify that.
>
> > * It would be nice to add the keys in the example
>
> You're right. It's a bit confusing without them. I'll add them.
>
> > * How do the bundled agg functions interact with checkpointing?
> >   * Are bundles sent out when a checkpoint barrier is received?
>
> Yes, for group by, it would flush the bundle when a checkpoint barrier is received. I think we would ultimately want to do this asynchronously so as not to block the checkpoint (in case the bundle takes a while), but the first implementation will likely be the straightforward one. I can make a follow-up task for this.
>
> For the OVER aggregation implementations, processElement uses ValueState and MapState to store all the intermediate data and uses event-time timers, fired by watermarks, to actually make the bundle calls. I think these need no special checkpoint behavior and shouldn't hold up checkpoints at all.
>
> For the windowed aggregate implementations, it would also flush the bundle when a checkpoint barrier is received. I think this will ultimately need to be made async as well, but the first implementation will simply flush when the barrier is received.
>
> I'll update the FLIP to address the comments above; please have another look.
>
> Thanks,
> Alan
>
> On Wed, Jan 15, 2025 at 9:06 AM Fabian Hüske <fhue...@confluent.io.invalid> wrote:
>
> > Hi Alan,
> >
> > Thanks for working on this FLIP!
> >
> > Overall the document looks very good, but I have a few questions / comments:
> >
> > * Why do we need the `canBundle()` function? We can use the interface itself as a marker. A function that can't bundle shouldn't implement the interface.
> >   * the interface could just contain the `bundledAccumulateRetract(List<BKS> bundle)` method?
> >
> > * why does a function call need to handle multiple keys?
> >   * It feels that the key-loop is something that 90% of the functions would implement the same way.
> >   * Couldn't we have a default implementation for the key loop and another method that just handles the bundle of a single key?
> >
> > * does the list of updatedValuesAfterEachRow also include the last row?
> >   * it would be good to clarify that in the API documentation
> >
> > * Could you clarify what the table.exec.agg-bundled.size parameter refers to?
> >   * Number of keys? Total number of rows? Number of rows per key?
> >
> > * It would be nice to add the keys in the example
> >
> > * How do the bundled agg functions interact with checkpointing?
> >   * Are bundles sent out when a checkpoint barrier is received?
> >
> > Thank you,
> > Fabian
> >
> > On Mon, Jan 13, 2025 at 3:42 PM Timo Walther <twal...@apache.org> wrote:
> >
> > > Hi Alan,
> > >
> > > thanks for sharing this FLIP with us. Sorry for the late reply. I think the FLIP is already in very good shape.
> > >
> > > I like the approach that the BundledAggregateFunction is rather a separate interface that can be implemented by advanced users. This matches the existing SpecializedFunction interface and the upcoming ChangelogFunction of FLIP-440.
> > >
> > > However, I have some additional feedback:
> > >
> > > Correct me if I'm wrong, but for AggregateFunctions, implementing retract() and merge() is optional. How can a BundledAggregateFunction communicate whether or not this is supported to the planner?
> > > Enforcing the retract() feature in the interface specification could be an option, but especially for window aggregations there might not be a retract required.
> > >
> > > Also, how do you plan to support merge() in this design? I couldn't find any mention of it in the FLIP.
> > >
> > > Regards,
> > > Timo
> > >
> > > On 12.12.24 02:57, Alan Sheinberg wrote:
> > > > I'd like to start a discussion of FLIP-491: BundledAggregateFunction for batched aggregation [1]
> > > >
> > > > This feature proposes adding a new interface, BundledAggregateFunction, that can be implemented by AggregateFunction UDFs. This allows the use of a batched method call so that users can handle many rows at a time for multiple keys, rather than the per-row calls such as accumulate and retract.
> > > >
> > > > The purpose is to achieve high throughput while still allowing for calls to external systems or other blocking operations. Similar calls through the conventional AggregateFunction methods would be prohibitively slow, but if given a batch of inputs and accumulators for each key, the implementer has the power to parallelize or internally batch lookups to improve performance.
> > > >
> > > > Looking forward to your feedback and suggestions.
> > > >
> > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-491%3A+BundledAggregateFunction+for+batched+aggregation
> > > >
> > > > Thanks,
> > > > Alan
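To make the batching argument from the original announcement concrete, here is a minimal sketch comparing per-row lookups (what calls from `accumulate()` would amount to) with one batched lookup per bundle. `FakeScoringService` and its `scoreAll` batch endpoint are hypothetical stand-ins for an external system, and the per-key sum of scores is only an example aggregate; nothing here is Flink or FLIP-491 API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical external service with a batch endpoint; it counts round trips
// so the two strategies below can be compared.
final class FakeScoringService {
    int roundTrips = 0;

    // One round trip scores every value passed in (here: value * 10).
    Map<Long, Long> scoreAll(List<Long> values) {
        roundTrips++;
        Map<Long, Long> scores = new HashMap<>();
        for (long v : values) {
            scores.put(v, v * 10);
        }
        return scores;
    }
}

final class BatchedLookupSketch {

    // Per-row style: one round trip per input row, regardless of key.
    static Map<String, Long> perRowSums(FakeScoringService svc, Map<String, List<Long>> rowsByKey) {
        Map<String, Long> sums = new LinkedHashMap<>();
        for (Map.Entry<String, List<Long>> e : rowsByKey.entrySet()) {
            long sum = 0;
            for (long v : e.getValue()) {
                sum += svc.scoreAll(List.of(v)).get(v);
            }
            sums.put(e.getKey(), sum);
        }
        return sums;
    }

    // Bundled style: gather the rows of every key, make a single batched call,
    // then fold the scores back into one sum per key.
    static Map<String, Long> bundledSums(FakeScoringService svc, Map<String, List<Long>> rowsByKey) {
        List<Long> allValues = new ArrayList<>();
        for (List<Long> rows : rowsByKey.values()) {
            allValues.addAll(rows);
        }
        Map<Long, Long> scores = svc.scoreAll(allValues);
        Map<String, Long> sums = new LinkedHashMap<>();
        for (Map.Entry<String, List<Long>> e : rowsByKey.entrySet()) {
            long sum = 0;
            for (long v : e.getValue()) {
                sum += scores.get(v);
            }
            sums.put(e.getKey(), sum);
        }
        return sums;
    }
}
```

With three input rows across two keys, both variants produce the same sums, but the per-row variant makes three round trips and the bundled variant makes one. The saving per bundle grows with the number of rows it holds, which, per Alan's answer in this thread, is what `table.exec.agg-bundled.size` counts (total input rows across keys).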