Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4380#discussion_r129827640
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala ---
    @@ -75,12 +77,105 @@ object SortUtil {
         val inputCRowType = CRowTypeInfo(inputTypeInfo)
      
         new RowTimeSortProcessFunction(
    +      0,
    +      -1,
           inputCRowType,
           collectionRowComparator)
     
       }
       
       /**
    +   * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting
    +   * with offset elements based on rowtime and potentially other fields with
    +   * @param collationSort The Sort collation list
    +   * @param sortOffset The offset indicator
    +   * @param inputType input row type
    +   * @param inputTypeInfo input type information
    +   * @param execCfg table environment execution configuration
    +   * @return org.apache.flink.streaming.api.functions.ProcessFunction
    +   */
    +  private[flink] def createRowTimeSortFunctionOffset(
    --- End diff --
    
    I think we can consolidate all sort-related methods in `SortUtil` into three methods:
    
    * `createProcTimeNoSortFunction(..., sortOffset: Option[RexNode], sortFetch: Option[RexNode])`
    * `createProcTimeSortFunction(..., sortOffset: Option[RexNode], sortFetch: Option[RexNode])`
    * `createRowTimeSortFunction(..., sortOffset: Option[RexNode], sortFetch: Option[RexNode])`
    
    Each method handles all combinations of `offset` and `fetch` with two simple conditions that set each parameter to `-1`, `0`, or the actual value.
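
To illustrate the "two simple conditions", here is a minimal sketch, not the actual Flink code. It assumes the offset and fetch values have already been extracted from their `RexNode` literals into `Long`s (that extraction is not shown). It also assumes, based on the `0, -1` arguments in the diff above, that an absent offset maps to `0` and an absent fetch maps to `-1`. The object and method names `SortParams`, `resolveOffset` and `resolveFetch` are hypothetical.

```scala
object SortParams {

  // Assumption: an absent OFFSET means "skip nothing", encoded as 0.
  def resolveOffset(sortOffset: Option[Long]): Long =
    sortOffset.getOrElse(0L)

  // Assumption: an absent FETCH means "no limit", encoded as -1.
  def resolveFetch(sortFetch: Option[Long]): Long =
    sortFetch.getOrElse(-1L)
}
```

With helpers like these, each of the three `create...SortFunction` methods could pass `resolveOffset(sortOffset)` and `resolveFetch(sortFetch)` to its process function, so no separate `...Offset` or `...OffsetFetch` variant would be needed.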

