[ 
https://issues.apache.org/jira/browse/FLINK-2184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573290#comment-15573290
 ] 

ASF GitHub Bot commented on FLINK-2184:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1975#discussion_r83318311
  
    --- Diff: 
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
 ---
    @@ -224,45 +224,95 @@ class KeyedStream[T, K](javaStream: 
KeyedJavaStream[T, K]) extends DataStream[T]
       /**
        * Applies an aggregation that that gives the current minimum element of 
the data stream by
        * the given position by the given key. An independent aggregate is kept 
per key. 
    -   * When equality, the first element is returned with the minimal value.
    +   * When equality, the first element is returned with the minimal value 
by default.
        *
        */
       def minBy(position: Int): DataStream[T] = aggregate(AggregationType
    -    .MINBY, position)
    -    
    -   /**
    +    .MINBY, position, true)
    +
    +  /**
        * Applies an aggregation that that gives the current minimum element of 
the data stream by
    -   * the given field by the given key. An independent aggregate is kept 
per key.
    -   * When equality, the first element is returned with the minimal value.
    +   * the given position by the given key. An independent aggregate is kept 
per key. 
    +   * When equality, returns either the first or last one depending on the 
parameter setting.
    +   *
    +   */
    +  def minBy(position: Int, first: Boolean): DataStream[T] = 
aggregate(AggregationType
    +    .MINBY, position, first)
    +
    +  /**
    +   * Applies an aggregation that that gives the current minimum element of 
the data stream by
    +   * the given field by the given key. An independent aggregate is kept 
per key. 
    +   * When equality, the first element is returned with the minimal value 
by default.
        *
        */
       def minBy(field: String): DataStream[T] = aggregate(AggregationType
    -    .MINBY, field )
    +    .MINBY, field, true)
    +
    +  /**
    +   * Applies an aggregation that that gives the current minimum element of 
the data stream by
    +   * the given field by the given key. An independent aggregate is kept 
per key. 
    +   * When equality, returns either the first or last one depending on the 
parameter setting.
    +   *
    +   */
    +  def minBy(field: String, first: Boolean): DataStream[T] = 
aggregate(AggregationType
    +    .MINBY, field, first)
     
        /**
        * Applies an aggregation that that gives the current maximum element of 
the data stream by
        * the given position by the given key. An independent aggregate is kept 
per key. 
    -   * When equality, the first element is returned with the maximal value.
    +   * When equality, the first element is returned with the maximal value 
by default.
        *
        */
       def maxBy(position: Int): DataStream[T] =
    -    aggregate(AggregationType.MAXBY, position)
    -    
    -   /**
    +    aggregate(AggregationType.MAXBY, position, true)
    +
    +  /**
    +   * Applies an aggregation that that gives the current maximum element of 
the data stream by
    +   * the given position by the given key. An independent aggregate is kept 
per key. 
    +   * When equality, returns either the first or last one depending on the 
parameter setting.
    +   *
    +   */
    +  def maxBy(position: Int, first: Boolean): DataStream[T] =
    +    aggregate(AggregationType.MAXBY, position, first)
    +
    +  /**
        * Applies an aggregation that that gives the current maximum element of 
the data stream by
        * the given field by the given key. An independent aggregate is kept 
per key. 
    -   * When equality, the first element is returned with the maximal value.
    +   * When equality, the first element is returned with the maximal value 
by default.
        *
        */
       def maxBy(field: String): DataStream[T] =
    -    aggregate(AggregationType.MAXBY, field)
    -    
    +    aggregate(AggregationType.MAXBY, field, true)
    +
    +  /**
    +   * Applies an aggregation that that gives the current maximum element of 
the data stream by
    +   * the given field by the given key. An independent aggregate is kept 
per key. 
    +   * When equality, returns either the first or last one depending on the 
parameter setting.
    +   *
    +   */
    +  def maxBy(field: String, first: Boolean): DataStream[T] =
    +    aggregate(AggregationType.MAXBY, field, first)
    +
    +  private def aggregate(aggregationType: AggregationType, position: Int): 
DataStream[T] = {
    +    aggregate(aggregationType, position, false)
    +  }
    +
       private def aggregate(aggregationType: AggregationType, field: String): 
DataStream[T] = {
    +    aggregate(aggregationType, field, false)
    --- End diff --
    
    default should be `true` instead of `false`


> Cannot get last element with maxBy/minBy
> ----------------------------------------
>
>                 Key: FLINK-2184
>                 URL: https://issues.apache.org/jira/browse/FLINK-2184
>             Project: Flink
>          Issue Type: Improvement
>          Components: Scala API, Streaming
>            Reporter: Gábor Hermann
>            Priority: Minor
>
> In the streaming Scala API there is no method
> {{maxBy(int positionToMaxBy, boolean first)}}
> nor
> {{minBy(int positionToMinBy, boolean first)}}
> like in the Java API, where _first_ set to {{true}} indicates that the latest 
> found element will return.
> These methods should be added to the Scala API too, in order to be consistent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to