[ https://issues.apache.org/jira/browse/FLINK-5219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15865640#comment-15865640 ]
ASF GitHub Bot commented on FLINK-5219: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3266#discussion_r101014879 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala --- @@ -294,18 +319,29 @@ class DataSetWindowAggregate( namedProperties) mappedInput.groupBy(groupingKeys: _*) - .sortGroup(rowTimeFieldPos, Order.ASCENDING) - .reduceGroup(groupReduceFunction) - .returns(rowTypeInfo) - .name(aggregateOperatorName) - .asInstanceOf[DataSet[Any]] + .sortGroup(rowTimeFieldPos, Order.ASCENDING) + .reduceGroup(groupReduceFunction) + .returns(rowTypeInfo) + .name(aggregateOperatorName) + .asInstanceOf[DataSet[Any]] + + } else { + // non-grouping window + val mapPartitionFunction = createDataSetWindowAggregationMapPartitionFunction( + window, + namedAggregates, + inputType, + rowRelDataType, + namedProperties, + isPreMapPartition = false) + + mappedInput.sortPartition(rowTimeFieldPos, Order.ASCENDING).setParallelism(1) + .mapPartition(mapPartitionFunction).setParallelism(1) --- End diff -- I think we can also use `.reduceGroup()` and a `GroupReduceFunction` here. Without `groupBy`, the `GroupReduceFunction` will be executed with parallelism 1. > Add non-grouped session windows for batch tables > ------------------------------------------------ > > Key: FLINK-5219 > URL: https://issues.apache.org/jira/browse/FLINK-5219 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: sunjincheng > Assignee: sunjincheng > > Add non-grouped session windows for batch tables as described in > [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. -- This message was sent by Atlassian JIRA (v6.3.15#6346)