Hi,

In general I believe it is a good idea to expose the state backend to the
functions. Data processing can always be optimized based on how the data is
stored. If the processing level (here, the aggregation) controls the access to
the data, it can implement that access in a smart way. Moreover, we could also
build different data organizations, partitioning strategies, etc. tailored to
the specific computation. I understand that this would be quite an effort, but
at some point it is worth making.

Meanwhile, if it is not possible to have the aggregation function extend the
rich interface, couldn't we supplement this with some extra logic in the
process function that would provide the aggregates with the needed data, or at
least with pointers to the required state?

As far as I know, it would already be legal to have something like:

ProcessFunction () {

    // state handle owned by the process function
    ValueState state = ...

    processElement(newElement) {
        // hand the state to the accumulator together with the new element
        acc.accumulate(newElement, state)
    }
}

WeightedAvgAccum {

    public void accumulate(Row newElement, ValueState state) {
        // read and update the intermediate data through the state handle
        state.value....
    }
}

Would something like this at least partially solve the problem? It would
allow you to manage the intermediate data directly in the state instead of in
memory.
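
A minimal, self-contained sketch of this idea follows. It is only an
illustration under stated assumptions: SimpleValueState and InMemoryState are
hypothetical stand-ins so that the snippet runs without Flink on the classpath
(they only borrow the value()/update() naming of Flink's ValueState), and the
SumAndWeight layout and the long-typed inputs are made up for the example. In a
real job the handle would be state registered by the ProcessFunction.

```java
final class WeightedAvgSketch {

    /** Hypothetical stand-in for a value state handle (not the Flink interface). */
    interface SimpleValueState<T> {
        T value();          // current value, or null if nothing was stored yet
        void update(T v);   // overwrite the stored value
    }

    /** Heap-backed implementation, used only to make the sketch runnable. */
    static final class InMemoryState<T> implements SimpleValueState<T> {
        private T current;
        @Override public T value() { return current; }
        @Override public void update(T v) { current = v; }
    }

    /** Assumed intermediate result: sum(value * weight) and sum(weight). */
    static final class SumAndWeight {
        final long weightedSum;
        final long totalWeight;
        SumAndWeight(long weightedSum, long totalWeight) {
            this.weightedSum = weightedSum;
            this.totalWeight = totalWeight;
        }
    }

    /** Accumulator without fields: all intermediate data lives in the state passed in. */
    static final class WeightedAvgAccum {

        void accumulate(long value, long weight, SimpleValueState<SumAndWeight> state) {
            SumAndWeight old = state.value();
            long sum = (old == null ? 0L : old.weightedSum) + value * weight;
            long total = (old == null ? 0L : old.totalWeight) + weight;
            state.update(new SumAndWeight(sum, total));
        }

        /** Returns the weighted average, or null if nothing was accumulated. */
        Double getValue(SimpleValueState<SumAndWeight> state) {
            SumAndWeight s = state.value();
            if (s == null || s.totalWeight == 0L) {
                return null;
            }
            return (double) s.weightedSum / s.totalWeight;
        }
    }

    public static void main(String[] args) {
        // Plays the role of the process function: it owns the state handle
        // and passes it to the accumulator for every incoming element.
        SimpleValueState<SumAndWeight> state = new InMemoryState<>();
        WeightedAvgAccum acc = new WeightedAvgAccum();
        acc.accumulate(10L, 1L, state);
        acc.accumulate(20L, 3L, state);
        System.out.println(acc.getValue(state)); // prints 17.5
    }
}
```

The accumulator itself holds nothing between calls, so swapping the heap-backed
stand-in for a backend-managed state would move the intermediate data out of
memory without touching the accumulate logic.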


-----Original Message-----
From: Shaoxuan Wang [mailto:wshaox...@gmail.com] 
Sent: Friday, May 12, 2017 1:20 PM
To: Dev
Cc: Stephan Ewen
Subject: Re: [DISCUSS] Expose State Backend Interface for UDAGG

Fabian,
Thanks for your quick reply.
The goal of "FLINK-6544" is not to expose the state backend in all UDAGG cases.
It is designed to provide an interface that lets users access the state backend
where that is allowed (yes, right now this is only allowed by
ProcessFunction). This interface by itself does not make things better.
Instead, it provides a generic interface for the future adoption of exposing
backend state in all the different UDAGG cases, and the current over aggregate
and unbounded group aggregate can enjoy the benefits of accessing the state
backend.

In the meantime, I am also curious why we cannot build AggregateFunction on
top of RichFunction. We will lose many of the benefits of having a state
backend for window aggregates if it does not provide a runtime context.
@Stephan It would be really appreciated if you could share the concerns or
blocking reasons for not having AggregateFunction designed on top of
RichFunction.

Regards,
Shaoxuan


On Fri, May 12, 2017 at 6:21 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi, thanks for the proposal.
>
> I think exposing state to UDAGGs would be very difficult and a lot of work.
>
> UDAGGs are called from ProcessFunctions (stream, OVER window and
> non-window aggs), AggregateFunctions (stream, group-window aggs),
> CombineFunctions (batch) and GroupReduceFunctions (batch). The batch
> functions do not support state backends at all, ProcessFunctions can
> register state, and AggregateFunction cannot.
> Even when putting the batch case aside this is very hard.
>
> AggregateFunctions support merging of windows. Right now, this only 
> involves merging of accumulators. If we allow AggregateFunctions to 
> have state, we would also need to provide logic to merge the state. 
> Moreover, it is not clearly defined when AggregateFunctions are called 
> (similar to Combiners in MapReduce) which would make state handling very 
> complex.
> Changing this would be a major effort in the DataStream API.
>
> An alternative would be to reimplement the group-window logic in the
> Table API, but this will be a huge effort as well (maybe we have to do
> it anyway at some point though).
>
> @Stephan knows more about the implications of allowing state in 
> AggregateFunctions.
>
> Best, Fabian
>
> 2017-05-12 11:53 GMT+02:00 Shaoxuan Wang <wshaox...@gmail.com>:
>
> > Hi everyone,
> >
> > We made some progress in the implementation of UDAGG (FLINK-5564).
> > However, we realized that there are cases where users may want to use
> > the state backend to store the data. For instance, the built-in
> > MaxWithRetractAggFunction currently creates a HashMap to store the
> > historical data. This becomes a problem when the number of keys is
> > large enough, leading to OOM.
> >
> > In FLINK-6544, we have proposed an approach to expose State Backend
> > Interface for UDAGG. A brief design doc can be found in
> > https://docs.google.com/document/d/1g-wHOuFj5pMaYMJg90kVIO2IHbiQiO26nWscLIOn50c/edit
> >
> > I am opening this discussion thread because I noticed that some JIRAs
> > were recently opened that aim to implement special aggregators, such
> > as "count distinct". IMO, "count distinct" is just a UDAGG. With the
> > newly proposed FLINK-6544, we can simply make it a built-in agg
> > without changing the current UDAGG framework.
> >
> > @Fabian, @Haohui, @Radu, please take a look at FLINK-6544 and let me 
> > know what you think.
> > Btw, in our opinion we do not need to include this change in release 1.3.
> >
> > Regards,
> > Shaoxuan
> >
>
