Hi everyone,

Sorry for the delayed response!  I appreciate the comments.

Addressing Timo's comments:

> Correct me if I'm wrong, but for AggregateFunctions implementing a
> retract() and merge() is optional. How can a BundledAggregateFunction
> communicate whether or not this is supported to the planner? Enforcing
> the retract() feature in the interface specification could be an option,
> but esp for window aggregations there might not be a retract required.


This is a good catch.  Much of my experimenting with a POC was done with a
retract call, and it slipped my mind that retract() is optional.  I think the
BundledAggregateFunction interface will need a way to communicate whether
retraction is supported.
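For example (just a sketch -- canRetract() is a placeholder name, not a
settled API), the flag could sit alongside canBundle():

// Sketch only: canRetract() is a placeholder, not part of the FLIP yet.
public interface BundledAggregateFunction {

    // Existing marker from the FLIP telling the planner that bundling is
    // available for this function.
    boolean canBundle();

    // Tells the planner whether bundled retraction is supported, mirroring
    // the fact that retract() is optional on AggregateFunction.
    default boolean canRetract() {
        return false;
    }
}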

> Also how do you plan to support merge() in this design? I couldn't find
> any mentioning in the FLIP.
>
Searching through the operators that use merge(), it wasn't clear that the
implementation would require it, so I didn't think it needed support.  I now
see it's used in windows and possibly elsewhere.  I'll change the interface to
take a list of accumulators rather than a single one -- the first step will be
to merge the accumulators before applying any of the accumulate or retract
calls (rough sketch below).  I need to look more closely at the operators that
will use this, but I think it may make sense to do it this way.  Tell me if
you feel strongly that merging should be a separate method call.
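To make the idea concrete, the per-key entry in the bundle could look roughly
like this (class and field names are placeholders, not the final API):

import java.util.List;

import org.apache.flink.table.data.RowData;

// Placeholder sketch of a per-key bundle entry.  A list of accumulators
// replaces the single accumulator: the operator merges these into one before
// the accumulate/retract rows are applied.
public class BundledKeySegment {

    // Accumulators to merge before any accumulate or retract calls are made.
    private final List<RowData> accumulators;

    // Input rows for this key in arrival order; the RowKind of each row
    // distinguishes accumulate from retract messages.
    private final List<RowData> rows;

    public BundledKeySegment(List<RowData> accumulators, List<RowData> rows) {
        this.accumulators = accumulators;
        this.rows = rows;
    }

    public List<RowData> getAccumulators() {
        return accumulators;
    }

    public List<RowData> getRows() {
        return rows;
    }
}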

Addressing Fabian's comments:

> * Why do we need the `canBundle()` function? We can use the interface
> itself as a marker. A function that can't bundle, shouldn't implement the
> interface.
>   * the interface could just contain the
> `bundledAccumulateRetract(List<BKS> bundle)` method?


I originally had it just as you're recommending, but using the interface
itself as the marker removes a layer of indirection and with it some
flexibility, because a class then has to decide statically whether it is a
bundled aggregate function.  FunctionDefinition doesn't work this way -- the
function kind can be determined at planning time rather than by directly
extending ScalarFunction, for example.  This flexibility would allow us to
implement a meta UDF which can act as any kind of UDF and specialize at
runtime.  Such a function would return true from canBundle() only when it
specializes as an aggregate that wants the bundled behavior.
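As a purely hypothetical illustration (none of this is in the FLIP), a
wrapper UDF along these lines could make that decision per instance:

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AggregateFunction;

// Hypothetical wrapper UDF that only advertises bundling when the function it
// delegates to actually supports it.
public class MetaAggregateFunction
        extends AggregateFunction<RowData, RowData>
        implements BundledAggregateFunction {

    private final AggregateFunction<RowData, RowData> delegate;

    public MetaAggregateFunction(AggregateFunction<RowData, RowData> delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean canBundle() {
        // Decided here at planning time rather than statically by the class.
        return delegate instanceof BundledAggregateFunction
                && ((BundledAggregateFunction) delegate).canBundle();
    }

    @Override
    public RowData createAccumulator() {
        return delegate.createAccumulator();
    }

    @Override
    public RowData getValue(RowData accumulator) {
        return delegate.getValue(accumulator);
    }

    // accumulate()/retract() and the bundled call would delegate as well;
    // omitted here to keep the sketch short.
}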

> * why does a function call need to handle multiple keys?
>   * It feels that the key-loop is something that 90% of the functions would
> implement the same.
>   * Couldn't we have a default implementation for the key loop and another
> method that just handles the bundle of a single key?


I think so.  You mean we could have two methods, one for a single key and
another for multiple keys, where the default implementation of the latter just
calls the former once per key?  That would be fine.  In some cases you would
want to override the multi-key version to minimize external calls, for
example, but other implementations might not need to.
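Something along these lines, as a sketch (the method and type names are
placeholders, not the final API):

import java.util.ArrayList;
import java.util.List;

// Placeholder sketch: a per-key method plus a default multi-key loop.
// Implementations that want to batch external calls across keys can override
// the multi-key variant.
public interface BundledAggregateFunction {

    // Handles the bundled rows of a single key.
    BundledKeyResult bundledAccumulateRetract(BundledKeySegment keySegment);

    // Default key loop: calls the single-key method once per key.
    default List<BundledKeyResult> bundledAccumulateRetract(
            List<BundledKeySegment> bundle) {
        List<BundledKeyResult> results = new ArrayList<>(bundle.size());
        for (BundledKeySegment segment : bundle) {
            results.add(bundledAccumulateRetract(segment));
        }
        return results;
    }
}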

> * does the list of updatedValuesAfterEachRow also include the last row?
>   * it would be good to clarify that in the API documentation


It includes the first and last values (and everything in between), so it
duplicates the values that are also set in startingValue and finalValue.  I
can be more explicit about this field in the API documentation.

> * Could you clarify what the table.exec.agg-bundled.size parameter refers
> to?
>   * Number of keys? Total number of rows? Number of rows per key?


It's the total number of input rows, which may belong to one or more keys.
Let me clarify that in the FLIP.
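For example, a user would set it roughly like this (the option name comes
from the FLIP and doesn't exist in Flink yet):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BundleSizeExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Proposed option: total number of buffered input rows, across all
        // keys, before a bundled call is made.
        tEnv.getConfig().set("table.exec.agg-bundled.size", "1000");
    }
}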

> * It would be nice to add the keys in the example

You're right.  It's a bit confusing without them.  I'll add them.

> * How do the bundled agg functions interact with checkpointing?
>   * Are bundles sent out when a checkpoint barrier is received?


Yes, for group by, it would flush the bundle when a checkpoint barrier is
received.  I think we would ultimately want to do this asynchronously so as
not to block the checkpoint (in case the bundle takes a while), but the first
implementation will likely be a straightforward synchronous flush.  I can
create a follow-up task for the async version.
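If I recall correctly, the existing mini-batch operators flush their buffered
input in prepareSnapshotPreBarrier(), and a first version could follow the
same pattern.  A rough sketch (the class and finishBundle() method are
placeholders, not the planned implementation):

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;

// Sketch only: flush pending bundled work before the barrier is forwarded
// downstream, so no partially buffered bundle has to be checkpointed.
public abstract class BundledAggregateOperatorSketch<IN, OUT>
        extends AbstractStreamOperator<OUT>
        implements OneInputStreamOperator<IN, OUT> {

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        // Run the bundled accumulate/retract calls for everything buffered.
        finishBundle();
    }

    // Placeholder for whatever drains the buffered rows via the bundled call.
    protected abstract void finishBundle() throws Exception;
}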

For the over aggregate implementations, processElement uses ValueState and
MapState to store all the intermediate data, and event-time timers fired on
watermark advancement actually make the bundled calls.  I think these need no
special behavior for checkpoints and shouldn't hold them up at all.

For the windowed aggregate implementations, the bundle would also be flushed
when a checkpoint barrier is received.  I think this will ultimately need to
be made async as well, but the first implementation will flush synchronously
when the barrier is received.

I'll update the FLIP to address the comments above.  Please have another look
once it's updated.

Thanks,
Alan


On Wed, Jan 15, 2025 at 9:06 AM Fabian Hüske <fhue...@confluent.io.invalid>
wrote:

> Hi Alan,
>
> Thanks for working on this FLIP!
> Overall the document looks very good, but I have a few question / comments:
>
> * Why do we need the `canBundle()` function? We can use the interface
> itself as a marker. A function that can't bundle, shouldn't implement the
> interface.
>   * the interface could just contain the
> `bundledAccumulateRetract(List<BKS> bundle)` method?
>
> * why does a function call need to handle multiple keys?
>   * It feels that the key-loop is something that 90% of the functions would
> implement the same.
>   * Couldn't we have a default implementation for the key loop and another
> method that just handles the bundle of a single key?
>
> * does the list of updatedValuesAfterEachRow also include the last row?
>   * it would be good to clarify that in the API documentation
>
> * Could you clarify what the table.exec.agg-bundled.size parameter refers
> to?
>   * Number of keys? Total number of rows? Number of rows per key?
>
> * It would be nice to add the keys in the example
>
> * How do the bundled agg functions interact with checkpointing?
>   * Are bundles sent out when a checkpoint barrier is received?
>
> Thank you, Fabian
>
> On Mon, Jan 13, 2025 at 3:42 PM Timo Walther <twal...@apache.org> wrote:
>
> > Hi Alan,
> >
> > thanks for sharing this FLIP with us. Sorry for the late reply. I think
> > the FLIP is already in a very good shape.
> >
> > I like the approach that the BundledAggregateFunction is rather a
> > separate interface that can be implemented by advanced users. This
> > matches with the existing SpecializedFunction interface and the upcoming
> > ChangelogFunction of FLIP-440.
> >
> > However, I have some additional feedback:
> >
> > Correct me if I'm wrong, but for AggregateFunctions implementing a
> > retract() and merge() is optional. How can a BundledAggregateFunction
> > communicate whether or not this is supported to the planner? Enforcing
> > the retract() feature in the interface specification could be an option,
> > but esp for window aggregations there might not be a retract required.
> >
> > Also how do you plan to support merge() in this design? I couldn't find
> > any mentioning in the FLIP.
> >
> > Regards,
> > Timo
> >
> >
> >
> > On 12.12.24 02:57, Alan Sheinberg wrote:
> > > I'd like to start a discussion of FLIP-491: BundledAggregateFunction
> for
> > > batched aggregation [1]
> > >
> > > This feature proposes adding a new interface BundledAggregateFunction
> > that
> > > can be implemented by AggregateFunction UDFs.  This allows the use of a
> > > batched method call so that users can handle many rows at a time for
> > > multiple keys rather than the per-row calls such as accumulate and
> > retract.
> > >
> > > The purpose is to achieve high throughput while still allowing for
> calls
> > to
> > > external systems or other blocking operations.  Similar calls through
> the
> > > conventional AggregateFunction methods would be prohibitively slow, but
> > if
> > > given a batch of inputs and accumulators for each key, the implementer
> > has
> > > the power to parallelize or internally batch lookups to improve
> > performance.
> > >
> > > Looking forward to your feedback and suggestions.
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-491%3A+BundledAggregateFunction+for+batched+aggregation
> > >
> > >
> > > Thanks,
> > > Alan
> > >
> >
> >
>
