[ https://issues.apache.org/jira/browse/FLINK-7561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16289357#comment-16289357 ]

ASF GitHub Bot commented on FLINK-7561:
---------------------------------------

Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4626
  
    I moved everything to `flink-streaming-contrib`.
    
    @StephanEwen I went one step further and created the following helper function:
    ```java
    public static <K, IN, ACC, OUT, W extends Window> SingleOutputStreamOperator<OUT> aggregateWithPreAggregation(
            DataStream<IN> input,
            KeySelector<IN, K> keySelector,
            AggregateFunction<IN, ACC, OUT> aggregateFunction,
            WindowAssigner<? super IN, W> windowAssigner)
    ```
    
    It also adds the final aggregation step. With the version you proposed, the user would have to implement two slightly different `windowAssigner`s and `aggregateFunction`s, one for the pre-aggregation step and one for the final aggregation step. This could lead to confusion and mistakes. Those functions have to be different because assigning windows, creating accumulators and accumulating happen only in the pre-aggregation step, which works on the `INPUT` data type, while the final aggregation step works on `Tuple3<KEY, WINDOW, ACCUMULATOR>` records and only merges already created windows and accumulators.


> Add support for pre-aggregation in DataStream API
> -------------------------------------------------
>
>                 Key: FLINK-7561
>                 URL: https://issues.apache.org/jira/browse/FLINK-7561
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
