[ https://issues.apache.org/jira/browse/FLINK-2184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573303#comment-15573303 ]
ASF GitHub Bot commented on FLINK-2184: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1975#discussion_r83319084 --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala --- @@ -362,38 +362,84 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { /** * Applies an aggregation that that gives the maximum element of the window by - * the given position. When equality, returns the first. + * the given position. When equality, returns the first by default. */ def maxBy(position: Int): DataStream[T] = aggregate(AggregationType.MAXBY, - position) + position, true) /** * Applies an aggregation that that gives the maximum element of the window by - * the given field. When equality, returns the first. + * the given position. When equality, returns either the first or last one depending + * on the parameter setting. + */ + def maxBy(position: Int, first: Boolean): DataStream[T] = aggregate(AggregationType.MAXBY, + position, first) + + /** + * Applies an aggregation that that gives the maximum element of the window by + * the given field. When equality, returns the first by default. */ def maxBy(field: String): DataStream[T] = aggregate(AggregationType.MAXBY, - field) + field, true) + + /** + * Applies an aggregation that that gives the maximum element of the window by + * the given field. When equality, returns either the first or last one depending + * on the parameter setting. + */ + def maxBy(field: String, first: Boolean): DataStream[T] = aggregate(AggregationType.MAXBY, + field, first) /** * Applies an aggregation that that gives the minimum element of the window by - * the given position. When equality, returns the first. + * the given position. When equality, returns the first by default. */ def minBy(position: Int): DataStream[T] = aggregate(AggregationType.MINBY, - position) + position, true) /** * Applies an aggregation that that gives the minimum element of the window by - * the given field. When equality, returns the first. + * the given position. When equality, returns either the first or last one depending + * on the parameter setting. + */ + def minBy(position: Int, first: Boolean): DataStream[T] = aggregate(AggregationType.MINBY, + position, first) + + /** + * Applies an aggregation that that gives the minimum element of the window by + * the given field. When equality, returns the first by default. */ def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY, - field) + field, true) + + /** + * Applies an aggregation that that gives the minimum element of the window by + * the given field. When equality, returns either the first or last one depending + * on the parameter setting. + */ + def minBy(field: String, first: Boolean): DataStream[T] = aggregate(AggregationType.MINBY, + field, first) + + private def aggregate(aggregationType: AggregationType, position: Int): DataStream[T] = { + aggregate(aggregationType, position, false) + } private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = { + aggregate(aggregationType, field, false) + } + + private def aggregate( + aggregationType: AggregationType, + field: String, + first: Boolean): DataStream[T] = { val position = fieldNames2Indices(getInputType(), Array(field))(0) aggregate(aggregationType, position) } - def aggregate(aggregationType: AggregationType, position: Int): DataStream[T] = { + private def aggregate( + aggregationType: AggregationType, + position: Int, + first: Boolean): DataStream[T] = { --- End diff -- `first` is not used. > Cannot get last element with maxBy/minBy > ---------------------------------------- > > Key: FLINK-2184 > URL: https://issues.apache.org/jira/browse/FLINK-2184 > Project: Flink > Issue Type: Improvement > Components: Scala API, Streaming > Reporter: Gábor Hermann > Priority: Minor > > In the streaming Scala API there is no method > {{maxBy(int positionToMaxBy, boolean first)}} > nor > {{minBy(int positionToMinBy, boolean first)}} > like in the Java API, where _first_ set to {{true}} indicates that the latest > found element will return. > These methods should be added to the Scala API too, in order to be consistent. -- This message was sent by Atlassian JIRA (v6.3.4#6332)