Shengkai Fang created FLINK-33446:
-------------------------------------
Summary: SubQueryAntiJoinTest#testMultiNotExistsWithCorrelatedOnWhere_NestedCorrelation doesn't produce the correct plan
Key: FLINK-33446
URL: https://issues.apache.org/jira/browse/FLINK-33446
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.17.2, 1.19.0, 1.18.1
Reporter: Shengkai Fang
Although this test doesn't throw an exception, the final plan produces 3
columns rather than 2 columns after optimization.
{code:java}
LogicalProject(inputs=[0..1], exprs=[[$4]])
+- LogicalFilter(condition=[IS NULL($5)])
   +- LogicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[left])
      :- LogicalProject(exprs=[[+(2, $0), +(3, $1)]])
      :  +- LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]])
      +- LogicalProject(inputs=[0..2], exprs=[[true]])
         +- LogicalAggregate(group=[{0, 1, 2}])
            +- LogicalProject(inputs=[0..2])
               +- LogicalFilter(condition=[IS NULL($3)])
                  +- LogicalJoin(condition=[true], joinType=[left])
                     :- LogicalFilter(condition=[IS NOT NULL($0)])
                     :  +- LogicalProject(exprs=[[+($0, 1)]])
                     :     +- LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]])
                     +- LogicalProject(inputs=[0..1], exprs=[[true]])
                        +- LogicalAggregate(group=[{0, 1}])
                           +- LogicalProject(exprs=[[$3, $0]])
                              +- LogicalFilter(condition=[AND(=($1, $0), =(CAST($2):BIGINT, $3))])
                                 +- LogicalProject(exprs=[[+($0, 4), +($0, 5), +($0, 6), CAST(+($0, 6)):BIGINT]])
                                    +- LogicalTableScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]])
{code}
After digging, I think the cause is that SubQueryRemoveRule generates a Join
node rather than a Correlate node, which causes decorrelation to fail. As a
quick fix, I think we should throw an exception to notify users that this is
not a supported feature in Flink.
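As a rough illustration of the quick fix, the guard could compare the output columns before and after optimization and fail loudly when they differ. This is only a sketch in plain Java: PlanShapeGuard and checkSameFieldCount are hypothetical names, the column lists stand in for the plan's row type, and nothing here is an existing Flink or Calcite API. The field names d2 and d3 are taken from the Calcite plan in this report, while "extra" is a made-up name for the unexpected third column.

```java
import java.util.List;

/** Hypothetical guard, not part of Flink: rejects plans whose column count changed. */
public class PlanShapeGuard {

    /**
     * Compares the output columns of the original and the optimized plan.
     * Throws instead of silently returning a plan with a different shape.
     */
    public static void checkSameFieldCount(
            List<String> originalFields, List<String> optimizedFields) {
        int expected = originalFields.size();
        int actual = optimizedFields.size();
        if (expected != actual) {
            throw new UnsupportedOperationException(
                    "Nested correlated subquery is not supported: optimized plan has "
                            + actual + " columns, expected " + expected);
        }
    }

    public static void main(String[] args) {
        // Same shape: passes silently.
        checkSameFieldCount(List.of("d2", "d3"), List.of("d2", "d3"));
        try {
            // Shape seen in this issue: 3 columns instead of 2.
            checkSameFieldCount(List.of("d2", "d3"), List.of("d2", "d3", "extra"));
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the planner the two lists would presumably come from the row types of the plan before and after decorrelation; where exactly such a check belongs is left open here.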
There might be 2 ways to fix this issue:
1. Expand subqueries when converting SQL to rel. After experimenting with
Calcite, I found that the SqlToRelConverter generates the correct plan.
{code:java}
LogicalProject(inputs=[0..1])
+- LogicalFilter(condition=[IS NULL($2)])
   +- LogicalCorrelate(correlation=[$cor7], joinType=[left], requiredColumns=[{0, 1}])
      :- LogicalProject(exprs=[[+(2, $0), +(3, $1)]])
      :  +- LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]])
      +- LogicalAggregate(group=[{}], agg#0=[MIN($0)])
         +- LogicalProject(exprs=[[true]])
            +- LogicalFilter(condition=[AND(=($0, $cor7.d2), IS NULL($1))])
               +- LogicalCorrelate(correlation=[$cor4], joinType=[left], requiredColumns=[{0}])
                  :- LogicalProject(inputs=[0])
                  :  +- LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d1, e, f)]]])
                  +- LogicalAggregate(group=[{}], agg#0=[MIN($0)])
                     +- LogicalProject(exprs=[[true]])
                        +- LogicalFilter(condition=[AND(=($0, $cor4.d1), =($1, $cor4.d1), =(CAST($2):BIGINT, $cor7.d3))])
                           +- LogicalProject(exprs=[[+($0, 4), +($0, 5), +($0, 6)]])
                              +- LogicalTableScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]])
{code}
Note that the new plan uses a Correlate node rather than a Join node.
2. CALCITE-4686 might fix this problem by removing the nested correlation node.
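To make the expected shape of the result concrete, the Correlate-based plan from option 1 can be read as plain nested loops: each left Correlate plus MIN(true) aggregate plus IS NULL filter acts as a NOT EXISTS check. The sketch below is my reading of that plan, not code from Flink or Calcite. NestedNotExistsReference and evaluate are hypothetical names, all columns are assumed to be non-NULL integers (the real column types are not stated in this report), and rows are modeled as int arrays in the column order shown in the table scans.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical reference evaluator for the Correlate plan, assuming no NULL values. */
public class NestedNotExistsReference {

    /** Rows are {a, b, c} for l, {d1, e, f} for r and {i, j, k} for t. Returns (d2, d3) rows. */
    public static List<long[]> evaluate(List<int[]> l, List<int[]> r, List<int[]> t) {
        List<long[]> result = new ArrayList<>();
        for (int[] lRow : l) {
            long d2 = 2 + lRow[0]; // +(2, $0)
            long d3 = 3 + lRow[1]; // +(3, $1)
            boolean outerExists = false;
            for (int[] rRow : r) {
                long d1 = rRow[0];
                if (d1 != d2) {
                    continue; // =($0, $cor7.d2)
                }
                boolean innerExists = false;
                for (int[] tRow : t) {
                    long i = tRow[0];
                    // =($0, $cor4.d1), =($1, $cor4.d1), =(CAST($2):BIGINT, $cor7.d3)
                    if (i + 4 == d1 && i + 5 == d1 && i + 6 == d3) {
                        innerExists = true;
                        break;
                    }
                }
                if (!innerExists) { // IS NULL($1): inner NOT EXISTS holds
                    outerExists = true;
                    break;
                }
            }
            if (!outerExists) { // IS NULL($2): outer NOT EXISTS holds
                result.add(new long[] {d2, d3}); // only 2 output columns
            }
        }
        return result;
    }
}
```

Whatever the input data, every output row has exactly the two columns d2 and d3, which is the shape the optimized Flink plan should also have.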