Thanks Alan for updating the FLIP!
IMO, it looks good.
Just a few nits that would be great to fix for consistency:
* BundledKeySegement
* should it be accumulatorS instead of accumulator?
* should accumulators be null or an empty list if no accumulator is
present?
* update the Group By Example to use a list of accumulators instead of a
single value
* fix the `AvgAggregate` example:
  * add missing `canRetract()` and `canMerge()` methods
  * example uses `batch()` method instead of `bundledAccumulateRetract()`
  * example works with a single accumulator, not a list of accumulators
* in general, check the code examples for compliance with the proposed
  API.
  * Some use `bundle()` instead of `bundledAccumulateRetract()`.
  * There might be other mistakes that sneaked in while the API evolved
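To make the nits above concrete, here is a minimal sketch of an average aggregate that exposes `canRetract()` and `canMerge()`, uses the `bundledAccumulateRetract()` name, and takes a list of accumulators per key. Only those three method names come from this thread. The types `AvgAcc` and `AvgSegment`, the `{value, +1/-1}` row encoding, the return type, and the choice of an empty list (rather than null) for "no accumulator" are all assumptions made so the snippet compiles without Flink; they are not the FLIP-491 types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an accumulator; not a FLIP-491 type.
class AvgAcc {
    long sum;
    long count;
}

// Hypothetical stand-in for one key's slice of a bundle.
class AvgSegment {
    final String key;
    final List<AvgAcc> accumulators; // assumption: empty list means "no prior state"
    final List<long[]> rows;         // each row: {value, +1 accumulate / -1 retract}

    AvgSegment(String key, List<AvgAcc> accumulators, List<long[]> rows) {
        this.key = key;
        this.accumulators = accumulators;
        this.rows = rows;
    }
}

class AvgAggregateSketch {
    boolean canRetract() { return true; }
    boolean canMerge() { return true; }

    // Returns one updated accumulator per input segment, in input order.
    List<AvgAcc> bundledAccumulateRetract(List<AvgSegment> bundle) {
        List<AvgAcc> results = new ArrayList<>();
        for (AvgSegment segment : bundle) {
            AvgAcc acc = new AvgAcc();
            // Merge step: fold every incoming accumulator into a single one.
            for (AvgAcc other : segment.accumulators) {
                acc.sum += other.sum;
                acc.count += other.count;
            }
            // Then apply the accumulate (+1) and retract (-1) rows.
            for (long[] row : segment.rows) {
                acc.sum += row[0] * row[1];
                acc.count += row[1];
            }
            results.add(acc);
        }
        return results;
    }

    // Null when nothing has been accumulated for the key.
    static Double getValue(AvgAcc acc) {
        return acc.count == 0 ? null : (double) acc.sum / acc.count;
    }
}
```

In this sketch a key with no prior state and no rows simply yields an empty accumulator, which is one argument for the empty-list convention over null.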
Thank you,
Fabian
On Tue, Jan 28, 2025 at 12:54 AM Alan Sheinberg
<asheinb...@confluent.io.invalid> wrote:
Hi everyone,
Sorry for the delayed response! I appreciate the comments.
Addressing Timo's comments:
Correct me if I'm wrong, but for AggregateFunctions implementing a
retract() and merge() is optional. How can a BundledAggregateFunction
communicate whether or not this is supported to the planner? Enforcing
the retract() feature in the interface specification could be an
option,
but esp for window aggregations there might not be a retract required.
This is a good catch. Much of my experimenting with a POC was done with a
retract call, and it slipped my mind that it's optional. I think this will
have to be added to the BundledAggregateFunction interface.
Also how do you plan to support merge() in this design? I couldn't find
any mention of it in the FLIP.
Searching through the operators that use merge, it wasn't clear to me that
I would need it in the implementation, so I didn't think it required
support. I now see it's used in windows and maybe elsewhere. I'll add a
list of accumulators rather than a single one -- the first step will be to
merge the accumulators before applying any of the accumulate or retract
calls. I need to look more closely at the operators that will use them,
but I think it may make sense to do it this way. Tell me if you feel
strongly that they should be separate method calls.
Addressing Fabian's comments:
* Why do we need the `canBundle()` function? We can use the interface
itself as a marker. A function that can't bundle, shouldn't implement
the
interface.
* the interface could just contain the
`bundledAccumulateRetract(List<BKS> bundle)` method?
I originally had it just like you're recommending, but that removes a
layer of indirection and some flexibility, because it means that a class
must statically decide whether it's a bundled aggregate function or not.
FunctionDefinition doesn't work this way -- at planning time you can
determine what function kind you are, rather than by directly extending
ScalarFunction, for example. This flexibility allows us to implement a
meta UDF which can try to be any kind of UDF and then specialize during
runtime. For example, this would allow it to return true from canBundle()
only when it's an aggregate which wants to implement the bundled
interface.
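The difference between a marker interface and a `canBundle()` method can be sketched as follows. Everything here is a simplified stand-in: `SketchKind`, `SketchFunctionDefinition`, `SketchBundledAggregate` and `MetaUdf` are hypothetical names that only mimic the idea of a function definition reporting its kind, and they are not Flink's classes.

```java
// Hypothetical stand-ins; NOT Flink's FunctionKind / FunctionDefinition.
enum SketchKind { SCALAR, AGGREGATE }

interface SketchFunctionDefinition {
    SketchKind getKind();
}

interface SketchBundledAggregate extends SketchFunctionDefinition {
    // Asked by the planner: implementing the interface alone does not opt in.
    boolean canBundle();
}

// A "meta UDF": the class always implements the bundled interface, but it
// only reports canBundle() == true when configured as a bundling aggregate.
class MetaUdf implements SketchBundledAggregate {
    private final SketchKind kind;
    private final boolean wantsBundling;

    MetaUdf(SketchKind kind, boolean wantsBundling) {
        this.kind = kind;
        this.wantsBundling = wantsBundling;
    }

    @Override
    public SketchKind getKind() {
        return kind;
    }

    @Override
    public boolean canBundle() {
        return kind == SketchKind.AGGREGATE && wantsBundling;
    }
}
```

With a pure marker interface, `MetaUdf` would be treated as bundled in every configuration, because the decision would be fixed by the class declaration rather than by the instance.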
* why does a function call need to handle multiple keys?
* It feels that the key-loop is something that 90% of the functions
would
implement the same.
* Couldn't we have a default implementation for the key loop and
another
method that just handles the bundle of a single key?
I think so. So you mean you could overload multiple methods, one for a
single key and another for multiple keys, and the default for the latter
would just call the former N times? That would be fine. In some cases you
would want to override the multi-key version, for example to minimize
external calls, but in others you might not.
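A minimal sketch of that split, assuming a generic per-key input type `K` and result type `R`. The interface name `KeyLoopSketch`, its two method names, and `BatchedLookupSketch` are hypothetical and chosen only for illustration; the FLIP may shape this differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical shape: K = one key's input segment, R = one key's result.
interface KeyLoopSketch<K, R> {
    // Single-key method that most implementations would write.
    R applySingleKey(K segment);

    // Multi-key method: the default is just the key loop.
    default List<R> applyAllKeys(List<K> bundle) {
        List<R> results = new ArrayList<>(bundle.size());
        for (K segment : bundle) {
            results.add(applySingleKey(segment));
        }
        return results;
    }
}

// Overrides the multi-key method so the whole bundle costs one
// (simulated) external round trip instead of one per key.
class BatchedLookupSketch implements KeyLoopSketch<String, Integer> {
    int externalCalls = 0;

    @Override
    public Integer applySingleKey(String key) {
        externalCalls++;
        return key.length();
    }

    @Override
    public List<Integer> applyAllKeys(List<String> bundle) {
        externalCalls++; // one call for the entire bundle
        List<Integer> results = new ArrayList<>(bundle.size());
        for (String key : bundle) {
            results.add(key.length());
        }
        return results;
    }
}
```

A function that has nothing to gain from batching implements only `applySingleKey` and inherits the loop; one that calls an external system overrides `applyAllKeys`.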
* does the list of updatedValuesAfterEachRow also include the last row?
* it would be good to clarify that in the API documentation
It includes the first and last value (and everything in between), so it
duplicates the values that are also set in startingValue and finalValue. I
can be more explicit about this field.
* Could you clarify what the table.exec.agg-bundled.size parameter
refers
to?
* Number of keys? Total number of rows? Number of rows per key?
The total number of input rows, which may belong to one or more keys. Let
me clarify that.
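As an illustration of that reading of `table.exec.agg-bundled.size`, the sketch below counts rows across all keys and releases a bundle once the total reaches the limit. The class `RowCountBundleBuffer` is hypothetical and is not the operator from the POC; string keys and integer rows are placeholders.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical buffer; not a Flink operator.
class RowCountBundleBuffer {
    private final int maxRows; // stand-in for table.exec.agg-bundled.size
    private final Map<String, List<Integer>> rowsByKey = new LinkedHashMap<>();
    private int totalRows = 0;

    RowCountBundleBuffer(int maxRows) {
        this.maxRows = maxRows;
    }

    // Buffers the row and returns the full bundle once the TOTAL row count
    // (across all keys) reaches maxRows; returns null otherwise.
    Map<String, List<Integer>> add(String key, int row) {
        rowsByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        totalRows++;
        if (totalRows < maxRows) {
            return null;
        }
        Map<String, List<Integer>> bundle = new LinkedHashMap<>(rowsByKey);
        rowsByKey.clear();
        totalRows = 0;
        return bundle;
    }
}
```

With a limit of 3, the rows `(a,1)`, `(b,2)`, `(a,3)` trigger a bundle containing two keys, even though neither key has three rows on its own.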
* It would be nice to add the keys in the example
You're right. It's a bit confusing without them. I'll add them.
* How do the bundled agg functions interact with checkpointing?
* Are bundles sent out when a checkpoint barrier is received?
Yes, for group by, it would flush the bundle when a checkpoint barrier
is
received. I think we would ultimately want to do this async so as not
to
block the checkpoint (in case the bundle takes a while), but the first
implementation will likely be straightforward. I can make a followup
task
for this.
For the OVER implementations, processElement uses ValueState and MapState
to store all the intermediate data and uses event-time timers (fired by
watermarks) to actually do the bundle calls. I think for these, there is
no special behavior for checkpoints and they shouldn't hold them up at
all.
For the windowed aggregate implementations, it would also flush the bundle
when a checkpoint barrier is received. I think this will ultimately need
to be made async as well, but the first implementation will flush when the
barrier is received.
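The synchronous "flush on barrier" behavior described for the group by and windowed cases can be sketched like this. `CheckpointFlushSketch` and its method names are hypothetical and only model the ordering (flush first, then snapshot); they are not Flink operator APIs, and rows are plain strings for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical operator skeleton; method names are illustrative only.
class CheckpointFlushSketch {
    private final List<String> pending = new ArrayList<>();
    final List<List<String>> emittedBundles = new ArrayList<>();

    void processElement(String row) {
        pending.add(row);
    }

    // Called when a checkpoint barrier arrives, before state is snapshotted.
    // Synchronous: the checkpoint waits until the bundle call has finished.
    void onCheckpointBarrier(long checkpointId) {
        flush();
    }

    // What would be written to state: after a flush, nothing is buffered.
    List<String> snapshotState() {
        return new ArrayList<>(pending);
    }

    private void flush() {
        if (!pending.isEmpty()) {
            emittedBundles.add(new ArrayList<>(pending));
            pending.clear();
        }
    }
}
```

An async variant would have to keep the in-flight bundle in checkpointed state instead of waiting for it, which is the follow-up mentioned above.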
I'll update the FLIP to address the comments above; please have another
look.
Thanks,
Alan
On Wed, Jan 15, 2025 at 9:06 AM Fabian Hüske
<fhue...@confluent.io.invalid>
wrote:
Hi Alan,
Thanks for working on this FLIP!
Overall the document looks very good, but I have a few questions /
comments:
* Why do we need the `canBundle()` function? We can use the interface
  itself as a marker. A function that can't bundle, shouldn't implement
  the interface.
  * the interface could just contain the
    `bundledAccumulateRetract(List<BKS> bundle)` method?
* why does a function call need to handle multiple keys?
  * It feels that the key-loop is something that 90% of the functions
    would implement the same.
  * Couldn't we have a default implementation for the key loop and
    another method that just handles the bundle of a single key?
* does the list of updatedValuesAfterEachRow also include the last row?
  * it would be good to clarify that in the API documentation
* Could you clarify what the table.exec.agg-bundled.size parameter
  refers to?
  * Number of keys? Total number of rows? Number of rows per key?
* It would be nice to add the keys in the example
* How do the bundled agg functions interact with checkpointing?
  * Are bundles sent out when a checkpoint barrier is received?
Thank you, Fabian
On Mon, Jan 13, 2025 at 3:42 PM Timo Walther <twal...@apache.org>
wrote:
Hi Alan,
thanks for sharing this FLIP with us. Sorry for the late reply. I think
the FLIP is already in very good shape.
I like the approach that the BundledAggregateFunction is rather a
separate interface that can be implemented by advanced users. This
matches the existing SpecializedFunction interface and the upcoming
ChangelogFunction of FLIP-440.
However, I have some additional feedback:
Correct me if I'm wrong, but for AggregateFunctions implementing a
retract() and merge() is optional. How can a
BundledAggregateFunction
communicate whether or not this is supported to the planner?
Enforcing
the retract() feature in the interface specification could be an
option,
but esp for window aggregations there might not be a retract
required.
Also how do you plan to support merge() in this design? I couldn't find
any mention of it in the FLIP.
Regards,
Timo
On 12.12.24 02:57, Alan Sheinberg wrote:
I'd like to start a discussion of FLIP-491: BundledAggregateFunction for
batched aggregation [1]
This feature proposes adding a new interface BundledAggregateFunction that
can be implemented by AggregateFunction UDFs. This allows the use of a
batched method call so that users can handle many rows at a time for
multiple keys rather than the per-row calls such as accumulate and
retract. The purpose is to achieve high throughput while still allowing
for calls to external systems or other blocking operations. Similar calls
through the conventional AggregateFunction methods would be prohibitively
slow, but if given a batch of inputs and accumulators for each key, the
implementer has the power to parallelize or internally batch lookups to
improve performance.
Looking forward to your feedback and suggestions.
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-491%3A+BundledAggregateFunction+for+batched+aggregation
Thanks,
Alan