[
https://issues.apache.org/jira/browse/FLINK-6473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-6473:
----------------------------------
Labels: auto-deprioritized-major auto-deprioritized-minor (was:
auto-deprioritized-major stale-minor)
Priority: Not a Priority (was: Minor)
This issue was labeled "stale-minor" 7 days ago and has not received any
updates, so it is being deprioritized. If this ticket is actually Minor, please
raise the priority and ask a committer to assign you the issue or revive the
public discussion.
> Add OVER window support for batch tables
> ----------------------------------------
>
> Key: FLINK-6473
> URL: https://issues.apache.org/jira/browse/FLINK-6473
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / API
> Reporter: Fabian Hueske
> Priority: Not a Priority
> Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> Add support for OVER windows for batch tables.
> Since OVER windows are supported for streaming tables, this issue is not
> about the API (which is available) but about adding the execution strategies
> and translation for OVER windows on batch tables.
> The feature could be implemented using the following plans:
> *UNBOUNDED OVER*
> {code}
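> // Sketch in Java DataSet API style; the arguments are placeholders.
> DataSet<Row> input = ...;
> DataSet<Row> result = input
>   .groupBy(partitionKeys)          // one group per PARTITION BY key
>   .sortGroup(orderByKeys)          // sort each group by the ORDER BY keys
>   .reduceGroup(computeAggregates); // compute the aggregates for each row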
> {code}
> This implementation is quite straightforward because we don't need to retract
> rows.
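
The UNBOUNDED OVER plan can be sketched in plain Java, without the Flink
API, to show why no retraction is needed. This is only an illustration under
stated assumptions: the Row class, the runningSums helper, and the choice of
SUM as the aggregate are hypothetical and do not come from the issue.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class UnboundedOverSketch {
    /** Hypothetical row: partition key, ORDER BY key, value to aggregate. */
    static final class Row {
        final String key;
        final long order;
        final long value;

        Row(String key, long order, long value) {
            this.key = key;
            this.order = order;
            this.value = value;
        }
    }

    /**
     * SUM(value) OVER (PARTITION BY key ORDER BY order
     *                  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW).
     * Results are emitted partition by partition, in first-seen key order.
     */
    static List<Long> runningSums(List<Row> input) {
        // Stand-in for groupBy(partitionKeys).
        Map<String, List<Row>> groups = new LinkedHashMap<>();
        for (Row r : input) {
            groups.computeIfAbsent(r.key, k -> new ArrayList<>()).add(r);
        }
        List<Long> out = new ArrayList<>();
        for (List<Row> group : groups.values()) {
            // Stand-in for sortGroup(orderByKeys).
            group.sort(Comparator.comparingLong(r -> r.order));
            // Stand-in for reduceGroup(computeAggregates): the aggregate only
            // ever grows, so nothing has to be retracted.
            long sum = 0;
            for (Row r : group) {
                sum += r.value;
                out.add(sum);
            }
        }
        return out;
    }
}
```

Each row is visited once per group and the only state kept per group is the
running sum.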
> *BOUNDED OVER*
> BOUNDED OVER windows are a bit more challenging because we need to retract
> values from aggregates and we don't want to store rows temporarily on the
> heap.
> {code}
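> // Sketch in Java DataSet API style; the arguments are placeholders.
> DataSet<Row> input = ...;
> DataSet<Row> sorted = input
>   .partitionByHash(partitionKeys)
>   .sortPartition(partitionKeys, orderByKeys);
> // Self-coGroup: one input accumulates rows, the other retracts them.
> DataSet<Row> result = sorted.coGroup(sorted)
>   .where(partitionKeys).equalTo(partitionKeys)
>   .with(computeAggregates);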
> {code}
> With this, the data set should be partitioned and sorted once. The sorted
> {{DataSet}} would be consumed twice (the optimizer should inject a temp
> barrier on one of the inputs to avoid a consumption deadlock). The
> {{CoGroupFunction}} would accumulate new rows into the aggregates from one
> input and retract them from the other. Since both input streams are properly
> sorted, this can happen in a zigzag fashion. We need to verify that the
> generated plan is what we want it to be.
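
The zigzag accumulate/retract idea can be sketched in plain Java for a single
partition, without the Flink API. This is a sketch under stated assumptions,
not the proposed implementation: the boundedSums helper, the SUM aggregate,
and the ROWS BETWEEN n PRECEDING AND CURRENT ROW frame are hypothetical
choices, and the two Iterable arguments stand in for the two inputs of the
CoGroupFunction, which both deliver the same sorted rows.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class BoundedOverSketch {
    /**
     * SUM over ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW for one
     * partition. Both inputs must yield the same values in the same
     * ORDER BY order; only the running sum and a counter are kept as state.
     */
    static List<Long> boundedSums(Iterable<Long> accumulateSide,
                                  Iterable<Long> retractSide,
                                  int preceding) {
        Iterator<Long> lead = accumulateSide.iterator(); // rows entering the window
        Iterator<Long> lag = retractSide.iterator();     // rows leaving the window
        List<Long> out = new ArrayList<>();
        long sum = 0;
        int rowsInWindow = 0;
        while (lead.hasNext()) {
            // Accumulate the current row from the first input.
            sum += lead.next();
            rowsInWindow++;
            // Retract the oldest row from the second input once the frame
            // holds more than `preceding` + 1 rows.
            if (rowsInWindow > preceding + 1) {
                sum -= lag.next();
                rowsInWindow--;
            }
            out.add(sum);
        }
        return out;
    }
}
```

Because the retracting iterator always trails the accumulating one over the
same sorted data, no rows have to be buffered on the heap. In the sketch the
same list can simply be passed for both arguments.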
--
This message was sent by Atlassian Jira
(v8.20.1#820001)