Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3150#discussion_r96601727
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
 ---
    @@ -218,6 +216,84 @@ class DataSetWindowAggregate(
         }
       }
     
    +  private[this] def createEventTimeSessionWindowDataSet(
    +    inputDS: DataSet[Any],
    +    isParserCaseSensitive: Boolean): DataSet[Any] = {
    +
    +    val groupingKeys = grouping.indices.toArray
    +    val rowTypeInfo = resultRowTypeInfo
    +
    +    // grouping window
    +    if (groupingKeys.length > 0) {
    +      //create mapFunction for initializing the aggregations
    +      val mapFunction = createDataSetWindowPrepareMapFunction(
    +        window,
    +        namedAggregates,
    +        grouping,
    +        inputType,isParserCaseSensitive)
    +
    +      // create groupReduceFunction for calculating the aggregations
    +      val groupReduceFunction = 
createDataSetWindowAggregationGroupReduceFunction(
    +        window,
    +        namedAggregates,
    +        inputType,
    +        rowRelDataType,
    +        grouping,
    +        namedProperties)
    +
    +      val mappedInput =
    +        inputDS
    +        .map(mapFunction)
    +        .name(prepareOperatorName)
    +
    +      val mapReturnType = 
mapFunction.asInstanceOf[ResultTypeQueryable[Row]].getProducedType
    +
    +      // the position of the rowtime field in the intermediate result for 
map output
    +      val rowTimeFilePos = mapReturnType.getArity - 1
    +
    +      // gets the window-start and window-end position  in the 
intermediate result.
    +      val windowStartPos = rowTimeFilePos
    +      val windowEndPos = windowStartPos + 1
    --- End diff --
    
    I would move the window start/end pos into the incremental aggregation `if` 
branch. It is not used for the non-incremental part.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to