[ 
https://issues.apache.org/jira/browse/FLINK-33446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang updated FLINK-33446:
----------------------------------
    Description: 
Although this test doesn't throw an exception, the final plan produces 3 
columns rather than 2 after optimization.

{code:java}
LogicalProject(inputs=[0..1], exprs=[[$4]])
+- LogicalFilter(condition=[IS NULL($5)])
   +- LogicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[left])
      :- LogicalProject(exprs=[[+(2, $0), +(3, $1)]])
      :  +- LogicalTableScan(table=[[default_catalog, default_database, l, 
source: [TestTableSource(a, b, c)]]])
      +- LogicalProject(inputs=[0..2], exprs=[[true]])
         +- LogicalAggregate(group=[{0, 1, 2}])
            +- LogicalProject(inputs=[0..2])
               +- LogicalFilter(condition=[IS NULL($3)])
                  +- LogicalJoin(condition=[true], joinType=[left])
                     :- LogicalFilter(condition=[IS NOT NULL($0)])
                     :  +- LogicalProject(exprs=[[+($0, 1)]])
                     :     +- LogicalTableScan(table=[[default_catalog, 
default_database, r, source: [TestTableSource(d, e, f)]]])
                     +- LogicalProject(inputs=[0..1], exprs=[[true]])
                        +- LogicalAggregate(group=[{0, 1}])
                           +- LogicalProject(exprs=[[$3, $0]])
                              +- LogicalFilter(condition=[AND(=($1, $0), 
=(CAST($2):BIGINT, $3))])
                                 +- LogicalProject(exprs=[[+($0, 4), +($0, 5), 
+($0, 6), CAST(+($0, 6)):BIGINT]])
                                    +- 
LogicalTableScan(table=[[default_catalog, default_database, t, source: 
[TestTableSource(i, j, k)]]])

{code}

After digging, I think it's the SubQueryRemoveRule doesn't generate the 
Correlate but generates the Join node, which causes the failure of the 
decorrelation. For a quick fix, I think we should throw an exception to notify 
users it's not a supported feature in the Flink. 

There might exist 2 ways to fix this issue:
1. Expand subquery when converting SQL to rel.  After experimenting with 
calcite, I found that the Sql2RelConverter generates the correct plan.

