[ https://issues.apache.org/jira/browse/FLINK-1272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14355629#comment-14355629 ]
Fabian Hueske commented on FLINK-1272:
--------------------------------------

OK, thanks!

> Add a "reduceWithKey" function
> ------------------------------
>
>                 Key: FLINK-1272
>                 URL: https://issues.apache.org/jira/browse/FLINK-1272
>             Project: Flink
>          Issue Type: New Feature
>          Components: Java API, Scala API
>            Reporter: Stephan Ewen
>            Assignee: Suneel Marthi
>
> Flink does not assume a key/value model for grouping/aggregating/joining. The
> keys are specified as positions or paths of the objects to be grouped/joined.
> Currently, we do not expose the key in the {{ReduceFunction}} and
> {{GroupReduceFunction}}, but give (iterators over) the objects themselves.
> Since it is a common case to access the key, I suggest adding a convenience
> function {{GroupReduceWithKey}} that has the following signature and can be
> called as follows:
> {code}
> public interface GroupReduceWithKeyFunction<KEY, IN, OUT> {
>   void reduceGroup(KEY key, Iterable<IN> value, Collector<OUT> out);
> }
> {code}
> Scala:
> {code}
> val data : DataSet[SomePOJO] = ...
> data
>   .groupBy("id")
>   .reduceGroup( (key, values, out : Collector[(String, Long)]) =>
>     out.collect( (key, values.map(_.timestamp).min) ) )
> {code}
> Java:
> {code}
> DataSet<SomePOJO> data = ...
> data
>   .groupBy("id")
>   .reduceGroup(
>     new GroupReduceWithKeyFunction<String, SomePOJO, Tuple2<String, Long>>() {
>       ...
>     });
> {code}
> The sae

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)