[
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15898182#comment-15898182
]
ASF GitHub Bot commented on FLINK-5047:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3364#discussion_r104515406
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
---
@@ -280,6 +285,138 @@ class DataSetWindowAggregate(
}
}
+  private def createEventTimeSlidingWindowDataSet(
+      inputDS: DataSet[Row],
+      isTimeWindow: Boolean,
+      isParserCaseSensitive: Boolean)
+    : DataSet[Row] = {
+
+    // create MapFunction for initializing the aggregations
+    // it aligns the rowtime for pre-tumbling in case of a time-window for incremental aggregates
+    val mapFunction = createDataSetWindowPrepareMapFunction(
+      window,
+      namedAggregates,
+      grouping,
+      inputType,
+      isParserCaseSensitive)
+
+    val mappedDataSet = inputDS
+      .map(mapFunction)
+      .name(prepareOperatorName)
+
+    val mapReturnType = mappedDataSet.getType
+
+    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+    val groupingKeys = grouping.indices.toArray
+
+    // do incremental aggregation if possible
+    val isIncremental = doAllSupportPartialAggregation(
+      namedAggregates.map(_.getKey),
+      inputType,
+      grouping.length)
+
+    val preparedDataSet = if (isTimeWindow) {
+      // time window
+
+      if (isIncremental) {
+        // incremental aggregates
+
+        val groupingKeysAndAlignedRowtime = groupingKeys :+ mapReturnType.getArity - 1
+
+        // create GroupReduceFunction
+        // for pre-tumbling and replicating/omitting the content for each pane
+        val prepareReduceFunction = createDataSetSlideWindowPrepareGroupReduceFunction(
+          window,
+          namedAggregates,
+          grouping,
+          inputType,
+          isParserCaseSensitive)
+
+        mappedDataSet.asInstanceOf[DataSet[Row]]
+          .groupBy(groupingKeysAndAlignedRowtime: _*)
+          .reduceGroup(prepareReduceFunction) // pre-tumbles and replicates/omits
+          .name(prepareOperatorName)
+      } else {
+        // non-incremental aggregates
+
+        // create FlatMapFunction
+        // for replicating/omitting the content for each pane
+        val prepareFlatMapFunction = createDataSetSlideWindowPrepareFlatMapFunction(
+          window,
+          namedAggregates,
+          grouping,
+          inputType,
+          isParserCaseSensitive)
+
+        mappedDataSet
+          .flatMap(prepareFlatMapFunction) // replicates/omits
+      }
+    } else {
+      // count window
--- End diff ---
Can we break this PR into time and count windows? There are so many cases
to consider...
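
For context on the incremental path in the diff above: the prepare phase keys every
row by the grouping fields plus an "aligned rowtime", i.e. the row's timestamp
rounded down to the start of the largest non-overlapping pane, before the
pre-tumbling reduce. Below is a minimal standalone sketch of that alignment with
made-up names, assuming for illustration that panes have the width of the slide
interval; this is not code from the PR.

object AlignedRowtime {

  // rounds an event timestamp down to the start of its pane; the floor-style
  // remainder keeps the result correct for negative timestamps as well
  def align(timestamp: Long, slide: Long): Long = {
    val rem = ((timestamp % slide) + slide) % slide
    timestamp - rem
  }

  def main(args: Array[String]): Unit = {
    val twoMinutes = 2 * 60 * 1000L
    // a row with rowtime at minute 7 is aligned to the pane starting at minute 6
    println(align(7 * 60 * 1000L, twoMinutes)) // 360000
  }
}

Grouping on (grouping keys, aligned rowtime) then lets the GroupReduceFunction
pre-tumble one pane per group, which is what the incremental branch of the diff does.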
> Add sliding group-windows for batch tables
> ------------------------------------------
>
> Key: FLINK-5047
> URL: https://issues.apache.org/jira/browse/FLINK-5047
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Jark Wu
> Assignee: Timo Walther
>
> Add Slide group-windows for batch tables as described in
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
> There are two ways to implement sliding windows for batch:
> 1. replicate the output in order to assign keys for overlapping windows (a
> sketch of this window assignment follows after this description). This is
> probably the more straightforward implementation and supports any aggregation
> function, but it blows up the data volume.
> 2. if the aggregation functions are combinable / pre-aggregatable, we can
> also find the largest tumbling window size from which the sliding windows can
> be assembled. This is basically the technique used to express sliding windows
> with plain SQL (GROUP BY + OVER clauses). For a sliding window Slide(10
> minutes, 2 minutes) this would mean first computing aggregates of
> non-overlapping (tumbling) 2-minute windows and then assembling 5 consecutive
> of these into each sliding window (this could be done in a MapPartition with
> sorted input; a sketch follows after this description). The implementation
> could be an optimizer rule that splits the sliding aggregate into a tumbling
> aggregate and a SQL WINDOW operator. Maybe it makes sense to implement the
> WINDOW clause first and reuse it for sliding windows.
> 3. There is also a third, hybrid solution: doing the pre-aggregation on the
> largest non-overlapping windows (as in 2) and then replicating these results
> and processing them as in approach 1. The benefits of this are that it a) is
> based on the implementation that supports non-combinable aggregates (which is
> required in any case) and b) does not require the implementation of the SQL
> WINDOW operator. Internally, this can again be implemented as an optimizer
> rule that translates the SlidingWindow into a pre-aggregating TumblingWindow
> and a final SlidingWindow (with replication).
> see FLINK-4692 for more discussion
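
A minimal, self-contained sketch of approach 1 (all names are made up for
illustration; this is not the PR's code): each record is assigned to, and
replicated for, every sliding window that covers its timestamp, so an arbitrary
aggregation function can afterwards be applied per (grouping key, window start).

object SlidingWindowReplication {

  // start times of all sliding windows of width `windowSize` that contain `timestamp`
  def assignWindows(timestamp: Long, windowSize: Long, slide: Long): List[Long] = {
    // start of the last (latest) window containing the timestamp
    val rem = ((timestamp % slide) + slide) % slide
    val lastStart = timestamp - rem
    // walk backwards in steps of `slide` while the window still covers the timestamp
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(start => start > timestamp - windowSize)
      .toList
  }

  // replicate one record into a (windowStart, record) pair per covering window
  def replicate[T](record: T, timestamp: Long, windowSize: Long, slide: Long): List[(Long, T)] =
    assignWindows(timestamp, windowSize, slide).map(start => (start, record))

  def main(args: Array[String]): Unit = {
    val min = 60 * 1000L
    // Slide(10 min, 2 min): a record at minute 7 falls into 5 overlapping windows
    println(assignWindows(7 * min, 10 * min, 2 * min).map(_ / min))
    // List(6, 4, 2, 0, -2)
  }
}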
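
And a sketch of the pre-aggregation idea behind approach 2 (again with made-up
names, assuming a combinable aggregate such as SUM and a window size that is a
multiple of the slide): records are first tumbled into non-overlapping panes of
the slide width, and each sliding window result is then assembled from the
size/slide consecutive panes it covers.

object PanePreAggregation {

  // phase 1: pre-aggregate (timestamp, value) records into panes keyed by pane start
  def preAggregate(records: Seq[(Long, Long)], slide: Long): Map[Long, Long] =
    records
      .groupBy { case (ts, _) => ts - (((ts % slide) + slide) % slide) }
      .map { case (paneStart, rs) => paneStart -> rs.map(_._2).sum }

  // phase 2: the aggregate of the window starting at `windowStart` is the sum of
  // the size/slide consecutive panes it covers
  def windowAggregate(panes: Map[Long, Long], windowStart: Long, size: Long, slide: Long): Long = {
    val panesPerWindow = (size / slide).toInt
    (0 until panesPerWindow)
      .map(i => panes.getOrElse(windowStart + i * slide, 0L))
      .sum
  }

  def main(args: Array[String]): Unit = {
    val min = 60 * 1000L
    // value 1 at minute 1, value 2 at minute 3, value 3 at minute 11
    val records = Seq((1 * min, 1L), (3 * min, 2L), (11 * min, 3L))
    val panes = preAggregate(records, slide = 2 * min)
    // Slide(10 min, 2 min): window [0, 10) sums 1 + 2, window [2, 12) sums 2 + 3
    println(windowAggregate(panes, 0L, 10 * min, 2 * min))      // 3
    println(windowAggregate(panes, 2 * min, 10 * min, 2 * min)) // 5
  }
}

Approach 3 would keep the same pane pre-aggregation but then replicate each
pane's partial aggregate to the windows that cover it (as in the first sketch)
instead of assembling the panes from sorted input.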
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)