[
https://issues.apache.org/jira/browse/FLINK-19959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
godfrey he reassigned FLINK-19959:
----------------------------------
Assignee: Caizhi Weng
> Multiple input creation algorithm will deduce an incorrect input order if the
> inputs are related under PIPELINED shuffle mode
> -----------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-19959
> URL: https://issues.apache.org/jira/browse/FLINK-19959
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Caizhi Weng
> Assignee: Caizhi Weng
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.12.0
>
>
> Consider the following SQL
> {code:sql}
> WITH
> T1 AS (SELECT x.a AS a, y.d AS b FROM y LEFT JOIN x ON y.d = x.a),
> T2 AS (SELECT a, b FROM (SELECT a, b FROM T1) UNION ALL (SELECT x.a AS a,
> x.b AS b FROM x))
> SELECT * FROM T2 LEFT JOIN t ON T2.a = t.a
> {code}
> The multiple input creation algorithm will currently deduce the following
> plan under the PIPELINED shuffle mode:
> {code}
> MultipleInputNode(readOrder=[0,1,1,0],
> members=[\nNestedLoopJoin(joinType=[LeftOuterJoin], where=[=(a, a0)],
> select=[a, b, a0, b0, c], build=[right])\n:- Union(all=[true], union=[a,
> b])\n: :- Calc(select=[a, CAST(d) AS b])\n: : +-
> NestedLoopJoin(joinType=[LeftOuterJoin], where=[=(d, a)], select=[d, a],
> build=[right])\n: : :- [#3] Calc(select=[d])\n: : +- [#4]
> Exchange(distribution=[broadcast])\n: +- [#2] Calc(select=[a, b])\n+- [#1]
> Exchange(distribution=[broadcast])\n])
> :- Exchange(distribution=[broadcast])
> : +- BoundedStreamScan(table=[[default_catalog, default_database, t]],
> fields=[a, b, c])
> :- Calc(select=[a, b])
> : +- LegacyTableSourceScan(table=[[default_catalog, default_database, x,
> source: [TestTableSource(a, b, c, nx)]]], fields=[a, b, c, nx], reuse_id=[1])
> :- Calc(select=[d])
> : +- LegacyTableSourceScan(table=[[default_catalog, default_database, y,
> source: [TestTableSource(d, e, f, ny)]]], fields=[d, e, f, ny])
> +- Exchange(distribution=[broadcast])
> +- Calc(select=[a])
> +- Reused(reference_id=[1])
> {code}
> It's obvious that the 2nd and the 4th input for the multiple input node
> should have the same input priority, otherwise a deadlock will occur.
> This is because the current algorithm fails to consider the case when the
> inputs are related out of the multiple input node.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)