Hi, thanks for the proposal. I think exposing state to UDAGGs would be very difficult and a lot of work.

UDAGGs are called from ProcessFunctions (stream, OVER window and non-window aggs), AggregateFunctions (stream, group-window aggs), CombineFunctions (batch) and GroupReduceFunctions (batch). The batch functions do not support state backends at all. Of the streaming functions, ProcessFunctions can register state, but AggregateFunctions cannot.

Even when putting the batch case aside, this is very hard. AggregateFunctions support merging of windows. Right now, this only involves merging of accumulators. If we allow AggregateFunctions to have state, we would also need to provide logic to merge the state. Moreover, it is not clearly defined when AggregateFunctions are called (similar to Combiners in MapReduce), which would make state handling very complex. Changing this would be a major effort in the DataStream API.

An alternative would be to reimplement the group-window logic in the Table API, but this would be a huge effort as well (maybe we have to do it anyway at some point, though).

@Stephan knows more about the implications of allowing state in AggregateFunctions.

Best, Fabian

2017-05-12 11:53 GMT+02:00 Shaoxuan Wang <wshaox...@gmail.com>:

> Hi everyone,
>
> We made some progress in the implementation of UDAGG (FLINK-5564). However,
> we realized that there are cases where users may want to use state backend
> to store the data. For instance, the built-in MaxWithRetractAggFunction
> currently create a hashMap to store the historical data. It will have
> problem when the # of keys are huge enough, thereby leading to OOM.
>
> In FLINK-6544, we have proposed an approach to expose State Backend
> Interface for UDAGG. A brief design doc can be found in
> https://docs.google.com/document/d/1g-wHOuFj5pMaYMJg90kVIO2IHbiQiO26nWscLIOn50c/edit
>
> I am opening this discussion thread, as I realized there are recently some
> open jiras which are towards to implement some special aggregators, such as
> "count distinct". IMO, "count distinct" is just an UDAGG. With the new
> proposed FLINK-6544, we can just make it as a built-in agg without changing
> the current UDAGG framework.
>
> @Fabian, @Haohui, @Radu, please take a look at FLINK-6544 and let me know
> what you think.
> Btw, we do not need include this change for release 1.3 in our opinion.
>
> Regards,
> Shaoxuan
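
To make the merging point in the reply concrete, here is a minimal, self-contained sketch of an accumulator for a retractable max that keeps its history in an in-memory map. It is not Flink code: MergeSketch, MaxAcc and all method names are hypothetical and only loosely modeled on the accumulate/retract/merge pattern discussed in this thread. Merging two such accumulators is just a matter of combining two heap objects; if the counts lived in a state backend instead, equivalent merge logic would have to be provided for that state.

```java
import java.util.HashMap;
import java.util.Map;

public class MergeSketch {

    /** Hypothetical accumulator: maps each value to how often it is currently present. */
    public static final class MaxAcc {
        public final Map<Long, Long> counts = new HashMap<>();
    }

    /** Adds one occurrence of the value. */
    public static void accumulate(MaxAcc acc, long value) {
        acc.counts.merge(value, 1L, Long::sum);
    }

    /** Removes one occurrence of the value; the entry is dropped when its count reaches zero. */
    public static void retract(MaxAcc acc, long value) {
        acc.counts.computeIfPresent(value, (k, c) -> c > 1 ? Long.valueOf(c - 1) : null);
    }

    /** Merges two accumulators (for example when two windows are merged) by summing counts. */
    public static MaxAcc merge(MaxAcc a, MaxAcc b) {
        MaxAcc out = new MaxAcc();
        out.counts.putAll(a.counts);
        b.counts.forEach((v, c) -> out.counts.merge(v, c, Long::sum));
        return out;
    }

    /** Returns the current maximum, or null if nothing is present. */
    public static Long getValue(MaxAcc acc) {
        Long max = null;
        for (Long v : acc.counts.keySet()) {
            if (max == null || v > max) {
                max = v;
            }
        }
        return max;
    }
}
```

The map holds one entry per distinct value, so it grows with the number of distinct values seen, which is the memory concern raised in the quoted message.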