[
https://issues.apache.org/jira/browse/FLINK-33446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Shengkai Fang updated FLINK-33446:
----------------------------------
Description:
Although this test doesn't throw an exception, the final plan produces 3
columns rather than 2 after optimization.
{code:java}
LogicalProject(inputs=[0..1], exprs=[[$4]])
+- LogicalFilter(condition=[IS NULL($5)])
   +- LogicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[left])
      :- LogicalProject(exprs=[[+(2, $0), +(3, $1)]])
      :  +- LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]])
      +- LogicalProject(inputs=[0..2], exprs=[[true]])
         +- LogicalAggregate(group=[{0, 1, 2}])
            +- LogicalProject(inputs=[0..2])
               +- LogicalFilter(condition=[IS NULL($3)])
                  +- LogicalJoin(condition=[true], joinType=[left])
                     :- LogicalFilter(condition=[IS NOT NULL($0)])
                     :  +- LogicalProject(exprs=[[+($0, 1)]])
                     :     +- LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]])
                     +- LogicalProject(inputs=[0..1], exprs=[[true]])
                        +- LogicalAggregate(group=[{0, 1}])
                           +- LogicalProject(exprs=[[$3, $0]])
                              +- LogicalFilter(condition=[AND(=($1, $0), =(CAST($2):BIGINT, $3))])
                                 +- LogicalProject(exprs=[[+($0, 4), +($0, 5), +($0, 6), CAST(+($0, 6)):BIGINT]])
                                    +- LogicalTableScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]])
{code}
After digging, I think the cause is that SubQueryRemoveRule generates a Join
node instead of a Correlate node, which makes the decorrelation fail. As a
quick fix, I think we should throw an exception to notify users that this is
not a supported feature in Flink.
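A minimal sketch of such a fail-fast check, assuming the problem can be detected by comparing the number of output columns before and after optimization. PlanNode and ensureSameArity are hypothetical names used only for illustration, not Flink or Calcite APIs; in the planner the count would more likely come from RelNode#getRowType().getFieldCount().

```java
/** Sketch of a fail-fast arity check. All names are hypothetical, not Flink APIs. */
public class PlanArityCheck {

    /** Hypothetical stand-in for a plan root: its digest and output column count. */
    public static final class PlanNode {
        final String digest;
        final int fieldCount;

        public PlanNode(String digest, int fieldCount) {
            this.digest = digest;
            this.fieldCount = fieldCount;
        }
    }

    /** Throws if the optimized plan has a different number of output columns. */
    public static void ensureSameArity(PlanNode original, PlanNode optimized) {
        if (original.fieldCount != optimized.fieldCount) {
            throw new UnsupportedOperationException(
                    "Unsupported nested correlated subquery: expected "
                            + original.fieldCount
                            + " output columns, but the optimized plan produces "
                            + optimized.fieldCount
                            + ": "
                            + optimized.digest);
        }
    }

    public static void main(String[] args) {
        // Column counts taken from the plan roots in this issue: 2 expected, 3 produced.
        PlanNode expected = new PlanNode("LogicalProject(inputs=[0..1])", 2);
        PlanNode actual = new PlanNode("LogicalProject(inputs=[0..1], exprs=[[$4]])", 3);
        try {
            ensureSameArity(expected, actual);
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A check like this only turns the silent wrong plan into a visible error; it does not address why the Join node is generated in the first place.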
There are two possible ways to fix this issue:
1. Expand the subquery when converting SQL to rel. After experimenting with
Calcite, I found that SqlToRelConverter generates the correct plan.
{code:java}
LogicalProject(inputs=[0..1])
+- LogicalFilter(condition=[IS NULL($2)])
   +- LogicalCorrelate(correlation=[$cor7], joinType=[left], requiredColumns=[{0, 1}])
      :- LogicalProject(exprs=[[+(2, $0), +(3, $1)]])
      :  +- LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]])
      +- LogicalAggregate(group=[{}], agg#0=[MIN($0)])
         +- LogicalProject(exprs=[[true]])
            +- LogicalFilter(condition=[AND(=($0, $cor7.d2), IS NULL($1))])
               +- LogicalCorrelate(correlation=[$cor4], joinType=[left], requiredColumns=[{0}])
                  :- LogicalProject(inputs=[0])
                  :  +- LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d1, e, f)]]])
                  +- LogicalAggregate(group=[{}], agg#0=[MIN($0)])
                     +- LogicalProject(exprs=[[true]])
                        +- LogicalFilter(condition=[AND(=($0, $cor4.d1), =($1, $cor4.d1), =(CAST($2):BIGINT, $cor7.d3))])
                           +- LogicalProject(exprs=[[+($0, 4), +($0, 5), +($0, 6)]])
                              +- LogicalTableScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]])
{code}
You can see that the new plan uses a Correlate node rather than a Join node.
2. CALCITE-4686 might fix this problem by removing the nested correlation node.
> SubQueryAntiJoinTest#testMultiNotExistsWithCorrelatedOnWhere_NestedCorrelation
> doesn't produce the correct plan
> ---------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-33446
> URL: https://issues.apache.org/jira/browse/FLINK-33446
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.17.2, 1.19.0, 1.18.1
> Reporter: Shengkai Fang
> Priority: Major
>
> Although this test doesn't throw an exception, the final plan produces 3
> columns rather than 2 after optimization.
> {code:java}
> LogicalProject(inputs=[0..1], exprs=[[$4]])
> +- LogicalFilter(condition=[IS NULL($5)])
>    +- LogicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[left])
>       :- LogicalProject(exprs=[[+(2, $0), +(3, $1)]])
>       :  +- LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]])
>       +- LogicalProject(inputs=[0..2], exprs=[[true]])
>          +- LogicalAggregate(group=[{0, 1, 2}])
>             +- LogicalProject(inputs=[0..2])
>                +- LogicalFilter(condition=[IS NULL($3)])
>                   +- LogicalJoin(condition=[true], joinType=[left])
>                      :- LogicalFilter(condition=[IS NOT NULL($0)])
>                      :  +- LogicalProject(exprs=[[+($0, 1)]])
>                      :     +- LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]])
>                      +- LogicalProject(inputs=[0..1], exprs=[[true]])
>                         +- LogicalAggregate(group=[{0, 1}])
>                            +- LogicalProject(exprs=[[$3, $0]])
>                               +- LogicalFilter(condition=[AND(=($1, $0), =(CAST($2):BIGINT, $3))])
>                                  +- LogicalProject(exprs=[[+($0, 4), +($0, 5), +($0, 6), CAST(+($0, 6)):BIGINT]])
>                                     +- LogicalTableScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]])
> {code}
> After digging, I think the cause is that SubQueryRemoveRule generates a Join
> node instead of a Correlate node, which makes the decorrelation fail. As a
> quick fix, I think we should throw an exception to notify users that this is
> not a supported feature in Flink.
> There are two possible ways to fix this issue:
> 1. Expand the subquery when converting SQL to rel. After experimenting with
> Calcite, I found that SqlToRelConverter generates the correct plan.
> {code:java}
> LogicalProject(inputs=[0..1])
> +- LogicalFilter(condition=[IS NULL($2)])
>    +- LogicalCorrelate(correlation=[$cor7], joinType=[left], requiredColumns=[{0, 1}])
>       :- LogicalProject(exprs=[[+(2, $0), +(3, $1)]])
>       :  +- LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]])
>       +- LogicalAggregate(group=[{}], agg#0=[MIN($0)])
>          +- LogicalProject(exprs=[[true]])
>             +- LogicalFilter(condition=[AND(=($0, $cor7.d2), IS NULL($1))])
>                +- LogicalCorrelate(correlation=[$cor4], joinType=[left], requiredColumns=[{0}])
>                   :- LogicalProject(inputs=[0])
>                   :  +- LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d1, e, f)]]])
>                   +- LogicalAggregate(group=[{}], agg#0=[MIN($0)])
>                      +- LogicalProject(exprs=[[true]])
>                         +- LogicalFilter(condition=[AND(=($0, $cor4.d1), =($1, $cor4.d1), =(CAST($2):BIGINT, $cor7.d3))])
>                            +- LogicalProject(exprs=[[+($0, 4), +($0, 5), +($0, 6)]])
>                               +- LogicalTableScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]])
> {code}
> You can see that the new plan uses a Correlate node rather than a Join node.
> 2. CALCITE-4686 might fix this problem by removing the nested correlation
> node.