Caizhi Weng created FLINK-19959:
-----------------------------------
Summary: Multiple input creation algorithm will deduce an incorrect input order if the inputs are related under PIPELINED shuffle mode
Key: FLINK-19959
URL: https://issues.apache.org/jira/browse/FLINK-19959
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Reporter: Caizhi Weng
Fix For: 1.12.0
Consider the following SQL:
{code:sql}
WITH
T1 AS (SELECT x.a AS a, y.d AS b FROM y LEFT JOIN x ON y.d = x.a),
T2 AS (SELECT a, b FROM (SELECT a, b FROM T1) UNION ALL (SELECT x.a AS a, x.b AS b FROM x))
SELECT * FROM T2 LEFT JOIN t ON T2.a = t.a
{code}
Under the PIPELINED shuffle mode, the multiple input creation algorithm currently deduces the following plan:
{code}
MultipleInputNode(readOrder=[0,1,1,0], members=[\nNestedLoopJoin(joinType=[LeftOuterJoin], where=[=(a, a0)], select=[a, b, a0, b0, c], build=[right])\n:- Union(all=[true], union=[a, b])\n:  :- Calc(select=[a, CAST(d) AS b])\n:  :  +- NestedLoopJoin(joinType=[LeftOuterJoin], where=[=(d, a)], select=[d, a], build=[right])\n:  :     :- [#3] Calc(select=[d])\n:  :     +- [#4] Exchange(distribution=[broadcast])\n:  +- [#2] Calc(select=[a, b])\n+- [#1] Exchange(distribution=[broadcast])\n])
:- Exchange(distribution=[broadcast])
:  +- BoundedStreamScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])
:- Calc(select=[a, b])
:  +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c, nx)]]], fields=[a, b, c, nx], reuse_id=[1])
:- Calc(select=[d])
:  +- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f, ny)]]], fields=[d, e, f, ny])
+- Exchange(distribution=[broadcast])
   +- Calc(select=[a])
      +- Reused(reference_id=[1])
{code}
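The 2nd and the 4th inputs of the multiple input node should have the same input priority; otherwise a deadlock will occur. Both inputs read from the same reused scan of x (reuse_id=[1]). Under PIPELINED shuffle mode that source pushes each record to both consumers at once, so while the node reads input #4 (read order 0) and leaves input #2 (read order 1) unread, the buffers of input #2 fill up, back pressure stalls the shared source, and input #4 can never finish.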
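To illustrate why different priorities on these two inputs deadlock, here is a toy model in Python. It is a hypothetical sketch, not Flink code: one shared source (standing in for the reused scan of x) feeds two bounded channels, one per consuming input, and the node only reads an input once every input with a smaller read order has been fully consumed. The function name `simulate` and its `n_records`/`capacity` parameters are assumptions made for illustration.

```python
from collections import deque


def simulate(read_order, n_records=10, capacity=2):
    """Toy model (hypothetical, not Flink code): one shared source, standing in
    for the reused scan of x, pushes every record into two bounded channels,
    one per consuming input (#2 and #4). The node drains inputs in ascending
    read order: an input is only read once every input with a smaller read
    order has consumed all of its records.

    Returns True if all records are consumed, False if no progress is possible.
    """
    channels = {2: deque(), 4: deque()}
    consumed = {2: 0, 4: 0}
    emitted = 0
    while True:
        progressed = False
        # Pipelined source: a record goes to every consumer at once, so the
        # source can only emit while *all* downstream channels have free space.
        if emitted < n_records and all(len(q) < capacity for q in channels.values()):
            for q in channels.values():
                q.append(emitted)
            emitted += 1
            progressed = True
        unfinished = [i for i in channels if consumed[i] < n_records]
        if not unfinished:
            return True
        # Only inputs holding the smallest unfinished read order are readable.
        current = min(read_order[i] for i in unfinished)
        for i in unfinished:
            if read_order[i] == current and channels[i]:
                channels[i].popleft()
                consumed[i] += 1
                progressed = True
        if not progressed:
            return False  # deadlock: source blocked, readable inputs empty


print(simulate({2: 1, 4: 0}))  # read orders from the plan: False
print(simulate({2: 0, 4: 0}))  # equal priorities: True
```

With `capacity` large enough to buffer all records (for example `simulate({2: 1, 4: 0}, capacity=10)`), the first case also completes, which suggests why such a deadlock may only appear once the shared input outgrows the available buffers.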
The incorrect read order arises because the current algorithm does not consider inputs that are related outside of the multiple input node: here inputs #2 and #4 both read from the reused scan of x.
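A minimal sketch of the kind of check that is missing: for each input, collect every upstream operator it reads from (including operators outside the multiple input node), then flag pairs of inputs that share an operator but were assigned different read orders. The graph encoding, the operator names (`scan_x`, `calc_a`, ...) and both functions are hypothetical illustrations, not Flink's implementation or the eventual fix; a real resolver would also have to respect build-before-probe constraints when deciding how to resolve a flagged pair.

```python
from itertools import combinations

# Hypothetical encoding of the plan above: each operator maps to its inputs.
UPSTREAM = {
    "#1": ["scan_t"],      # Exchange(broadcast) over t
    "#2": ["scan_x"],      # Calc(select=[a, b]) over x
    "#3": ["scan_y"],      # Calc(select=[d]) over y
    "#4": ["calc_a"],      # Exchange(broadcast)
    "calc_a": ["scan_x"],  # Calc(select=[a]) over Reused(reference_id=[1])
    "scan_t": [],
    "scan_x": [],          # reuse_id=[1]
    "scan_y": [],
}

INPUTS = ["#1", "#2", "#3", "#4"]
READ_ORDER = dict(zip(INPUTS, [0, 1, 1, 0]))  # readOrder from the plan


def upstream_closure(node, graph):
    """All operators reachable from `node` via input edges, including itself."""
    seen, stack = set(), [node]
    while stack:
        cur = stack.pop()
        if cur not in seen:
            seen.add(cur)
            stack.extend(graph[cur])
    return seen


def conflicting_inputs(inputs, read_order, graph):
    """Pairs of inputs that share an upstream operator but differ in read order."""
    closures = {i: upstream_closure(i, graph) for i in inputs}
    return [
        (a, b)
        for a, b in combinations(inputs, 2)
        if closures[a] & closures[b] and read_order[a] != read_order[b]
    ]


print(conflicting_inputs(INPUTS, READ_ORDER, UPSTREAM))  # [('#2', '#4')]
```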