[
https://issues.apache.org/jira/browse/FLINK-12192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-12192:
-----------------------------------
Labels: pull-request-available (was: )
> Add support for generating optimized logical plan for grouping sets and
> distinct aggregate
> ------------------------------------------------------------------------------------------
>
> Key: FLINK-12192
> URL: https://issues.apache.org/jira/browse/FLINK-12192
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / Planner
> Reporter: godfrey he
> Assignee: godfrey he
> Priority: Major
> Labels: pull-request-available
>
> This issue aims to supports generating optimized logical plan for grouping
> sets and distinct aggregate. (mentioned in FLINK-12076 and FLINK-12098)
> for batch, query with distinct aggregate will be rewritten into two
> non-distinct aggregates by extended
> [AggregateExpandDistinctAggregatesRule|https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/rel/rules/AggregateExpandDistinctAggregatesRule.java],
> the first aggregate computes the distinct key and non-distinct aggregate
> function, and the second aggregate computes the distinct aggregate function
> based on first aggregate result. The first aggregate has grouping sets if
> there are more than one distinct aggregate functions on different fields.
> for stream, query with distinct aggregate is handled by SplitAggregateRule in
> FLINK-12161.
> query with grouping sets (or cube, rollup) will be rewritten into a regular
> aggregate with expand, and the expand node will duplicates the input data for
> each simple group.
> e.g.
> {noformat}
> schema:
> MyTable: a: INT, b: BIGINT, c: VARCHAR(32), d: VARCHAR(32)
> Original records:
> +-----+-----+-----+-----+
> | a | b | c | d |
> +-----+-----+-----+-----+
> | 1 | 1 | c1 | d1 |
> +-----+-----+-----+-----+
> | 1 | 2 | c1 | d2 |
> +-----+-----+-----+-----+
> | 2 | 1 | c1 | d1 |
> +-----+-----+-----+-----+
> SELECT a, c, SUM(b) as b FROM MyTable GROUP BY GROUPING SETS (a, c)
> logical plan after expanded:
> LogicalCalc(expr#0..3=[{inputs}], proj#0..1=[{exprs}], b=[$t3])
> LogicalAggregate(group=[{0, 2, 3}], groups=[[]], b=[SUM($1)])
> LogicalExpand(projects=[{a=[$0], b=[$1], c=[null], $e=[1]},
> {a=[null], b=[$1], c=[$2], $e=[2]}])
> LogicalNativeTableScan(table=[[builtin, default, MyTable]])
> notes:
> '$e = 1' is equivalent to 'group by a'
> '$e = 2' is equivalent to 'group by c'
> expanded records:
> +-----+-----+-----+-----+
> | a | b | c | $e |
> +-----+-----+-----+-----+ ---+---
> | 1 | 1 | null| 1 | |
> +-----+-----+-----+-----+ records expanded by record1
> | null| 1 | c1 | 2 | |
> +-----+-----+-----+-----+ ---+---
> | 1 | 2 | null| 1 | |
> +-----+-----+-----+-----+ records expanded by record2
> | null| 2 | c1 | 2 | |
> +-----+-----+-----+-----+ ---+---
> | 2 | 1 | null| 1 | |
> +-----+-----+-----+-----+ records expanded by record3
> | null| 1 | c1 | 2 | |
> +-----+-----+-----+-----+ ---+---
> {noformat}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)