[ 
https://issues.apache.org/jira/browse/FLINK-5582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833605#comment-15833605
 ] 

ASF GitHub Bot commented on FLINK-5582:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3186#discussion_r97227451
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java ---
    @@ -0,0 +1,94 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.functions;
    +
    +import java.io.Serializable;
    +
    +/**
    + * 
    + * <p>Aggregation functions must be {@link Serializable} because they are sent around
    + * between distributed processes during distributed execution.
    + * 
    + * <p>An example how to use this interface is below:
    + * 
    + * <pre>{@code
    + * // the accumulator, which holds the state of the in-flight aggregate
    + * public class AverageAccumulator {
    + *     long count;
    + *     long sum;
    + * }
    + * 
    + * // implementation of an aggregation function for an 'average'
    + * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
    + * 
    + *     public AverageAccumulator createAccumulator() {
    + *         return new AverageAccumulator();
    + *     }
    + * 
    + *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
    + *         a.count += b.count;
    + *         a.sum += b.sum;
    + *         return a;
    + *     }
    + * 
    + *     public void add(Integer value, AverageAccumulator acc) {
    + *         acc.sum += value;
    + *         acc.count++;
    + *     }
    + * 
    + *     public Double getResult(AverageAccumulator acc) {
    + *         return acc.sum / (double) acc.count;
    + *     }
    + * }
    + * 
    + * // implementation of a weighted average
    + * // this reuses the same accumulator type as the aggregate function for 'average'
    + * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
    + *
    + *     public AverageAccumulator createAccumulator() {
    + *         return new AverageAccumulator();
    + *     }
    + *
    + *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
    + *         a.count += b.count;
    + *         a.sum += b.sum;
    + *         return a;
    + *     }
    + *
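    + *     public void add(Datum value, AverageAccumulator acc) {
    + *         // weight each value so that getResult() yields sum(weight * value) / sum(weight)
    + *         acc.count += value.getWeight();
    + *         acc.sum += value.getValue() * value.getWeight();
    + *     }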
    + *
    + *     public Double getResult(AverageAccumulator acc) {
    + *         return acc.sum / (double) acc.count;
    + *     }
    + * }
    + * }</pre>
    + */
    +public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
    +
    +   ACC createAccumulator();
    +
    +   void add(IN value, ACC accumulator);
    --- End diff --
    
    My first instinct is to keep the name `add()`, because it fits better with
    the term `Accumulator`. Retractions can then be viewed as adding negative
    values. What do you think?
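
The idea of treating a retraction as adding a negative value can be sketched as follows. This is a minimal, self-contained illustration, not code from the pull request: the nested `AggregateFunction` is a local stand-in for the interface in the diff (without the Flink `Function` supertype), and `Sum`, `SumAccumulator`, and `sumWithRetraction` are hypothetical names.

```java
import java.io.Serializable;

public class RetractionSketch {

    // Local stand-in for the interface in the diff, so the sketch compiles on its own.
    interface AggregateFunction<IN, ACC, OUT> extends Serializable {
        ACC createAccumulator();
        void add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Hypothetical accumulator holding a running sum.
    static class SumAccumulator {
        long sum;
    }

    // Hypothetical sum aggregate; add() treats positive and negative inputs alike.
    static class Sum implements AggregateFunction<Long, SumAccumulator, Long> {
        public SumAccumulator createAccumulator() {
            return new SumAccumulator();
        }

        public void add(Long value, SumAccumulator acc) {
            acc.sum += value;
        }

        public Long getResult(SumAccumulator acc) {
            return acc.sum;
        }

        public SumAccumulator merge(SumAccumulator a, SumAccumulator b) {
            a.sum += b.sum;
            return a;
        }
    }

    // Adds 5 and 3, then retracts the 3 by adding its negation.
    static long sumWithRetraction() {
        Sum sum = new Sum();
        SumAccumulator acc = sum.createAccumulator();
        sum.add(5L, acc);
        sum.add(3L, acc);
        sum.add(-3L, acc); // retraction expressed as a negative add
        return sum.getResult(acc);
    }

    public static void main(String[] args) {
        System.out.println(sumWithRetraction()); // prints 5
    }
}
```

This view works for invertible aggregates such as sums and counts. An aggregate like a maximum cannot undo an earlier `add()` from the current value alone, so it would need extra state in the accumulator.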


> Add a general distributive aggregate function
> ---------------------------------------------
>
>                 Key: FLINK-5582
>                 URL: https://issues.apache.org/jira/browse/FLINK-5582
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.3.0
>
>
> The {{DataStream}} API currently has two aggregation functions that can be 
> used on windows and in state, both of which have limitations:
>   - {{ReduceFunction}} only supports one type as the type that is added and 
> aggregated/returned.
>   - {{FoldFunction}} supports different types to add and return, but is not 
> distributive, i.e. it cannot be used for hierarchical aggregation, for 
> example to split the aggregation into pre- and final-aggregation.
> I suggest adding a generic and powerful aggregation function that supports:
>   - Different types to add, accumulate, and return
>   - The ability to merge partial aggregates by merging the accumulated type.
> The proposed interface is below. This type of interface is found in many 
> APIs, like that of various databases, and also in Apache Beam:
>   - The accumulator is the state of the running aggregate
>   - Accumulators can be merged
>   - Values are added to the accumulator
>   - Getting the result from the accumulator performs an optional finalizing 
> operation
> {code}
> public interface AggregateFunction<IN, ACC, OUT> extends Function {
>       ACC createAccumulator();
>       void add(IN value, ACC accumulator);
>       OUT getResult(ACC accumulator);
>       ACC merge(ACC a, ACC b);
> }
> {code}
> Example use:
> {code}
> public class AverageAccumulator {
>     long count;
>     long sum;
> }
> // implementation of a simple average
> public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
>     public void add(Integer value, AverageAccumulator acc) {
>         acc.sum += value;
>         acc.count++;
>     }
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
> // implementation of a weighted average
> // this reuses the same accumulator type as the aggregate function for 'average'
> public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
>     public AverageAccumulator createAccumulator() {
>         return new AverageAccumulator();
>     }
>     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
>         a.count += b.count;
>         a.sum += b.sum;
>         return a;
>     }
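>     public void add(Datum value, AverageAccumulator acc) {
>         // weight each value so that getResult() yields sum(weight * value) / sum(weight)
>         acc.count += value.getWeight();
>         acc.sum += value.getValue() * value.getWeight();
>     }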
>     public Double getResult(AverageAccumulator acc) {
>         return acc.sum / (double) acc.count;
>     }
> }
> {code}
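
The hierarchical aggregation that the description mentions (pre-aggregation followed by a final aggregation) might look roughly like the sketch below. It assumes the proposed interface shape, redeclared locally without the Flink `Function` supertype so that the example compiles on its own. The helper names `preAggregate` and `averageOverPartitions`, and the use of in-memory lists as stand-ins for partitions, are hypothetical and not part of the proposal.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

public class PreAggregationSketch {

    // Local stand-in for the proposed interface.
    interface AggregateFunction<IN, ACC, OUT> extends Serializable {
        ACC createAccumulator();
        void add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    static class AverageAccumulator {
        long count;
        long sum;
    }

    // Same logic as the 'Average' example in the issue description.
    static class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
        public AverageAccumulator createAccumulator() {
            return new AverageAccumulator();
        }

        public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
            a.count += b.count;
            a.sum += b.sum;
            return a;
        }

        public void add(Integer value, AverageAccumulator acc) {
            acc.sum += value;
            acc.count++;
        }

        public Double getResult(AverageAccumulator acc) {
            return acc.sum / (double) acc.count;
        }
    }

    // Pre-aggregation: fold the values of one partition into its own accumulator.
    static AverageAccumulator preAggregate(Average avg, List<Integer> partition) {
        AverageAccumulator acc = avg.createAccumulator();
        for (Integer value : partition) {
            avg.add(value, acc);
        }
        return acc;
    }

    // Final aggregation: merge the partial accumulators, then extract the result once.
    static double averageOverPartitions(List<List<Integer>> partitions) {
        Average avg = new Average();
        AverageAccumulator total = avg.createAccumulator();
        for (List<Integer> partition : partitions) {
            total = avg.merge(total, preAggregate(avg, partition));
        }
        return avg.getResult(total);
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions =
                Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5));
        System.out.println(averageOverPartitions(partitions)); // prints 3.0
    }
}
```

Because the accumulator keeps `count` and `sum` separately, merging partial accumulators gives the same result as adding all values to a single accumulator. A `FoldFunction`-style aggregate that only kept the running average could not be merged this way.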



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
