[ https://issues.apache.org/jira/browse/FLINK-7561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16281748#comment-16281748 ]

ASF GitHub Bot commented on FLINK-7561:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4626
  
    I think it would be nice to have a utility here in order to make this easier to use:
    ```java
    DataStream result =
        Utils.withPreaggregation(
            stream.timeWindow(Time.minutes(5)), 
            myAggregateFunction
        )
        .apply(windowFunction);
    ```
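    In this usage, `myAggregateFunction` runs before the shuffle, and the pseudocode's `AggregateFunction<T, A, T>` signature requires its output type to equal its input type, so pre-aggregated records can enter the same `keyBy(...).window(...)` as raw ones. A minimal, Flink-free sketch of such a function: `PreAggregator` is a hypothetical interface that only mirrors the method shapes of Flink's `AggregateFunction<IN, ACC, OUT>` (`createAccumulator`, `add`, `getResult`, `merge`), and `SumPreAggregator` and `PreAggregatorDemo` are illustrative names, not part of Flink or of this PR.

    ```java
    import java.util.List;

    // Hypothetical stand-in that mirrors the method shapes of Flink's
    // AggregateFunction<IN, ACC, OUT> with IN == OUT == T; not the Flink interface itself.
    interface PreAggregator<T, A> {
        A createAccumulator();
        A add(T value, A accumulator);
        T getResult(A accumulator);
        A merge(A a, A b);
    }

    // Illustrative pre-aggregator: sums Long values. Input and output are both Long,
    // so a partial sum can be fed into the same downstream window as raw records.
    class SumPreAggregator implements PreAggregator<Long, Long> {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(Long value, Long acc) { return acc + value; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }

    class PreAggregatorDemo {
        // Folds a batch of records into a single partial result.
        static <T, A> T preAggregate(List<T> records, PreAggregator<T, A> agg) {
            A acc = agg.createAccumulator();
            for (T record : records) {
                acc = agg.add(record, acc);
            }
            return agg.getResult(acc);
        }
    }
    ```

    Re-applying the function to partial results gives the same answer as aggregating all raw records only for functions such as sum, min or max; an average, for example, would need a record type that carries both a sum and a count.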
    
    The utility would basically take the aggregate function, insert the pre-aggregation transformation on the *predecessor* of the keyed stream, and then set up the `WindowedStream` again.
    
    Pseudocode:
    ```java
    public static <T, K, W extends Window, A> WindowedStream<T, K, W> withPreaggregation(
            WindowedStream<T, K, W> windowedStream,
            AggregateFunction<T, A, T> preAggregator) {

        // Sanity check: the windowedStream must have no custom trigger and no evictor.

        PreAggregationOperator preAggOp =
            new PreAggregationOperator(preAggregator, /* properties from windowedStream */);

        DataStream<T> originalStream = /* predecessor before keyBy(), taken from windowedStream */;
        // DataStream#transform(operatorName, outTypeInfo, operator): the operator goes last.
        DataStream<T> preAggregated = originalStream.transform(..., preAggOp);

        WindowedStream<T, K, W> windowedAgain = preAggregated
            .keyBy(/* key selector from the original windowed stream */)
            .window(/* window assigner from the original windowed stream */);

        return windowedAgain;
    }
    ```
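    At runtime, the idea is that each upstream parallel instance combines its records per key and window before the shuffle, so fewer records cross the network, and the unchanged downstream window then merges the partial results. The plain-Java simulation below sketches that data flow under stated assumptions: tumbling windows of a fixed size, non-negative timestamps, and a sum as the aggregate. `Event`, `PreAggregationSketch`, `preAggregate` and `windowedSum` are hypothetical names, not Flink or PR APIs.

    ```java
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical keyed, timestamped record used only for this sketch.
    final class Event {
        final String key;
        final long timestamp;
        final long value;

        Event(String key, long timestamp, long value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    class PreAggregationSketch {
        // Start of the tumbling window containing ts (assumes ts >= 0).
        static long windowStart(long ts, long size) {
            return ts - (ts % size);
        }

        // Pre-aggregation on ONE upstream parallel instance, before keyBy():
        // sums values per (key, window) and emits one partial Event per group,
        // timestamped with the window start so it lands in the same window downstream.
        // A real operator would emit when the watermark passes the window end;
        // this sketch simply emits once the whole input list is consumed.
        static List<Event> preAggregate(List<Event> partition, long size) {
            Map<String, Event> partials = new HashMap<>();
            for (Event e : partition) {
                long start = windowStart(e.timestamp, size);
                String groupId = e.key + "@" + start;
                Event prev = partials.get(groupId);
                long sum = (prev == null ? 0L : prev.value) + e.value;
                partials.put(groupId, new Event(e.key, start, sum));
            }
            return new ArrayList<>(partials.values());
        }

        // The unchanged downstream keyBy() + tumbling window + sum, keyed by "key@windowStart".
        static Map<String, Long> windowedSum(List<Event> events, long size) {
            Map<String, Long> result = new HashMap<>();
            for (Event e : events) {
                String groupId = e.key + "@" + windowStart(e.timestamp, size);
                result.merge(groupId, e.value, Long::sum);
            }
            return result;
        }
    }
    ```

    Because the downstream window sees partial records rather than raw ones, the sanity check for a custom trigger or evictor is presumably essential: a count-based trigger, for example, would count partials instead of original events.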


> Add support for pre-aggregation in DataStream API
> -------------------------------------------------
>
>                 Key: FLINK-7561
>                 URL: https://issues.apache.org/jira/browse/FLINK-7561
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
