Thanks Alan for updating the FLIP!

IMO, it looks good.

Just a few nits that would be great to fix for consistency:

* BundledKeySegement
  * should it be accumulatorS instead of accumulator?
  * should accumulators be null or an empty list if no accumulator is
present?
* update the Group By Example to use a list of accumulator instead of a
single value
* fix the `AvgAggregate` example:
  * add missing `canRetract()`, and `canMerge()` methods
  * example uses `batch()` method instead of `bundledAccumulateRetract()`
  * example works with a single accumulator, not a list of accumulators
* in general, check the code examples for compliance with the proposed API.
  * Some use `bundle()` instead of `bundledAccumulateRetract()`.
  * There might be other mistakes that sneaked when evolving the API

Thank you,
Fabian


On Tue, Jan 28, 2025 at 12:54 AM Alan Sheinberg
<asheinb...@confluent.io.invalid> wrote:

> Hi everyone,
>
> Sorry for the delayed response!  I appreciate the comments.
>
> Addressing Timo's comments:
>
> > Correct me if I'm wrong, but for AggregateFunctions implementing a
> > retract() and merge() is optional. How can a BundledAggregateFunction
> > communicate whether or not this is supported to the planner? Enforcing
> > the retract() feature in the interface specification could be an option,
> > but esp for window aggregations there might not be a retract required.
>
>
> This is a good catch.  Much of my experimenting with a POC was done with a
> retract call and it slipped my mind that it's optional.  I think this will
> have to be added to the interface  BundledAggregateFunction.
>
> Also how do you plan to support merge() in this design? I couldn't find
> > any mentioning in the FLIP.
> >
> Searching through operators which used merge, I wasn't clear that I would
> require it in the implementation, so I didn't think it required support.  I
> now see it's used in windows and maybe elsewhere.  I'll add a list of
> accumulators rather than a single one -- the first step will be to merge
> accumulators before applying any of the accumulate or retract calls. I need
> to look more closely at the operators that will use them, but think it may
> make sense to do in this way.  Tell me if you feel strongly that they
> should be separate method calls.
>
> Addressing Fabian's comments:
>
> > * Why do we need the `canBundle()` function? We can use the interface
> > itself as a marker. A function that can't bundle, shouldn't implement the
> > interface.
> >   * the interface could just contain the
> > `bundledAccumulateRetract(List<BKS> bundle)` method?
>
>
> I originally had it just like you're recommending, but it removes a layer
> of indirection and removes some flexibility because it means that a class
> must statically decide if it's an aggregate bundled function or not.
> FunctionDefinition doesn't work this way -- for example at planning time
> you can determine what function kind you are rather than by directly
> extending ScalarFunction, for example.  This flexibility allows us to
> implement a meta UDF which can try to be any kind of UDFs and then
> specialize during runtime.  For example, this would allow it only to return
> canBundle when it's an aggregate which wants to implement the bundled
> interface.
>
> * why does a function call need to handle multiple keys?
> >   * It feels that the key-loop is something that 90% of the functions
> would
> > implement the same.
> >   * Couldn't we have a default implementation for the key loop and
> another
> > method that just handles the bundle of a single key?
>
>
> I think so.  So you mean you could overload multiple methods, one for
> single key and another for multi key and the default for the latter would
> just call the former N times?  That would be fine.  In some cases, you
> would want to override the multi key version, to minimize external calls,
> for example, but others might not.
>
> * does the list of updatedValuesAfterEachRow also include the last row?
> >   * it would be good to clarify that in the API documentation
>
>
> It includes the first and last value (and everything in between), so
> duplicates those fields also set in startingValue and finalValue.  I can be
> more explicit about this field.
>
> * Could you clarify what the table.exec.agg-bundled.size parameter refers
> > to?
> >   * Number of keys? Total number of rows? Number of rows per key?
>
>
> Number of total input rows which may be for one or more keys.  Let me
> clarify that.
>
> * It would be nice to add the keys in the example
>
> You're right.  It's a bit confusing without them.  I'll add them.
>
> * How do the bundled agg functions interact with checkpointing?
> >   * Are bundles sent out when a checkpoint barrier is received?
>
>
> Yes, for group by, it would flush the bundle when a checkpoint barrier is
> received.  I think we would ultimately want to do this async so as not to
> block the checkpoint (in case the bundle takes a while), but the first
> implementation will likely be straightforward.  I can make a followup task
> for this.
>
> For the over implementations, processElement uses ValueState and
> mapstate to store all the intermediate data and uses watermark eventTimers
> to actually do the bundle calls.  I think for these, there is no special
> behavior for checkpoints and shouldn't hold them up at all.
>
> For the windowed aggregate implementations, it would also flush the bundle
> when a checkpoint barrier is received. I think this will also ultimately
> need to be made async as well, but will have a first implementation which
> happens when the barrier is received.
>
> I'll update the FLIP to address the comments above and please have another
> look.
>
> Thanks,
> Alan
>
>
> On Wed, Jan 15, 2025 at 9:06 AM Fabian Hüske <fhue...@confluent.io.invalid
> >
> wrote:
>
> > Hi Alan,
> >
> > Thanks for working on this FLIP!
> > Overall the document looks very good, but I have a few question /
> comments:
> >
> > * Why do we need the `canBundle()` function? We can use the interface
> > itself as a marker. A function that can't bundle, shouldn't implement the
> > interface.
> >   * the interface could just contain the
> > `bundledAccumulateRetract(List<BKS> bundle)` method?
> >
> > * why does a function call need to handle multiple keys?
> >   * It feels that the key-loop is something that 90% of the functions
> would
> > implement the same.
> >   * Couldn't we have a default implementation for the key loop and
> another
> > method that just handles the bundle of a single key?
> >
> > * does the list of updatedValuesAfterEachRow also include the last row?
> >   * it would be good to clarify that in the API documentation
> >
> > * Could you clarify what the table.exec.agg-bundled.size parameter refers
> > to?
> >   * Number of keys? Total number of rows? Number of rows per key?
> >
> > * It would be nice to add the keys in the example
> >
> > * How do the bundled agg functions interact with checkpointing?
> >   * Are bundles sent out when a checkpoint barrier is received?
> >
> > Thank you, Fabian
> >
> > On Mon, Jan 13, 2025 at 3:42 PM Timo Walther <twal...@apache.org> wrote:
> >
> > > Hi Alan,
> > >
> > > thanks for sharing this FLIP with us. Sorry for the late reply. I think
> > > the FLIP is already in a very good shape.
> > >
> > > I like the approach that the BundledAggregateFunction is rather a
> > > separate interface that can be implemented by advanced users. This
> > > matches with the existing SpecializedFunction interface and the
> upcoming
> > > ChangelogFunction of FLIP-440.
> > >
> > > However, I have some additional feedback:
> > >
> > > Correct me if I'm wrong, but for AggregateFunctions implementing a
> > > retract() and merge() is optional. How can a BundledAggregateFunction
> > > communicate whether or not this is supported to the planner? Enforcing
> > > the retract() feature in the interface specification could be an
> option,
> > > but esp for window aggregations there might not be a retract required.
> > >
> > > Also how do you plan to support merge() in this design? I couldn't find
> > > any mentioning in the FLIP.
> > >
> > > Regards,
> > > Timo
> > >
> > >
> > >
> > > On 12.12.24 02:57, Alan Sheinberg wrote:
> > > > I'd like to start a discussion of FLIP-491: BundledAggregateFunction
> > for
> > > > batched aggregation [1]
> > > >
> > > > This feature proposes adding a new interface BundledAggregateFunction
> > > that
> > > > can be implemented by AggregateFunction UDFs.  This allows the use
> of a
> > > > batched method call so that users can handle many rows at a time for
> > > > multiple keys rather than the per-row calls such as accumulate and
> > > retract.
> > > >
> > > > The purpose is to achieve high throughput while still allowing for
> > calls
> > > to
> > > > external systems or other blocking operations.  Similar calls through
> > the
> > > > conventional AggregateFunction methods would be prohibitively slow,
> but
> > > if
> > > > given a batch of inputs and accumulators for each key, the
> implementer
> > > has
> > > > the power to parallelize or internally batch lookups to improve
> > > performance.
> > > >
> > > > Looking forward to your feedback and suggestions.
> > > >
> > > > [1]
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-491%3A+BundledAggregateFunction+for+batched+aggregation
> > > >
> > > >
> > > > Thanks,
> > > > Alan
> > > >
> > >
> > >
> >
>

Reply via email to