[
https://issues.apache.org/jira/browse/FLINK-19939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-19939:
-----------------------------------
Labels: pull-request-available (was: )
> Remove redundant union from multiple input node
> -----------------------------------------------
>
> Key: FLINK-19939
> URL: https://issues.apache.org/jira/browse/FLINK-19939
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Reporter: Caizhi Weng
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.12.0
>
>
> Consider the following SQL and the execution plan.
> {code:sql}
> WITH
> T1 AS (SELECT COUNT(*) AS cnt FROM x GROUP BY a),
> T2 AS (SELECT COUNT(*) AS cnt FROM y GROUP BY d),
> T3 AS (SELECT b AS cnt FROM x INNER JOIN y ON x.b = y.e)
> SELECT cnt FROM
> (SELECT cnt FROM T1)
> UNION ALL
> (SELECT cnt FROM T2)
> UNION ALL
> (SELECT cnt FROM T3)
> {code}
> {code}
> MultipleInputNode(readOrder=[1,0,0,0], members=[\nUnion(all=[true],
> union=[cnt])\n:- Union(all=[true], union=[cnt])\n: :- Calc(select=[CAST(cnt)
> AS cnt])\n: : +- HashAggregate(isMerge=[true], groupBy=[a], select=[a,
> Final_COUNT(count1$0) AS cnt])\n: : +- [#3]
> Exchange(distribution=[hash[a]])\n: +- Calc(select=[CAST(cnt) AS cnt])\n:
> +- HashAggregate(isMerge=[true], groupBy=[d], select=[d,
> Final_COUNT(count1$0) AS cnt])\n: +- [#4]
> Exchange(distribution=[hash[d]])\n+- Calc(select=[b AS cnt])\n +-
> HashJoin(joinType=[InnerJoin], where=[=(b, e)], select=[b, e],
> build=[right])\n :- [#1] Exchange(distribution=[hash[b]])\n +- [#2]
> Exchange(distribution=[hash[e]])\n])
> :- Exchange(distribution=[hash[b]])
> : +- Calc(select=[b])
> : +- LegacyTableSourceScan(table=[[default_catalog, default_database, x,
> source: [TestTableSource(a, b, c, nx)]]], fields=[a, b, c, nx])
> :- Exchange(distribution=[hash[e]])
> : +- Calc(select=[e])
> : +- LegacyTableSourceScan(table=[[default_catalog, default_database, y,
> source: [TestTableSource(d, e, f, ny)]]], fields=[d, e, f, ny])
> :- Exchange(distribution=[hash[a]])
> : +- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(*) AS
> count1$0])
> : +- Calc(select=[a])
> : +- LegacyTableSourceScan(table=[[default_catalog, default_database,
> x, source: [TestTableSource(a, b, c, nx)]]], fields=[a, b, c, nx])
> +- Exchange(distribution=[hash[d]])
> +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(*) AS
> count1$0])
> +- Calc(select=[d])
> +- LegacyTableSourceScan(table=[[default_catalog, default_database,
> y, source: [TestTableSource(d, e, f, ny)]]], fields=[d, e, f, ny])
> {code}
> The two unions inside the multiple input node here are actually redundant:
> they only concatenate their inputs, so the amount of data shuffled will not
> change even if they are moved out of the multiple input node. We should
> remove such redundant unions from multiple input nodes.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)