[ https://issues.apache.org/jira/browse/FLINK-2184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573299#comment-15573299 ]
ASF GitHub Bot commented on FLINK-2184: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1975#discussion_r83318300 --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala --- @@ -224,45 +224,95 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] /** * Applies an aggregation that that gives the current minimum element of the data stream by * the given position by the given key. An independent aggregate is kept per key. - * When equality, the first element is returned with the minimal value. + * When equality, the first element is returned with the minimal value by default. * */ def minBy(position: Int): DataStream[T] = aggregate(AggregationType - .MINBY, position) - - /** + .MINBY, position, true) + + /** * Applies an aggregation that that gives the current minimum element of the data stream by - * the given field by the given key. An independent aggregate is kept per key. - * When equality, the first element is returned with the minimal value. + * the given position by the given key. An independent aggregate is kept per key. + * When equality, returns either the first or last one depending on the parameter setting. + * + */ + def minBy(position: Int, first: Boolean): DataStream[T] = aggregate(AggregationType + .MINBY, position, first) + + /** + * Applies an aggregation that that gives the current minimum element of the data stream by + * the given field by the given key. An independent aggregate is kept per key. + * When equality, the first element is returned with the minimal value by default. * */ def minBy(field: String): DataStream[T] = aggregate(AggregationType - .MINBY, field ) + .MINBY, field, true) + + /** + * Applies an aggregation that that gives the current minimum element of the data stream by + * the given field by the given key. An independent aggregate is kept per key. + * When equality, returns either the first or last one depending on the parameter setting. + * + */ + def minBy(field: String, first: Boolean): DataStream[T] = aggregate(AggregationType + .MINBY, field, first) /** * Applies an aggregation that that gives the current maximum element of the data stream by * the given position by the given key. An independent aggregate is kept per key. - * When equality, the first element is returned with the maximal value. + * When equality, the first element is returned with the maximal value by default. * */ def maxBy(position: Int): DataStream[T] = - aggregate(AggregationType.MAXBY, position) - - /** + aggregate(AggregationType.MAXBY, position, true) + + /** + * Applies an aggregation that that gives the current maximum element of the data stream by + * the given position by the given key. An independent aggregate is kept per key. + * When equality, returns either the first or last one depending on the parameter setting. + * + */ + def maxBy(position: Int, first: Boolean): DataStream[T] = + aggregate(AggregationType.MAXBY, position, first) + + /** * Applies an aggregation that that gives the current maximum element of the data stream by * the given field by the given key. An independent aggregate is kept per key. - * When equality, the first element is returned with the maximal value. + * When equality, the first element is returned with the maximal value by default. * */ def maxBy(field: String): DataStream[T] = - aggregate(AggregationType.MAXBY, field) - + aggregate(AggregationType.MAXBY, field, true) + + /** + * Applies an aggregation that that gives the current maximum element of the data stream by + * the given field by the given key. An independent aggregate is kept per key. + * When equality, returns either the first or last one depending on the parameter setting. + * + */ + def maxBy(field: String, first: Boolean): DataStream[T] = + aggregate(AggregationType.MAXBY, field, first) + + private def aggregate(aggregationType: AggregationType, position: Int): DataStream[T] = { + aggregate(aggregationType, position, false) --- End diff -- default should be `true` instead of `false` > Cannot get last element with maxBy/minBy > ---------------------------------------- > > Key: FLINK-2184 > URL: https://issues.apache.org/jira/browse/FLINK-2184 > Project: Flink > Issue Type: Improvement > Components: Scala API, Streaming > Reporter: Gábor Hermann > Priority: Minor > > In the streaming Scala API there is no method > {{maxBy(int positionToMaxBy, boolean first)}} > nor > {{minBy(int positionToMinBy, boolean first)}} > like in the Java API, where _first_ set to {{true}} indicates that the latest > found element will return. > These methods should be added to the Scala API too, in order to be consistent. -- This message was sent by Atlassian JIRA (v6.3.4#6332)