[
https://issues.apache.org/jira/browse/FLINK-5219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15857515#comment-15857515
]
ASF GitHub Bot commented on FLINK-5219:
---------------------------------------
Github user wuchong commented on a diff in the pull request:
https://github.com/apache/flink/pull/3266#discussion_r99987780
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala ---
@@ -271,19 +269,46 @@ class DataSetWindowAggregate(
isInputCombined = true)
mappedInput
- .groupBy(groupingKeys: _*)
- .sortGroup(rowTimeFieldPos, Order.ASCENDING)
- .combineGroup(combineGroupFunction)
- .groupBy(groupingKeys: _*)
- .sortGroup(windowStartPos, Order.ASCENDING)
- .sortGroup(windowEndPos, Order.ASCENDING)
- .reduceGroup(groupReduceFunction)
- .returns(rowTypeInfo)
- .name(aggregateOperatorName)
- .asInstanceOf[DataSet[Any]]
+ .groupBy(groupingKeys: _*)
+ .sortGroup(rowTimeFieldPos, Order.ASCENDING)
+ .combineGroup(combineGroupFunction)
+ .groupBy(groupingKeys: _*)
+ .sortGroup(windowStartPos, Order.ASCENDING)
+ .sortGroup(windowEndPos, Order.ASCENDING)
+ .reduceGroup(groupReduceFunction)
+ .returns(rowTypeInfo)
+ .name(aggregateOperatorName)
+ .asInstanceOf[DataSet[Any]]
+
+ } else {
+ // non-grouping window
+ val preMapPartitionFunction = createDataSetWindowAggregationMapPartitionFunction(
+ window,
+ namedAggregates,
+ inputType)
+
+ val mapPartitionFunction = createDataSetWindowAggregationMapPartitionFunction(
+ window,
+ namedAggregates,
+ inputType,
+ rowRelDataType,
+ namedProperties,
+ isPreMapPartition = false,
+ isInputCombined = true)
+
+ mappedInput.sortPartition(rowTimeFieldPos, Order.ASCENDING).setParallelism(1)
+ .mapPartition(preMapPartitionFunction).setParallelism(1)
--- End diff --
Please put each operator method call on its own line.
I think the first `sortPartition` and the first `mapPartition` can run
in parallel, so they do not need `setParallelism(1)`. Otherwise, performance
will be worse than with the non-incremental, non-grouping aggregation approach.
What do you think?
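
To illustrate why the first stage should be safe to parallelize, here is a minimal plain-Scala sketch that does not use Flink at all. `Session`, `preAggregate`, `mergeSessions` and the gap value are hypothetical names made up for this example, and the "sessions overlap when start < end" rule is a simplifying assumption rather than the exact semantics of the PR. Each partition is sorted and pre-aggregated independently; only the final merge needs to see all partial results.

```scala
// Hypothetical stand-in for a partially aggregated session window:
// [start, end) plus a running COUNT as the only aggregate.
case class Session(start: Long, end: Long, count: Long)

object ParallelPreAggregation {

  // Collapses timestamps (sorted ascending) into sessions.
  // Assumption: a timestamp joins the current session if it falls before its end.
  def preAggregate(sortedTimes: Seq[Long], gap: Long): List[Session] =
    sortedTimes.foldLeft(List.empty[Session]) {
      case (last :: rest, t) if t < last.end =>
        Session(last.start, math.max(last.end, t + gap), last.count + 1) :: rest
      case (acc, t) =>
        Session(t, t + gap, 1L) :: acc
    }.reverse

  // Final single-threaded step: merges overlapping partial sessions
  // produced by different partitions.
  def mergeSessions(partials: Seq[Session]): List[Session] =
    partials.sortBy(_.start).foldLeft(List.empty[Session]) {
      case (last :: rest, s) if s.start < last.end =>
        Session(last.start, math.max(last.end, s.end), last.count + s.count) :: rest
      case (acc, s) =>
        s :: acc
    }.reverse

  def main(args: Array[String]): Unit = {
    val gap = 5L
    // Two unsorted "partitions" of row timestamps.
    val partitions = Seq(Seq(12L, 1L, 20L), Seq(3L, 14L, 30L))

    // Stage 1: every partition is sorted and pre-aggregated on its own,
    // so this stage has no dependency between partitions.
    val partials = partitions.flatMap(p => preAggregate(p.sorted, gap))

    // Stage 2: only the merge of the (much smaller) partial results is global.
    val merged = mergeSessions(partials)

    // Reference result: everything processed in one partition.
    val direct = preAggregate(partitions.flatten.sorted, gap)
    assert(merged == direct)
    merged.foreach(println)
  }
}
```

In this sketch the two-stage result matches the single-partition result, which is the property that would let the first `sortPartition` and `mapPartition` keep the default parallelism while only the final step runs with parallelism 1.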
> Add non-grouped session windows for batch tables
> ------------------------------------------------
>
> Key: FLINK-5219
> URL: https://issues.apache.org/jira/browse/FLINK-5219
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: sunjincheng
> Assignee: sunjincheng
>
> Add non-grouped session windows for batch tables as described in
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].