Hi everybody, I've created a GitHub branch for the new aggregation code: https://github.com/he-sk/incubator-flink/tree/aggregation
I have implemented both of the APIs that I proposed earlier, so people can play around and decide which they like better:

  DataSet ds = ...
  ds.groupBy(0).aggregate(min(1), max(1), count())

And:

  DataSet ds = ...
  ds.groupBy(0).min(1).max(1).count().aggregate()

The second version is a thin layer on top of the first. The aggregation functions min, max, sum, count, and average are supported. For groupings, you can select the group keys with (multiple) key() pseudo-aggregation functions; by default, all group keys are used. You can find examples in AggregationApi1Test.java and AggregationApi2Test.java.

Right now, only the Java API uses the new aggregation code. I have only just started learning Scala, so I don't know how easy it will be to port the new API. One problem I foresee is that the type information of the input tuples is lost, so the Scala compiler cannot do type inference on the output tuple. I hope this can be fixed or worked around by simply specifying the output tuple type directly.

I have kept the old aggregation API but marked it as deprecated and renamed some functions.

The next steps would be:
1) Implement the Scala API.
2) Add support for POJOs (sync with streaming aggregations for that).

Looking forward to your input.

Best,
Viktor

--
View this message in context: http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Hi-Aggregation-support-tp2311p2547.html
Sent from the Apache Flink (Incubator) Mailing List archive at Nabble.com.
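[Editor's note: as an illustration of the per-group semantics discussed above (min, max, and count computed per group key), here is a minimal sketch in plain Java. It does not use Flink and is not the proposed API; the class, method, and data layout are all made up for the example.]

```java
import java.util.*;

// Illustrative sketch (not the Flink API): for each distinct key,
// compute min(value), max(value), and count() over the group —
// analogous to ds.groupBy(0).aggregate(min(1), max(1), count()).
public class GroupAggregateSketch {

    // Returns, per key, a long[3] of { min, max, count }.
    public static Map<String, long[]> aggregate(List<Map.Entry<String, Long>> tuples) {
        Map<String, long[]> result = new HashMap<>();
        for (Map.Entry<String, Long> t : tuples) {
            result.merge(t.getKey(),
                    new long[] { t.getValue(), t.getValue(), 1 },
                    (a, b) -> new long[] {
                            Math.min(a[0], b[0]),   // min(1)
                            Math.max(a[1], b[1]),   // max(1)
                            a[2] + b[2]             // count()
                    });
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> tuples = List.of(
                Map.entry("a", 3L), Map.entry("a", 7L), Map.entry("b", 5L));
        aggregate(tuples).forEach((k, v) ->
                System.out.println(k + " -> min=" + v[0] + " max=" + v[1] + " count=" + v[2]));
    }
}
```

A real dataflow engine would run this incrementally and in parallel per partition, but the reduce step per group is the same idea.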