Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4626
  
    I moved everything to `flink-streaming-contrib`.
    
    @StephanEwen I went one step further and created the following helper function:
    ```
        public static <K, IN, ACC, OUT, W extends Window>
        SingleOutputStreamOperator<OUT> aggregateWithPreAggregation(
                        DataStream<IN> input,
                        KeySelector<IN, K> keySelector,
                        AggregateFunction<IN, ACC, OUT> aggregateFunction,
                        WindowAssigner<? super IN, W> windowAssigner)
    ```
    
    It adds a final aggregation step as well. With the version that you
    proposed, the user would have to implement two slightly different
    `windowAssigner` and `aggregateFunction` instances: one pair for the
    pre-aggregation step and another for the final aggregation step. This
    could lead to confusion and mistakes. Those functions have to be
    different because assigning windows, creating accumulators and
    accumulating happen only in the pre-aggregation step, which works on the
    `INPUT` data type, while the final aggregation step works on a
    `Tuple3<KEY, WINDOW, ACCUMULATOR>` and only merges already created
    windows and accumulators.
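
To illustrate the difference between the two steps, here is a minimal plain-Java sketch with no Flink dependency. It is not the code from this PR. The `Aggregate` interface is a simplified stand-in that mirrors the four methods of Flink's `AggregateFunction`. The `Event` type, the `preAggregate` and `finalAggregate` helpers, and the fixed-size tumbling windows with non-negative timestamps are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TwoPhaseAggregationSketch {

    /** Simplified stand-in for Flink's AggregateFunction (assumption of this sketch). */
    interface Aggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        ACC merge(ACC a, ACC b);
        OUT getResult(ACC accumulator);
    }

    /** Hypothetical input record: key, value and a non-negative timestamp in ms. */
    static final class Event {
        final String key;
        final long value;
        final long timestamp;

        Event(String key, long value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Sums the values; the accumulator is the running sum. */
    static final Aggregate<Event, Long, Long> SUM = new Aggregate<Event, Long, Long>() {
        public Long createAccumulator() { return 0L; }
        public Long add(Event e, Long acc) { return acc + e.value; }
        public Long merge(Long a, Long b) { return a + b; }
        public Long getResult(Long acc) { return acc; }
    };

    /** Pre-aggregation on raw input: assigns windows, creates accumulators, accumulates. */
    static <ACC> Map<String, Map<Long, ACC>> preAggregate(
            List<Event> partition, long windowSize, Aggregate<Event, ACC, ?> fn) {
        Map<String, Map<Long, ACC>> partial = new TreeMap<>();
        for (Event e : partition) {
            // Tumbling window, identified by its start timestamp.
            long windowStart = e.timestamp - (e.timestamp % windowSize);
            Map<Long, ACC> perWindow = partial.computeIfAbsent(e.key, k -> new TreeMap<>());
            ACC acc = perWindow.get(windowStart);
            if (acc == null) {
                acc = fn.createAccumulator();
            }
            perWindow.put(windowStart, fn.add(e, acc));
        }
        return partial;
    }

    /** Final aggregation on (key, window, accumulator) entries: merges only. */
    static <ACC, OUT> Map<String, Map<Long, OUT>> finalAggregate(
            List<Map<String, Map<Long, ACC>>> partials, Aggregate<?, ACC, OUT> fn) {
        Map<String, Map<Long, ACC>> merged = new TreeMap<>();
        for (Map<String, Map<Long, ACC>> partial : partials) {
            partial.forEach((key, perWindow) -> perWindow.forEach((window, acc) ->
                merged.computeIfAbsent(key, k -> new TreeMap<>()).merge(window, acc, fn::merge)));
        }
        Map<String, Map<Long, OUT>> results = new TreeMap<>();
        merged.forEach((key, perWindow) -> perWindow.forEach((window, acc) ->
            results.computeIfAbsent(key, k -> new TreeMap<>()).put(window, fn.getResult(acc))));
        return results;
    }

    /** Two simulated parallel partitions, 1000 ms windows. */
    static Map<String, Map<Long, Long>> demo() {
        List<Event> p1 = Arrays.asList(
            new Event("a", 1, 0), new Event("a", 2, 500), new Event("b", 10, 1200));
        List<Event> p2 = Arrays.asList(
            new Event("a", 3, 900), new Event("a", 4, 1500), new Event("b", 20, 1999));
        return finalAggregate(
            Arrays.asList(preAggregate(p1, 1000, SUM), preAggregate(p2, 1000, SUM)), SUM);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints {a={0=6, 1000=4}, b={1000=30}}
    }
}
```

In this sketch a single `Aggregate` instance serves both steps: `preAggregate` calls only `createAccumulator` and `add`, and `finalAggregate` calls only `merge` and `getResult`. This matches the split described above and is why the user would not need to supply a second, slightly different function.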

