Caizhi Weng created FLINK-19939:
-----------------------------------
Summary: Remove redundant union from multiple input node
Key: FLINK-19939
URL: https://issues.apache.org/jira/browse/FLINK-19939
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Reporter: Caizhi Weng
Fix For: 1.12.0
Consider the following SQL query and its execution plan.
{code:sql}
WITH
T1 AS (SELECT COUNT(*) AS cnt FROM x GROUP BY a),
T2 AS (SELECT COUNT(*) AS cnt FROM y GROUP BY d),
T3 AS (SELECT b AS cnt FROM x INNER JOIN y ON x.b = y.e)
SELECT cnt FROM
(SELECT cnt FROM T1)
UNION ALL
(SELECT cnt FROM T2)
UNION ALL
(SELECT cnt FROM T3)
{code}
{code}
MultipleInputNode(readOrder=[1,0,0,0], members=[\nUnion(all=[true], union=[cnt])\n:- Union(all=[true], union=[cnt])\n:  :- Calc(select=[CAST(cnt) AS cnt])\n:  :  +- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count1$0) AS cnt])\n:  :     +- [#3] Exchange(distribution=[hash[a]])\n:  +- Calc(select=[CAST(cnt) AS cnt])\n:     +- HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count1$0) AS cnt])\n:        +- [#4] Exchange(distribution=[hash[d]])\n+- Calc(select=[b AS cnt])\n   +- HashJoin(joinType=[InnerJoin], where=[=(b, e)], select=[b, e], build=[right])\n      :- [#1] Exchange(distribution=[hash[b]])\n      +- [#2] Exchange(distribution=[hash[e]])\n])
:- Exchange(distribution=[hash[b]])
:  +- Calc(select=[b])
:     +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c, nx)]]], fields=[a, b, c, nx])
:- Exchange(distribution=[hash[e]])
:  +- Calc(select=[e])
:     +- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f, ny)]]], fields=[d, e, f, ny])
:- Exchange(distribution=[hash[a]])
:  +- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(*) AS count1$0])
:     +- Calc(select=[a])
:        +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c, nx)]]], fields=[a, b, c, nx])
+- Exchange(distribution=[hash[d]])
   +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(*) AS count1$0])
      +- Calc(select=[d])
         +- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f, ny)]]], fields=[d, e, f, ny])
{code}
The two unions inside the multiple input node are redundant: the amount of
data shuffled does not change if they are moved out of the multiple input
node. We should remove such redundant unions from multiple input nodes.
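
To illustrate the idea, here is a minimal sketch on a toy plan tree. It is not
Flink planner code: the names Node, peel_root_unions and count_exchanges are
hypothetical, and the tree only mirrors the shape of the members in the plan
above. It peels the unions sitting at the root of the members and checks that
the number of Exchange inputs (the shuffled edges) stays the same.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Node:
    """A toy plan node (hypothetical); `inputs` are its children."""
    name: str
    inputs: List["Node"] = field(default_factory=list)


def peel_root_unions(root: Node) -> List[Node]:
    """Strip Union nodes at the root and return the remaining subtrees.

    Unions that sit below a non-union operator are left untouched.
    """
    if root.name != "Union":
        return [root]
    peeled: List[Node] = []
    for child in root.inputs:
        peeled.extend(peel_root_unions(child))
    return peeled


def count_exchanges(root: Node) -> int:
    """Count Exchange leaves, i.e. the shuffled inputs feeding a subtree."""
    if root.name == "Exchange":
        return 1
    return sum(count_exchanges(child) for child in root.inputs)


# Same shape as the members of the MultipleInputNode in the plan above.
members = Node("Union", [
    Node("Union", [
        Node("Calc", [Node("HashAggregate", [Node("Exchange")])]),
        Node("Calc", [Node("HashAggregate", [Node("Exchange")])]),
    ]),
    Node("Calc", [Node("HashJoin", [Node("Exchange"), Node("Exchange")])]),
])

subtrees = peel_root_unions(members)
exchanges_before = count_exchanges(members)
exchanges_after = sum(count_exchanges(s) for s in subtrees)
print(len(subtrees), exchanges_before, exchanges_after)
```

In this toy model the two root unions disappear from the node, three
union-free subtrees remain, and the same Exchange inputs feed them as before.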