[ 
https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833701#comment-15833701
 ] 

ASF GitHub Bot commented on FLINK-5582:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/3186
  
    Since this constantly requires extensive merge conflict resolution against 
master, I want to merge this soon.
    
    @shaoxuan-wang has tested it live and the CI passes as well...


> Add a general distributive aggregate function
> ---------------------------------------------
>
>                 Key: FLINK-5582
>                 URL: https://issues.apache.org/jira/browse/FLINK-5582
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.3.0
>
>
> The {{DataStream}} API currently has two aggregation functions that can be 
> used on windows and in state, both of which have limitations:
>   - {{ReduceFunction}} only supports one type as the type that is added and 
> aggregated/returned.
>   - {{FoldFunction}} supports different types to add and return, but is not 
> distributive, i.e. it cannot be used for hierarchical aggregation, for 
> example to split the aggregation into pre- and final aggregation.
> I suggest adding a generic and powerful aggregation function that supports:
>   - Different types to add, accumulate, and return
>   - The ability to merge partial aggregates by merging the accumulated type.
> The proposed interface is below. This type of interface is found in many 
> APIs, like that of various databases, and also in Apache Beam:
>   - The accumulator is the state of the running aggregate
>   - Accumulators can be merged
>   - Values are added to the accumulator
>   - Getting the result from the accumulator performs an optional finalizing 
> operation
> {code}
> public interface AggregateFunction<IN, ACC, OUT> extends Function {
>       ACC createAccumulator();
>       void add(IN value, ACC accumulator);
>       OUT getResult(ACC accumulator);
>       ACC merge(ACC a, ACC b);
> }
> {code}
> Example use:
> {code}
> public class AverageAccumulator {
>     long count;
>     long sum;
> }
> // implementation of a simple average
> public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
>     public void add(Integer value, AverageAccumulator acc) {
>         acc.sum += value;
>         acc.count++;
>     }
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
> // implementation of a weighted average
> // this reuses the same accumulator type as the aggregate function for
> // 'average'
> public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
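>     public void add(Datum value, AverageAccumulator acc) {
>         // 'count' holds the total weight, 'sum' the weighted sum of values
>         // (assumes Datum exposes integral getWeight() and getValue())
>         acc.count += value.getWeight();
>         acc.sum += value.getValue() * value.getWeight();
>     }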
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
> {code}
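
The hierarchical (pre- and final) aggregation that motivates the proposal can be sketched roughly as follows. This is only an illustration under stated assumptions: the interface is copied locally from the proposal without the Flink {{Function}} supertype so that the sketch compiles on its own, and the names HierarchicalAverageSketch, preAggregate and twoStageAverage are hypothetical helpers, not part of Flink.

```java
import java.util.Arrays;
import java.util.List;

public class HierarchicalAverageSketch {

    // Local copy of the proposed interface (assumption: "extends Function" dropped
    // so that no Flink dependency is needed for this sketch).
    interface AggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();
        void add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    static class AverageAccumulator {
        long count;
        long sum;
    }

    // Same logic as the 'Average' example in the issue description.
    static class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
        public AverageAccumulator createAccumulator() {
            return new AverageAccumulator();
        }
        public void add(Integer value, AverageAccumulator acc) {
            acc.sum += value;
            acc.count++;
        }
        public Double getResult(AverageAccumulator acc) {
            return acc.sum / (double) acc.count;
        }
        public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
            a.count += b.count;
            a.sum += b.sum;
            return a;
        }
    }

    // Hypothetical helper: pre-aggregates one partition into a partial accumulator.
    static <IN, ACC, OUT> ACC preAggregate(AggregateFunction<IN, ACC, OUT> fn, List<IN> values) {
        ACC acc = fn.createAccumulator();
        for (IN value : values) {
            fn.add(value, acc);
        }
        return acc;
    }

    // Hypothetical helper: pre-aggregate two partitions, then merge and finalize.
    static double twoStageAverage() {
        Average fn = new Average();
        AverageAccumulator left = preAggregate(fn, Arrays.asList(1, 2, 3));
        AverageAccumulator right = preAggregate(fn, Arrays.asList(4, 5, 6));
        // Final aggregation: only the accumulators are combined, not the raw values.
        return fn.getResult(fn.merge(left, right));
    }

    public static void main(String[] args) {
        System.out.println(twoStageAverage()); // prints 3.5
    }
}
```

The point of the sketch is that the final stage only sees accumulators. That is what a {{FoldFunction}} cannot offer, because it has no way to combine two partial results.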



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
