[ 
https://issues.apache.org/jira/browse/FLINK-19959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he reassigned FLINK-19959:
----------------------------------

    Assignee: Caizhi Weng

> Multiple input creation algorithm will deduce an incorrect input order if the 
> inputs are related under PIPELINED shuffle mode
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-19959
>                 URL: https://issues.apache.org/jira/browse/FLINK-19959
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Caizhi Weng
>            Assignee: Caizhi Weng
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0
>
>
> Consider the following SQL
> {code:sql}
> WITH
>   T1 AS (SELECT x.a AS a, y.d AS b FROM y LEFT JOIN x ON y.d = x.a),
>   T2 AS (SELECT a, b FROM (SELECT a, b FROM T1)
>          UNION ALL (SELECT x.a AS a, x.b AS b FROM x))
> SELECT * FROM T2 LEFT JOIN t ON T2.a = t.a
> {code}
> The multiple input creation algorithm will currently deduce the following 
> plan under the PIPELINED shuffle mode:
> {code}
> MultipleInputNode(readOrder=[0,1,1,0], 
> members=[\nNestedLoopJoin(joinType=[LeftOuterJoin], where=[=(a, a0)], 
> select=[a, b, a0, b0, c], build=[right])\n:- Union(all=[true], union=[a, 
> b])\n:  :- Calc(select=[a, CAST(d) AS b])\n:  :  +- 
> NestedLoopJoin(joinType=[LeftOuterJoin], where=[=(d, a)], select=[d, a], 
> build=[right])\n:  :     :- [#3] Calc(select=[d])\n:  :     +- [#4] 
> Exchange(distribution=[broadcast])\n:  +- [#2] Calc(select=[a, b])\n+- [#1] 
> Exchange(distribution=[broadcast])\n])
> :- Exchange(distribution=[broadcast])
> :  +- BoundedStreamScan(table=[[default_catalog, default_database, t]], 
> fields=[a, b, c])
> :- Calc(select=[a, b])
> :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, 
> source: [TestTableSource(a, b, c, nx)]]], fields=[a, b, c, nx], reuse_id=[1])
> :- Calc(select=[d])
> :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, y, 
> source: [TestTableSource(d, e, f, ny)]]], fields=[d, e, f, ny])
> +- Exchange(distribution=[broadcast])
>    +- Calc(select=[a])
>       +- Reused(reference_id=[1])
> {code}
> The 2nd and the 4th inputs of the multiple input node both read from the 
> same reused scan of x (reuse_id=[1]), so they should have the same input 
> priority; otherwise a deadlock will occur.
> This is because the current algorithm fails to consider the case where the 
> inputs are related outside of the multiple input node.
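
The condition described above can be illustrated with a minimal sketch. This is not the planner's actual algorithm or the fix applied in Flink; it only checks the invariant the report states, namely that inputs related through a common upstream source should share one read priority. The function name `conflicting_inputs` and the source labels are hypothetical; the mapping of inputs to sources is read off the plan above (input 2 scans x with reuse_id=[1], input 4 reads Reused(reference_id=[1])).

```python
from collections import defaultdict


def conflicting_inputs(read_order, upstream_sources):
    """Return pairs of 1-based input indices that share an upstream
    source but were assigned different read priorities.

    read_order       -- priority per input, as in readOrder=[0,1,1,0]
    upstream_sources -- one set of source labels per input (hypothetical
                        labels, assumed to be connected via pipelined edges)
    """
    # Group input indices by each source they read from.
    by_source = defaultdict(list)
    for idx, sources in enumerate(upstream_sources, start=1):
        for source in sources:
            by_source[source].append(idx)

    # Any two inputs fed by the same source must have equal priority.
    conflicts = set()
    for inputs in by_source.values():
        for i in inputs:
            for j in inputs:
                if i < j and read_order[i - 1] != read_order[j - 1]:
                    conflicts.add((i, j))
    return sorted(conflicts)


# Inputs #1..#4 of the MultipleInputNode in the plan above.
read_order = [0, 1, 1, 0]
upstream_sources = [{"t"}, {"x"}, {"y"}, {"x"}]
print(conflicting_inputs(read_order, upstream_sources))  # [(2, 4)]
```

Under these assumptions the sketch flags inputs 2 and 4, the pair named in the report. Assigning both the same priority, for example a read order of [0, 1, 1, 1], clears the conflict in this simplified model.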



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
