[
https://issues.apache.org/jira/browse/FLINK-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15828007#comment-15828007
]
ASF GitHub Bot commented on FLINK-4693:
---------------------------------------
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/3150#discussion_r96601727
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
---
@@ -218,6 +216,84 @@ class DataSetWindowAggregate(
}
}
+ private[this] def createEventTimeSessionWindowDataSet(
+ inputDS: DataSet[Any],
+ isParserCaseSensitive: Boolean): DataSet[Any] = {
+
+ val groupingKeys = grouping.indices.toArray
+ val rowTypeInfo = resultRowTypeInfo
+
+ // grouping window
+ if (groupingKeys.length > 0) {
+ //create mapFunction for initializing the aggregations
+ val mapFunction = createDataSetWindowPrepareMapFunction(
+ window,
+ namedAggregates,
+ grouping,
+ inputType,isParserCaseSensitive)
+
+ // create groupReduceFunction for calculating the aggregations
+ val groupReduceFunction =
createDataSetWindowAggregationGroupReduceFunction(
+ window,
+ namedAggregates,
+ inputType,
+ rowRelDataType,
+ grouping,
+ namedProperties)
+
+ val mappedInput =
+ inputDS
+ .map(mapFunction)
+ .name(prepareOperatorName)
+
+ val mapReturnType =
mapFunction.asInstanceOf[ResultTypeQueryable[Row]].getProducedType
+
+ // the position of the rowtime field in the intermediate result for
map output
+ val rowTimeFilePos = mapReturnType.getArity - 1
+
+ // gets the window-start and window-end position in the
intermediate result.
+ val windowStartPos = rowTimeFilePos
+ val windowEndPos = windowStartPos + 1
--- End diff --
I would move the window start/end positions (`windowStartPos` and `windowEndPos`) into the incremental aggregation `if`
branch. They are not used in the non-incremental part.
> Add session group-windows for batch tables
> -------------------------------------------
>
> Key: FLINK-4693
> URL: https://issues.apache.org/jira/browse/FLINK-4693
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: sunjincheng
>
> Add Session group-windows for batch tables as described in
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)