{code:java}
LogicalProject(inputs=[0..1])
+- LogicalFilter(condition=[IS NULL($2)])
   +- LogicalCorrelate(correlation=[$cor7], joinType=[left], 
requiredColumns=[{0, 1}])
      :- LogicalProject(exprs=[[+(2, $0), +(3, $1)]])
      :  +- LogicalTableScan(table=[[default_catalog, default_database, l, 
source: [TestTableSource(a, b, c)]]])
      +- LogicalAggregate(group=[{}], agg#0=[MIN($0)])
         +- LogicalProject(exprs=[[true]])
            +- LogicalFilter(condition=[AND(=($0, $cor7.d2), IS NULL($1))])
               +- LogicalCorrelate(correlation=[$cor4], joinType=[left], 
requiredColumns=[{0}])
                  :- LogicalProject(inputs=[0])
                  :  +- LogicalTableScan(table=[[default_catalog, 
default_database, r, source: [TestTableSource(d1, e, f)]]])
                  +- LogicalAggregate(group=[{}], agg#0=[MIN($0)])
                     +- LogicalProject(exprs=[[true]])
                        +- LogicalFilter(condition=[AND(=($0, $cor4.d1), =($1, 
$cor4.d1), =(CAST($2):BIGINT, $cor7.d3))])
                           +- LogicalProject(exprs=[[+($0, 4), +($0, 5), +($0, 
6)]])
                              +- LogicalTableScan(table=[[default_catalog, 
default_database, t, source: [TestTableSource(i, j, k)]]])
{code}

You can find the new plan uses a correlate node rather than a join node.

2. CALCITE-5789 has fix this problem by removing the nested correlation node.







  was:
Although this test doesn't throw an exception, the final plan produces 3 
columns rather than 2 after optimization.

{code:java}
LogicalProject(inputs=[0..1], exprs=[[$4]])
+- LogicalFilter(condition=[IS NULL($5)])
   +- LogicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[left])
      :- LogicalProject(exprs=[[+(2, $0), +(3, $1)]])
      :  +- LogicalTableScan(table=[[default_catalog, default_database, l, 
source: [TestTableSource(a, b, c)]]])
      +- LogicalProject(inputs=[0..2], exprs=[[true]])
         +- LogicalAggregate(group=[{0, 1, 2}])
            +- LogicalProject(inputs=[0..2])
               +- LogicalFilter(condition=[IS NULL($3)])
                  +- LogicalJoin(condition=[true], joinType=[left])
                     :- LogicalFilter(condition=[IS NOT NULL($0)])
                     :  +- LogicalProject(exprs=[[+($0, 1)]])
                     :     +- LogicalTableScan(table=[[default_catalog, 
default_database, r, source: [TestTableSource(d, e, f)]]])
                     +- LogicalProject(inputs=[0..1], exprs=[[true]])
                        +- LogicalAggregate(group=[{0, 1}])
                           +- LogicalProject(exprs=[[$3, $0]])
                              +- LogicalFilter(condition=[AND(=($1, $0), 
=(CAST($2):BIGINT, $3))])
                                 +- LogicalProject(exprs=[[+($0, 4), +($0, 5), 
+($0, 6), CAST(+($0, 6)):BIGINT]])
                                    +- 
LogicalTableScan(table=[[default_catalog, default_database, t, source: 
[TestTableSource(i, j, k)]]])

{code}

After digging, I think it's the SubQueryRemoveRule doesn't generate the 
Correlate but generates the Join node, which causes the failure of the 
decorrelation. For a quick fix, I think we should throw an exception to notify 
users it's not a supported feature in the Flink. 

There might exist 2 ways to fix this issue:
1. Expand subquery when converting SQL to rel.  After experimenting with 
calcite, I found that the Sql2RelConverter generates the correct plan.

{code:java}
LogicalProject(inputs=[0..1])
+- LogicalFilter(condition=[IS NULL($2)])
   +- LogicalCorrelate(correlation=[$cor7], joinType=[left], 
requiredColumns=[{0, 1}])
      :- LogicalProject(exprs=[[+(2, $0), +(3, $1)]])
      :  +- LogicalTableScan(table=[[default_catalog, default_database, l, 
source: [TestTableSource(a, b, c)]]])
      +- LogicalAggregate(group=[{}], agg#0=[MIN($0)])
         +- LogicalProject(exprs=[[true]])
            +- LogicalFilter(condition=[AND(=($0, $cor7.d2), IS NULL($1))])
               +- LogicalCorrelate(correlation=[$cor4], joinType=[left], 
requiredColumns=[{0}])
                  :- LogicalProject(inputs=[0])
                  :  +- LogicalTableScan(table=[[default_catalog, 
default_database, r, source: [TestTableSource(d1, e, f)]]])
                  +- LogicalAggregate(group=[{}], agg#0=[MIN($0)])
                     +- LogicalProject(exprs=[[true]])
                        +- LogicalFilter(condition=[AND(=($0, $cor4.d1), =($1, 
$cor4.d1), =(CAST($2):BIGINT, $cor7.d3))])
                           +- LogicalProject(exprs=[[+($0, 4), +($0, 5), +($0, 
6)]])
                              +- LogicalTableScan(table=[[default_catalog, 
default_database, t, source: [TestTableSource(i, j, k)]]])
{code}

You can find the new plan uses a correlate node rather than a join node.

2. CALCITE-4686 might fix this problem by removing the nested correlation node.








> SubQueryAntiJoinTest#testMultiNotExistsWithCorrelatedOnWhere_NestedCorrelation
>  doesn't produce the correct plan
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33446
>                 URL: https://issues.apache.org/jira/browse/FLINK-33446
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.17.2, 1.19.0, 1.18.1
>            Reporter: Shengkai Fang
>            Priority: Major
>
> Although this test doesn't throw an exception, the final plan produces 3 
> columns rather than 2 after optimization.
> {code:java}
> LogicalProject(inputs=[0..1], exprs=[[$4]])
> +- LogicalFilter(condition=[IS NULL($5)])
>    +- LogicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[left])
>       :- LogicalProject(exprs=[[+(2, $0), +(3, $1)]])
>       :  +- LogicalTableScan(table=[[default_catalog, default_database, l, 
> source: [TestTableSource(a, b, c)]]])
>       +- LogicalProject(inputs=[0..2], exprs=[[true]])
>          +- LogicalAggregate(group=[{0, 1, 2}])
>             +- LogicalProject(inputs=[0..2])
>                +- LogicalFilter(condition=[IS NULL($3)])
>                   +- LogicalJoin(condition=[true], joinType=[left])
>                      :- LogicalFilter(condition=[IS NOT NULL($0)])
>                      :  +- LogicalProject(exprs=[[+($0, 1)]])
>                      :     +- LogicalTableScan(table=[[default_catalog, 
> default_database, r, source: [TestTableSource(d, e, f)]]])
>                      +- LogicalProject(inputs=[0..1], exprs=[[true]])
>                         +- LogicalAggregate(group=[{0, 1}])
>                            +- LogicalProject(exprs=[[$3, $0]])
>                               +- LogicalFilter(condition=[AND(=($1, $0), 
> =(CAST($2):BIGINT, $3))])
>                                  +- LogicalProject(exprs=[[+($0, 4), +($0, 
> 5), +($0, 6), CAST(+($0, 6)):BIGINT]])
>                                     +- 
> LogicalTableScan(table=[[default_catalog, default_database, t, source: 
> [TestTableSource(i, j, k)]]])
> {code}
> After digging, I think it's the SubQueryRemoveRule doesn't generate the 
> Correlate but generates the Join node, which causes the failure of the 
> decorrelation. For a quick fix, I think we should throw an exception to 
> notify users it's not a supported feature in the Flink. 
> There might exist 2 ways to fix this issue:
> 1. Expand subquery when converting SQL to rel.  After experimenting with 
> calcite, I found that the Sql2RelConverter generates the correct plan.
> {code:java}
> LogicalProject(inputs=[0..1])
> +- LogicalFilter(condition=[IS NULL($2)])
>    +- LogicalCorrelate(correlation=[$cor7], joinType=[left], 
> requiredColumns=[{0, 1}])
>       :- LogicalProject(exprs=[[+(2, $0), +(3, $1)]])
>       :  +- LogicalTableScan(table=[[default_catalog, default_database, l, 
> source: [TestTableSource(a, b, c)]]])
>       +- LogicalAggregate(group=[{}], agg#0=[MIN($0)])
>          +- LogicalProject(exprs=[[true]])
>             +- LogicalFilter(condition=[AND(=($0, $cor7.d2), IS NULL($1))])
>                +- LogicalCorrelate(correlation=[$cor4], joinType=[left], 
> requiredColumns=[{0}])
>                   :- LogicalProject(inputs=[0])
>                   :  +- LogicalTableScan(table=[[default_catalog, 
> default_database, r, source: [TestTableSource(d1, e, f)]]])
>                   +- LogicalAggregate(group=[{}], agg#0=[MIN($0)])
>                      +- LogicalProject(exprs=[[true]])
>                         +- LogicalFilter(condition=[AND(=($0, $cor4.d1), 
> =($1, $cor4.d1), =(CAST($2):BIGINT, $cor7.d3))])
>                            +- LogicalProject(exprs=[[+($0, 4), +($0, 5), 
> +($0, 6)]])
>                               +- LogicalTableScan(table=[[default_catalog, 
> default_database, t, source: [TestTableSource(i, j, k)]]])
> {code}
> You can find the new plan uses a correlate node rather than a join node.
> 2. CALCITE-5789 has fix this problem by removing the nested correlation node.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to