[
https://issues.apache.org/jira/browse/FLINK-7561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16281748#comment-16281748
]
ASF GitHub Bot commented on FLINK-7561:
---------------------------------------
Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/4626
I think it would be nice to have a utility here in order to make this
easier to use:
```java
DataStream result =
    Utils.withPreaggregation(
            stream.timeWindow(Time.minutes(5)),
            myAggregateFunction)
        .apply(windowFunction);
```
The utility would basically take the aggregate function and insert the
stream transformation for the pre-aggregation on the *predecessor* of the
keyed stream, and then set up the `WindowedStream` again.
Pseudo code:
```java
public static <T, K, W extends Window, A> WindowedStream<T, K, W> preaggregate(
        WindowedStream<T, K, W> windowedStream,
        AggregateFunction<T, A, T> preAggregator) {

    // sanity check that the windowedStream has no custom trigger and evictor

    PreAggregationOperator preAggOp =
        new PreAggregationOperator(preAggregator, /* properties from windowed stream */);

    DataStream<T> originalStream = /* get predecessor before keyBy from windowed stream */;

    DataStream<T> preAggregated = originalStream.transform(preAggOp, ...);

    WindowedStream<T, K, W> windowedAgain = preAggregated
        .keyBy(/* key extractor from original windowed stream */)
        .window(assigner);

    return windowedAgain;
}
```
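To illustrate why the pre-aggregator above is typed `AggregateFunction<T, A, T>` (output type equal to input type), here is a minimal plain-Java sketch that uses no Flink APIs. All names in it (`SameTypeAggregate`, `preAggregate`, `SumCounts`, `pair`, `twoStage`, `oneStage`) are hypothetical and are not taken from the PR. It only models the idea that each upstream subtask folds its local records into one partial result per key before the shuffle, and that the downstream aggregation can then consume those partials unchanged. It assumes the aggregate is associative and commutative, as a sum is.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class PreAggregationSketch {

    /** Hypothetical, simplified aggregate whose result type equals its input type T. */
    interface SameTypeAggregate<T, A> {
        A createAccumulator();
        A add(T value, A accumulator);
        T getResult(A accumulator);
    }

    /** Example aggregate: sums the counts of (word, count) pairs. */
    static final class SumCounts
            implements SameTypeAggregate<Map.Entry<String, Long>, Map.Entry<String, Long>> {
        public Map.Entry<String, Long> createAccumulator() {
            return pair(null, 0L); // no key seen yet
        }
        public Map.Entry<String, Long> add(
                Map.Entry<String, Long> value, Map.Entry<String, Long> acc) {
            return pair(value.getKey(), acc.getValue() + value.getValue());
        }
        public Map.Entry<String, Long> getResult(Map.Entry<String, Long> acc) {
            return acc;
        }
    }

    static Map.Entry<String, Long> pair(String word, long count) {
        return new SimpleImmutableEntry<String, Long>(word, count);
    }

    /** Folds a buffer of records into one result per key, in first-seen key order. */
    static <T, K, A> List<T> preAggregate(
            List<T> buffer, Function<T, K> keyExtractor, SameTypeAggregate<T, A> agg) {
        Map<K, A> accumulators = new LinkedHashMap<>();
        for (T value : buffer) {
            K key = keyExtractor.apply(value);
            A acc = accumulators.get(key);
            if (acc == null) {
                acc = agg.createAccumulator();
            }
            accumulators.put(key, agg.add(value, acc));
        }
        List<T> results = new ArrayList<>();
        for (A acc : accumulators.values()) {
            results.add(agg.getResult(acc));
        }
        return results;
    }

    /** Pre-aggregates each subtask's records locally, then aggregates the partials. */
    static List<Map.Entry<String, Long>> twoStage(List<List<Map.Entry<String, Long>>> subtasks) {
        Function<Map.Entry<String, Long>, String> byWord = Map.Entry::getKey;
        List<Map.Entry<String, Long>> shuffled = new ArrayList<>();
        for (List<Map.Entry<String, Long>> local : subtasks) {
            shuffled.addAll(preAggregate(local, byWord, new SumCounts()));
        }
        return preAggregate(shuffled, byWord, new SumCounts());
    }

    /** Aggregates all raw records in a single step, without pre-aggregation. */
    static List<Map.Entry<String, Long>> oneStage(List<List<Map.Entry<String, Long>>> subtasks) {
        Function<Map.Entry<String, Long>, String> byWord = Map.Entry::getKey;
        List<Map.Entry<String, Long>> all = new ArrayList<>();
        for (List<Map.Entry<String, Long>> local : subtasks) {
            all.addAll(local);
        }
        return preAggregate(all, byWord, new SumCounts());
    }

    public static void main(String[] args) {
        List<List<Map.Entry<String, Long>>> subtasks = Arrays.asList(
            Arrays.asList(pair("a", 1), pair("b", 1), pair("a", 1)),
            Arrays.asList(pair("b", 1), pair("a", 1)));
        System.out.println(twoStage(subtasks));
        System.out.println(oneStage(subtasks));
    }
}
```

Because the partial results have the same type as the raw records, the second stage does not need to know whether pre-aggregation happened, which is what lets the utility hand back a `WindowedStream<T, K, W>` of the original element type.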
> Add support for pre-aggregation in DataStream API
> -------------------------------------------------
>
> Key: FLINK-7561
> URL: https://issues.apache.org/jira/browse/FLINK-7561
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
>