[ https://issues.apache.org/jira/browse/FLINK-12192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

godfrey he updated FLINK-12192:
-------------------------------
    Description: 
This issue aims to support generating optimized logical plans for grouping sets 
and distinct aggregates. (mentioned in FLINK-12076 and FLINK-12098)

For batch, a query with distinct aggregates is rewritten into two 
non-distinct aggregates by an extended 
[AggregateExpandDistinctAggregatesRule|https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/rel/rules/AggregateExpandDistinctAggregatesRule.java]:
the first aggregate computes the distinct key and the non-distinct aggregate 
functions, and the second aggregate computes the distinct aggregate functions 
based on the first aggregate's result. The first aggregate has grouping sets if 
there is more than one distinct aggregate function on different fields.
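
The two-step rewrite can be illustrated with a small Python sketch. It is not 
the rule's implementation: the query SELECT a, COUNT(DISTINCT d), SUM(b) FROM 
MyTable GROUP BY a and all function names are hypothetical, chosen only for 
this illustration, and it covers just the single-distinct-field case (no 
grouping sets in the first aggregate). The rows are the MyTable records used 
in the example in this description.

```python
from collections import defaultdict

# Rows of MyTable(a, b, c, d), taken from the example in this issue.
ROWS = [(1, 1, "c1", "d1"), (1, 2, "c1", "d2"), (2, 1, "c1", "d1")]


def first_aggregate(rows):
    """Group by (a, d): one row per distinct key, with a partial SUM(b)."""
    partial = defaultdict(int)
    for a, b, _c, d in rows:
        partial[(a, d)] += b
    return [(a, d, sum_b) for (a, d), sum_b in partial.items()]


def second_aggregate(rows):
    """Group by a: a plain COUNT(d) now counts distinct d; SUM merges partials."""
    result = defaultdict(lambda: [0, 0])
    for a, d, sum_b in rows:
        if d is not None:  # COUNT skips NULLs
            result[a][0] += 1
        result[a][1] += sum_b
    return {a: tuple(v) for a, v in result.items()}


def direct(rows):
    """Reference: evaluate COUNT(DISTINCT d), SUM(b) GROUP BY a directly."""
    seen, sums = defaultdict(set), defaultdict(int)
    for a, b, _c, d in rows:
        if d is not None:
            seen[a].add(d)
        sums[a] += b
    return {a: (len(seen[a]), sums[a]) for a in sums}


# Maps a -> (COUNT(DISTINCT d), SUM(b)), computed without any DISTINCT step.
rewritten = second_aggregate(first_aggregate(ROWS))
print(rewritten)
```

Because the first aggregate already emits one row per (a, d) pair, the second 
aggregate needs no distinct handling, which is why both aggregates in the 
rewritten plan can be non-distinct.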

For stream, a query with distinct aggregates is handled by SplitAggregateRule in 
FLINK-12161.

A query with grouping sets (or cube, rollup) is rewritten into a regular 
aggregate with an expand node, which duplicates the input data for 
each simple group.
 e.g.
{noformat}
schema:
MyTable: a: INT, b: BIGINT, c: VARCHAR(32), d: VARCHAR(32)

 Original records:
+-----+-----+-----+-----+
|  a  |  b  |  c  |  d  |
+-----+-----+-----+-----+
|  1  |  1  |  c1 |  d1 |
+-----+-----+-----+-----+
|  1  |  2  |  c1 |  d2 |
+-----+-----+-----+-----+
|  2  |  1  |  c1 |  d1 |
+-----+-----+-----+-----+

SELECT a, c, SUM(b) as b FROM MyTable GROUP BY GROUPING SETS (a, c)

logical plan after expansion:
LogicalCalc(expr#0..3=[{inputs}], proj#0..1=[{exprs}], b=[$t3])
    LogicalAggregate(group=[{0, 2, 3}], groups=[[]], b=[SUM($1)])
        LogicalExpand(projects=[{a=[$0], b=[$1], c=[null], $e=[1]}, {a=[null], b=[$1], c=[$2], $e=[2]}])
            LogicalNativeTableScan(table=[[builtin, default, MyTable]])

notes:
'$e = 1' is equivalent to 'group by a'
'$e = 2' is equivalent to 'group by c'

expanded records:
+-----+-----+-----+-----+
|  a  |  b  |  c  | $e  |
+-----+-----+-----+-----+        ---+---
|  1  |  1  | null|  1  |           |
+-----+-----+-----+-----+  records expanded by record1
| null|  1  |  c1 |  2  |           |
+-----+-----+-----+-----+        ---+---
|  1  |  2  | null|  1  |           |
+-----+-----+-----+-----+  records expanded by record2
| null|  2  |  c1 |  2  |           |
+-----+-----+-----+-----+        ---+---
|  2  |  1  | null|  1  |           |
+-----+-----+-----+-----+  records expanded by record3
| null|  1  |  c1 |  2  |           |
+-----+-----+-----+-----+        ---+---
{noformat}
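
The expand-then-aggregate plan above can be simulated in a few lines of 
Python. This is only a sketch of the semantics, not the planner's code: the 
function names are hypothetical, and the rows and $e values are the ones from 
the example. The $e column is kept as a grouping key so that a NULL produced 
by the expand is not confused with a NULL that was already in the data.

```python
from collections import defaultdict

# Rows of MyTable(a, b, c, d), taken from the example in this issue.
ROWS = [(1, 1, "c1", "d1"), (1, 2, "c1", "d2"), (2, 1, "c1", "d1")]


def expand(rows):
    """Emit one copy of each row per grouping set, nulling the other key."""
    for a, b, c, _d in rows:
        yield (a, b, None, 1)  # $e = 1: group by a
        yield (None, b, c, 2)  # $e = 2: group by c


def aggregate(expanded):
    """Regular aggregate: group by (a, c, $e) and compute SUM(b)."""
    sums = defaultdict(int)
    for a, b, c, e in expanded:
        sums[(a, c, e)] += b
    return sums


expanded = list(expand(ROWS))
# Final projection keeps (a, c, SUM(b)) and drops $e, as the LogicalCalc does.
result = [(a, c, total) for (a, c, _e), total in aggregate(expanded).items()]
for row in result:
    print(row)
```

Each input row is emitted once per grouping set, so the expanded input grows 
by a factor equal to the number of simple groups (two here).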

  was:
This issue aims to support generating optimized logical plans for grouping sets 
and distinct aggregates. (mentioned in 
[FLINK-12076|https://issues.apache.org/jira/browse/FLINK-12076] and 
[FLINK-12098|https://issues.apache.org/jira/browse/FLINK-12098])

For batch, a query with distinct aggregates is rewritten into two 
non-distinct aggregates by an extended 
[AggregateExpandDistinctAggregatesRule|https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/rel/rules/AggregateExpandDistinctAggregatesRule.java]:
the first aggregate computes the distinct result and the non-distinct aggregate 
function result, and the second aggregate computes the distinct aggregate 
function result based on the first aggregate's result. The first aggregate has 
grouping sets if there is more than one distinct aggregate on different fields.

For stream, a query with distinct aggregates is handled by SplitAggregateRule in 
[FLINK-12161|https://issues.apache.org/jira/browse/FLINK-12161].

A query with grouping sets (or cube, rollup) is rewritten into a regular 
aggregate with an expand node.
The expand node duplicates the input data for each simple group.
e.g.
{noformat}
schema:
MyTable: a: INT, b: BIGINT, c: VARCHAR(32), d: VARCHAR(32)

 Original records:
+-----+-----+-----+-----+
|  a  |  b  |  c  |  d  |
+-----+-----+-----+-----+
|  1  |  1  |  c1 |  d1 |
+-----+-----+-----+-----+
|  1  |  2  |  c1 |  d2 |
+-----+-----+-----+-----+
|  2  |  1  |  c1 |  d1 |
+-----+-----+-----+-----+

SELECT a, c, SUM(b) as b FROM MyTable GROUP BY GROUPING SETS (a, c)

logical plan after expansion:
LogicalCalc(expr#0..3=[{inputs}], proj#0..1=[{exprs}], b=[$t3])
    LogicalAggregate(group=[{0, 2, 3}], groups=[[]], b=[SUM($1)])
        LogicalExpand(projects=[{a=[$0], b=[$1], c=[null], $e=[1]}, {a=[null], b=[$1], c=[$2], $e=[2]}])
            LogicalNativeTableScan(table=[[builtin, default, MyTable]])

notes:
'$e = 1' is equivalent to 'group by a'
'$e = 2' is equivalent to 'group by c'

expanded records:
+-----+-----+-----+-----+
|  a  |  b  |  c  | $e  |
+-----+-----+-----+-----+        ---+---
|  1  |  1  | null|  1  |           |
+-----+-----+-----+-----+  records expanded by record1
| null|  1  |  c1 |  2  |           |
+-----+-----+-----+-----+        ---+---
|  1  |  2  | null|  1  |           |
+-----+-----+-----+-----+  records expanded by record2
| null|  2  |  c1 |  2  |           |
+-----+-----+-----+-----+        ---+---
|  2  |  1  | null|  1  |           |
+-----+-----+-----+-----+  records expanded by record3
| null|  1  |  c1 |  2  |           |
+-----+-----+-----+-----+        ---+---
{noformat}






> Add support for generating optimized logical plan for grouping sets and 
> distinct aggregate
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-12192
>                 URL: https://issues.apache.org/jira/browse/FLINK-12192
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / Planner
>            Reporter: godfrey he
>            Assignee: godfrey he
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
