Fabian, Thanks for your quick reply. The goal of "FLINK-6544" is not to expose state backend in all UDAGG cases. It is designed to provide an interface which provides an ability for user to access state backend when it is allowed (yes, right now this is only allowed by ProcessFunction). This interface itself does not make the things better. Instead, it provides a generic interface for the future adoption of exposing backend state in all different UDAGG cases, and the current over Aggregate and unbounded group aggregate can enjoy the benefits of accessing state backend.
In the meanwhile, I am also curious why we cannot build AggregateFunction on RichFunction. We will lose lots of benefit of having state backend for window Aggregate if it does not provide runtime context. @Stephan It is really appreciate if you can share the concerns or blocking reasons of not having AggregateFunction designed on top of RichFunction. Regards, Shaoxuan On Fri, May 12, 2017 at 6:21 PM, Fabian Hueske <fhue...@gmail.com> wrote: > Hi, thanks for the proposal. > > I think exposing state to UDAGGs would be very difficult and a lot of work. > > UDAGGs are called from ProcessFunctions (stream, OVER window and non-window > aggs), AggregateFunctions (stream, group-window aggs), CombineFunctions > (batch) and GroupReduceFunctions (batch). The batch functions do not > support state backends at all, ProcessFunctions can register state, and > AggregateFunction cannot. > Even when putting the batch case aside this is very hard. > > AggregateFunctions support merging of windows. Right now, this only > involves merging of accumulators. If we allow AggregateFunctions to have > state, we would also need to provide logic to merge the state. Moreover, it > is not clearly defined when AggregateFunctions are called (similar to > Combiners in MapReduce) which would make state handling very complex. > Changing this would be a major effort in the DataStream API. > > An alternative would be to reimplement the group-window logic in the Table > API, but this will he a huge effort as well (maybe we have to do it anyway > at some point though). > > @Stephan knows more about the implications of allowing state in > AggregateFunctions. > > Best, Fabian > > 2017-05-12 11:53 GMT+02:00 Shaoxuan Wang <wshaox...@gmail.com>: > > > Hi everyone, > > > > We made some progress in the implementation of UDAGG (FLINK-5564). > However, > > we realized that there are cases where users may want to use state > backend > > to store the data. For instance, the built-in MaxWithRetractAggFunction > > currently create a hashMap to store the historical data. It will have > > problem when the # of keys are huge enough, thereby leading to OOM. > > > > In FLINK-6544, we have proposed an approach to expose State Backend > > Interface for UDAGG. A brief design doc can be found in > > https://docs.google.com/document/d/1g-wHOuFj5pMaYMJg90kVIO2IHbiQiO26 > > nWscLIOn50c/edit > > > > I am opening this discussion thread, as I realized there are recently > some > > open jiras which are towards to implement some special aggregators, such > as > > "count distinct". IMO, "count distinct" is just an UDAGG. With the new > > proposed FLINK-6544, we can just make it as a built-in agg without > changing > > the current UDAGG framework. > > > > @Fabian, @Haohui, @Radu, please take a look at FLINK-6544 and let me know > > what you think. > > Btw, we do not need include this change for release 1.3 in our opinion. > > > > Regards, > > Shaoxuan > > >