[ 
https://issues.apache.org/jira/browse/FLINK-20345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangqingru updated FLINK-20345:
--------------------------------
    Description: 
As mentioned in the [Flink 
documentation|https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html],
 a distinct aggregation can be split to mitigate data skew on the distinct 
keys, which is a very useful optimization. However, an unnecessary `Expand` 
node is generated in some cases, such as the following SQL:
{code:java}
SELECT COUNT(c) AS pv, COUNT(DISTINCT c) AS uv FROM T GROUP BY a
{code}
The resulting plan is shown below; the Expand node and the filter conditions 
in the aggregate functions could be removed:
{code:java}
Sink(name=[DataStreamTableSink], fields=[pv, uv])
+- Calc(select=[pv, uv])
   +- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, 
$SUM0_RETRACT($f2_0) AS $f1, $SUM0_RETRACT($f3) AS $f2])
      +- Exchange(distribution=[hash[a]])
         +- GroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], 
select=[a, $f2, COUNT(c) FILTER $g_1 AS $f2_0, COUNT(DISTINCT c) FILTER $g_0 AS 
$f3])
            +- Exchange(distribution=[hash[a, $f2]])
               +- Calc(select=[a, c, $f2, =($e, 1) AS $g_1, =($e, 0) AS $g_0])
                  +- Expand(projects=[{a=[$0], c=[$1], $f2=[$2], $e=[0]}, 
{a=[$0], c=[$1], $f2=[null], $e=[1]}])
                     +- Calc(select=[a, c, MOD(HASH_CODE(c), 1024) AS $f2])
                        +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
                           +- DataStreamScan(table=[[default_catalog, 
default_database, T]], fields=[a, b, c]){code}
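For reference, the two-phase split that the plan above performs can be sketched in plain Python (an illustration only, not Flink code; the function name `split_count_distinct` is invented here, and the 1024-bucket choice mirrors the `MOD(HASH_CODE(c), 1024)` key in the plan):
```python
# Sketch of the split-distinct rewrite: COUNT(c) and COUNT(DISTINCT c) per
# group key `a` are computed in a partial phase keyed by (a, hash(c) % 1024)
# and a final phase keyed by `a` alone. No row duplication (Expand) is needed
# because both aggregates share the same distinct key `c`.
from collections import defaultdict

BUCKETS = 1024  # corresponds to MOD(HASH_CODE(c), 1024) in the plan


def split_count_distinct(rows):
    """rows: iterable of (a, c) pairs. Returns {a: (pv, uv)}."""
    # Partial phase: per (a, bucket), count rows and collect distinct c.
    partial_pv = defaultdict(int)
    partial_uv = defaultdict(set)
    for a, c in rows:
        key = (a, hash(c) % BUCKETS)
        partial_pv[key] += 1
        partial_uv[key].add(c)
    # Final phase: sum the partial results per group key a. Distinct counts
    # can be summed because equal c values always land in the same bucket.
    result = defaultdict(lambda: [0, 0])
    for (a, _), n in partial_pv.items():
        result[a][0] += n
    for (a, _), s in partial_uv.items():
        result[a][1] += len(s)
    return {a: tuple(v) for a, v in result.items()}
```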
An `Expand` node is only necessary when multiple aggregate functions with 
different distinct keys appear in one Aggregate.
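For contrast, a hypothetical query like the following (using column b from the same table T) does contain distinct aggregates on two different keys, so the Expand node is genuinely needed to duplicate each input row once per distinct key:
{code:java}
SELECT COUNT(DISTINCT b) AS ub, COUNT(DISTINCT c) AS uv FROM T GROUP BY a
{code}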



> Add an Expand node only when there is more than one distinct aggregate 
> function in an Aggregate when executing SplitAggregateRule
> ----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-20345
>                 URL: https://issues.apache.org/jira/browse/FLINK-20345
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 1.11.2
>            Reporter: zhangqingru
>            Priority: Major
>             Fix For: 1.11.3
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
