Hi Fabian,

I addressed your comments below.


> * BundledKeySegment
>   * should it be accumulatorS instead of accumulator?
>   * should accumulators be null or an empty list if no accumulator is
> present?


Good catch, I forgot to update those places.  It should be "accumulators" and
an empty list when there are none. Updated in the FLIP.
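
For illustration only, here is a minimal sketch of the "empty list, never
null" convention. The class name `KeySegmentSketch`, its type parameters,
and its accessors are assumptions made for this example and are not the
FLIP's actual definition:

```java
import java.util.List;

// Hypothetical stand-in for the FLIP's key segment type; the class name,
// type parameters, and accessors are assumptions for illustration.
final class KeySegmentSketch<K, R, A> {
    private final K key;
    private final List<R> rows;
    private final List<A> accumulators;

    KeySegmentSketch(K key, List<R> rows, List<A> accumulators) {
        this.key = key;
        this.rows = List.copyOf(rows);
        // "No accumulator" is normalized to an empty list, so the field is never null.
        this.accumulators = accumulators == null ? List.of() : List.copyOf(accumulators);
    }

    K key() { return key; }

    List<R> rows() { return rows; }

    List<A> accumulators() { return accumulators; }
}
```

With this convention, an implementer can always iterate over
accumulators() without a null check, whether or not state exists for the key.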

>
> * update the Group By Example to use a list of accumulator instead of a
> single value


Updated.

> * fix the `AvgAggregate` example:
>   * add missing `canRetract()`, and `canMerge()` methods
>   * example uses `batch()` method instead of `bundledAccumulateRetract()`
>   * example works with a single accumulator, not a list of accumulators


I updated the AvgAggregate example to address these points.  It should now
be consistent with the interface discussed.
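
As a rough sketch of the shape discussed in this thread (not the FLIP's
actual code), an average aggregate could merge the list of accumulators
first and then apply the accumulate and retract rows. Only the method names
canRetract(), canMerge(), and bundledAccumulateRetract() come from the
discussion; `AvgSketch` and its nested types `AvgAcc`, `Change`, `Segment`,
and `Result` are hypothetical and do not depend on Flink:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free sketch of a bundled average aggregate.
final class AvgSketch {
    static final class AvgAcc {
        long sum;
        long count;
        AvgAcc(long sum, long count) { this.sum = sum; this.count = count; }
    }

    // One input row: a value plus a flag saying whether it is a retraction.
    static final class Change {
        final long value;
        final boolean retract;
        Change(long value, boolean retract) { this.value = value; this.retract = retract; }
    }

    // All rows and existing accumulators for a single key.
    static final class Segment {
        final String key;
        final List<Change> rows;
        final List<AvgAcc> accumulators; // empty when no state exists yet
        Segment(String key, List<Change> rows, List<AvgAcc> accumulators) {
            this.key = key;
            this.rows = rows;
            this.accumulators = accumulators;
        }
    }

    static final class Result {
        final String key;
        final AvgAcc accumulator;
        final Double average; // null when no rows remain for the key
        Result(String key, AvgAcc accumulator, Double average) {
            this.key = key;
            this.accumulator = accumulator;
            this.average = average;
        }
    }

    boolean canRetract() { return true; }

    boolean canMerge() { return true; }

    List<Result> bundledAccumulateRetract(List<Segment> bundle) {
        List<Result> out = new ArrayList<>();
        for (Segment seg : bundle) {
            // Step 1: merge all incoming accumulators into a fresh one.
            AvgAcc acc = new AvgAcc(0, 0);
            for (AvgAcc a : seg.accumulators) {
                acc.sum += a.sum;
                acc.count += a.count;
            }
            // Step 2: apply accumulate and retract rows in order.
            for (Change c : seg.rows) {
                if (c.retract) {
                    acc.sum -= c.value;
                    acc.count--;
                } else {
                    acc.sum += c.value;
                    acc.count++;
                }
            }
            Double avg = acc.count == 0 ? null : (double) acc.sum / acc.count;
            out.add(new Result(seg.key, acc, avg));
        }
        return out;
    }
}
```

A real implementation would typically batch any external calls across the
whole bundle instead of working key by key; the loop here only shows the
merge-then-apply ordering.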

> * in general, check the code examples for compliance with the proposed API.
>   * Some use `bundle()` instead of `bundledAccumulateRetract()`.
>   * There might be other mistakes that sneaked in when evolving the API


Yeah, you're right.  I'll read through it again and try to find any missing
updates.

Thanks,
Alan

On Tue, Jan 28, 2025 at 3:58 AM Fabian Hüske <fhue...@confluent.io.invalid>
wrote:

> Thanks Alan for updating the FLIP!
>
> IMO, it looks good.
>
> Just a few nits that would be great to fix for consistency:
>
> * BundledKeySegment
>   * should it be accumulatorS instead of accumulator?
>   * should accumulators be null or an empty list if no accumulator is
> present?
> * update the Group By Example to use a list of accumulator instead of a
> single value
> * fix the `AvgAggregate` example:
>   * add missing `canRetract()`, and `canMerge()` methods
>   * example uses `batch()` method instead of `bundledAccumulateRetract()`
>   * example works with a single accumulator, not a list of accumulators
> * in general, check the code examples for compliance with the proposed API.
>   * Some use `bundle()` instead of `bundledAccumulateRetract()`.
>   * There might be other mistakes that sneaked in when evolving the API
>
> Thank you,
> Fabian
>
>
> On Tue, Jan 28, 2025 at 12:54 AM Alan Sheinberg
> <asheinb...@confluent.io.invalid> wrote:
>
> > Hi everyone,
> >
> > Sorry for the delayed response!  I appreciate the comments.
> >
> > Addressing Timo's comments:
> >
> > > Correct me if I'm wrong, but for AggregateFunctions implementing a
> > > retract() and merge() is optional. How can a BundledAggregateFunction
> > > communicate whether or not this is supported to the planner? Enforcing
> > > the retract() feature in the interface specification could be an option,
> > > but esp for window aggregations there might not be a retract required.
> >
> >
> > This is a good catch.  Much of my experimenting with a POC was done with
> > a retract call and it slipped my mind that it's optional.  I think this
> > will have to be added to the BundledAggregateFunction interface.
> >
> > > Also how do you plan to support merge() in this design? I couldn't find
> > > any mentioning in the FLIP.
> >
> > Searching through the operators which use merge, it wasn't clear to me
> > that I would need it in the implementation, so I didn't think it required
> > support.  I now see it's used in windows and maybe elsewhere.  I'll add a
> > list of accumulators rather than a single one; the first step will be to
> > merge the accumulators before applying any of the accumulate or retract
> > calls.  I need to look more closely at the operators that will use them,
> > but I think it may make sense to do it this way.  Tell me if you feel
> > strongly that they should be separate method calls.
> >
> > Addressing Fabian's comments:
> >
> > > * Why do we need the `canBundle()` function? We can use the interface
> > > itself as a marker. A function that can't bundle, shouldn't implement
> > > the interface.
> > >   * the interface could just contain the
> > > `bundledAccumulateRetract(List<BKS> bundle)` method?
> >
> >
> > I originally had it just like you're recommending, but that removes a
> > layer of indirection and some flexibility, because it means that a class
> > must statically decide whether it's a bundled aggregate function or not.
> > FunctionDefinition doesn't work this way: at planning time you can
> > determine what function kind you are, rather than fixing it by directly
> > extending ScalarFunction, for example.  This flexibility allows us to
> > implement a meta UDF which can try to be any kind of UDF and then
> > specialize during runtime.  For example, this would allow it to return
> > true from canBundle only when it's an aggregate which wants to implement
> > the bundled interface.
> >
> > > * why does a function call need to handle multiple keys?
> > >   * It feels that the key-loop is something that 90% of the functions
> > > would implement the same.
> > >   * Couldn't we have a default implementation for the key loop and
> > > another method that just handles the bundle of a single key?
> >
> >
> > I think so.  So you mean you could overload multiple methods, one for
> > single key and another for multi key and the default for the latter would
> > just call the former N times?  That would be fine.  In some cases, you
> > would want to override the multi key version, to minimize external calls,
> > for example, but others might not.
> >
> > > * does the list of updatedValuesAfterEachRow also include the last row?
> > >   * it would be good to clarify that in the API documentation
> >
> >
> > It includes the first and last values (and everything in between), so it
> > duplicates the values also set in startingValue and finalValue.  I can
> > be more explicit about this field.
> >
> > > * Could you clarify what the table.exec.agg-bundled.size parameter refers
> > > to?
> > >   * Number of keys? Total number of rows? Number of rows per key?
> >
> >
> > The total number of input rows, which may belong to one or more keys.
> > Let me clarify that.
> >
> > > * It would be nice to add the keys in the example
> >
> > You're right.  It's a bit confusing without them.  I'll add them.
> >
> > > * How do the bundled agg functions interact with checkpointing?
> > >   * Are bundles sent out when a checkpoint barrier is received?
> >
> >
> > Yes, for group by, it would flush the bundle when a checkpoint barrier is
> > received.  I think we would ultimately want to do this async so as not to
> > block the checkpoint (in case the bundle takes a while), but the first
> > implementation will likely be straightforward.  I can make a followup
> > task for this.
> >
> > For the over implementations, processElement uses ValueState and
> > MapState to store all the intermediate data and uses watermark
> > eventTimers to actually do the bundle calls.  I think for these, there is
> > no special behavior for checkpoints, and they shouldn't hold them up at
> > all.
> >
> > For the windowed aggregate implementations, it would also flush the
> > bundle when a checkpoint barrier is received.  I think this will
> > ultimately need to be made async as well, but the first implementation
> > will flush when the barrier is received.
> >
> > I'll update the FLIP to address the comments above; please have another
> > look.
> >
> > Thanks,
> > Alan
> >
> >
> > On Wed, Jan 15, 2025 at 9:06 AM Fabian Hüske <fhue...@confluent.io.invalid>
> > wrote:
> >
> > > Hi Alan,
> > >
> > > Thanks for working on this FLIP!
> > > Overall the document looks very good, but I have a few questions /
> > > comments:
> > >
> > > * Why do we need the `canBundle()` function? We can use the interface
> > > itself as a marker. A function that can't bundle, shouldn't implement
> > > the interface.
> > >   * the interface could just contain the
> > > `bundledAccumulateRetract(List<BKS> bundle)` method?
> > >
> > > * why does a function call need to handle multiple keys?
> > >   * It feels that the key-loop is something that 90% of the functions
> > > would implement the same.
> > >   * Couldn't we have a default implementation for the key loop and
> > > another method that just handles the bundle of a single key?
> > >
> > > * does the list of updatedValuesAfterEachRow also include the last row?
> > >   * it would be good to clarify that in the API documentation
> > >
> > > * Could you clarify what the table.exec.agg-bundled.size parameter
> > > refers to?
> > >   * Number of keys? Total number of rows? Number of rows per key?
> > >
> > > * It would be nice to add the keys in the example
> > >
> > > * How do the bundled agg functions interact with checkpointing?
> > >   * Are bundles sent out when a checkpoint barrier is received?
> > >
> > > Thank you, Fabian
> > >
> > > On Mon, Jan 13, 2025 at 3:42 PM Timo Walther <twal...@apache.org> wrote:
> > >
> > > > Hi Alan,
> > > >
> > > > thanks for sharing this FLIP with us. Sorry for the late reply. I
> > > > think the FLIP is already in a very good shape.
> > > >
> > > > I like the approach that the BundledAggregateFunction is rather a
> > > > separate interface that can be implemented by advanced users. This
> > > > matches with the existing SpecializedFunction interface and the
> > > > upcoming ChangelogFunction of FLIP-440.
> > > >
> > > > However, I have some additional feedback:
> > > >
> > > > Correct me if I'm wrong, but for AggregateFunctions implementing a
> > > > retract() and merge() is optional. How can a BundledAggregateFunction
> > > > communicate whether or not this is supported to the planner?
> > > > Enforcing the retract() feature in the interface specification could
> > > > be an option, but esp for window aggregations there might not be a
> > > > retract required.
> > > >
> > > > Also how do you plan to support merge() in this design? I couldn't
> > > > find any mentioning in the FLIP.
> > > >
> > > > Regards,
> > > > Timo
> > > >
> > > >
> > > >
> > > > On 12.12.24 02:57, Alan Sheinberg wrote:
> > > > > I'd like to start a discussion of FLIP-491: BundledAggregateFunction
> > > > > for batched aggregation [1]
> > > > >
> > > > > This feature proposes adding a new interface BundledAggregateFunction
> > > > > that can be implemented by AggregateFunction UDFs.  This allows the
> > > > > use of a batched method call so that users can handle many rows at a
> > > > > time for multiple keys rather than the per-row calls such as
> > > > > accumulate and retract.
> > > > >
> > > > > The purpose is to achieve high throughput while still allowing for
> > > > > calls to external systems or other blocking operations.  Similar calls
> > > > > through the conventional AggregateFunction methods would be
> > > > > prohibitively slow, but if given a batch of inputs and accumulators
> > > > > for each key, the implementer has the power to parallelize or
> > > > > internally batch lookups to improve performance.
> > > > >
> > > > > Looking forward to your feedback and suggestions.
> > > > >
> > > > > [1]
> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-491%3A+BundledAggregateFunction+for+batched+aggregation
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Alan
> > > > >
> > > >
> > > >
> > >
> >
>
