[ 
https://issues.apache.org/jira/browse/FLINK-31830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17732005#comment-17732005
 ] 

Jane Chan edited comment on FLINK-31830 at 6/26/23 2:39 PM:
------------------------------------------------------------

The issue is described in the previous comment. However, aligning the behavior 
between Table API & SQL might be a breaking change to the current design.

1. Currently, a RowType respects any nullability specified at both the attribute 
level and the record level. After the fix, {*}a row type whose attributes are 
NOT NULL but whose record is nullable{*} (e.g. ROW<f0 INT NOT NULL>) {*}would no 
longer be supported{*}. See the following matrix.
||User-specified Row Type with Nullability||Currently Supported||Planned Support||
|ROW<f0 INT NOT NULL> NOT NULL|Y|Y|
|ROW<f0 INT> NOT NULL|Y|Y|
|ROW<f0 INT NOT NULL>|Y|{color:#ff0000}N. To align the behavior with SQL, where ROW<f0 INT NOT NULL> is implicitly rewritten to ROW<f0 INT> by Calcite[1].{color}|
|ROW<f0 INT>|Y|Y|
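
The "Planned Support" column can be sketched as a normalization step. This is a minimal, self-contained Java model, not Flink code: RowSpec and normalize() are hypothetical names, and a row is reduced to one field flag plus one record flag. It assumes the unsupported combination is silently relaxed (as Calcite does for SQL, see [1]) rather than rejected with an exception.

```java
// Hypothetical toy model (not Flink's RowType): a single-field row described by two flags.
final class RowSpec {
    final boolean fieldNullable;   // false means "f0 INT NOT NULL"
    final boolean recordNullable;  // false means "ROW<...> NOT NULL"

    RowSpec(boolean fieldNullable, boolean recordNullable) {
        this.fieldNullable = fieldNullable;
        this.recordNullable = recordNullable;
    }

    // Assumed planned behavior: a nullable record cannot keep a NOT NULL field,
    // so the field is relaxed to nullable. All other combinations are unchanged.
    RowSpec normalize() {
        if (recordNullable && !fieldNullable) {
            return new RowSpec(true, true);
        }
        return this;
    }

    // Renders the type in the same notation as the matrix above.
    @Override
    public String toString() {
        return "ROW<f0 INT" + (fieldNullable ? "" : " NOT NULL") + ">"
                + (recordNullable ? "" : " NOT NULL");
    }
}
```

In this model only new RowSpec(false, true), i.e. ROW<f0 INT NOT NULL>, is changed by normalize(); the other three rows of the matrix pass through untouched.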

2. Currently, AbstractDataType (the interface) has two methods, *nullable()* and 
*notNull()*, to change the underlying logical type's nullability. Although the 
JavaDoc does not state it explicitly, switching back and forth between these two 
methods currently preserves the row type (idempotence[2]), and this property may 
be broken after the fix. See the following matrix.
||User-specified Row Type with Nullability||Current Behavior||After the Fix||
|r ROW<f0 INT NOT NULL> NOT NULL|r.copy(true) => ROW<f0 INT NOT NULL>
r.copy(false) => r
r.copy(true).copy(false) => r|r.copy(true) => ROW<f0 INT>
r.copy(false) => r
r.copy(true).copy(false) => ROW<f0 INT> NOT NULL|
|r ROW<f0 INT> NOT NULL|r.copy(true) => ROW<f0 INT>
r.copy(false) => r
r.copy(true).copy(false) => r|r.copy(true) => ROW<f0 INT>
r.copy(false) => r
r.copy(true).copy(false) => r|
|r ROW<f0 INT NOT NULL>|r.copy(true) => r
r.copy(false) => ROW<f0 INT NOT NULL> NOT NULL
r.copy(false).copy(true) => r|Since ROW<f0 INT NOT NULL> is no longer considered a 
valid type, it will either (1) be interpreted as ROW<f0 INT> or (2) cause an 
exception to be thrown.
If (1), then
r.copy(true) => r
r.copy(false) => ROW<f0 INT> NOT NULL
r.copy(false).copy(true) => r|
|r ROW<f0 INT>|r.copy(true) => r
r.copy(false) => ROW<f0 INT> NOT NULL
r.copy(false).copy(true) => r|r.copy(true) => r
r.copy(false) => ROW<f0 INT> NOT NULL
r.copy(false).copy(true) => r|
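
The difference between the two columns can be reproduced with a small model. This is a sketch under stated assumptions, not Flink's implementation: RowModel, copyCurrent and copyAfterFix are hypothetical names, the row has a single field, and the after-fix rule is taken to be "making the record nullable also makes the attribute nullable".

```java
// Hypothetical toy model (not Flink's RowType or AbstractDataType).
final class RowModel {
    final boolean fieldNullable;   // false means "f0 INT NOT NULL"
    final boolean recordNullable;  // false means "ROW<...> NOT NULL"

    RowModel(boolean fieldNullable, boolean recordNullable) {
        this.fieldNullable = fieldNullable;
        this.recordNullable = recordNullable;
    }

    // Current behavior: copy(boolean) only changes the record-level nullability.
    RowModel copyCurrent(boolean nullable) {
        return new RowModel(fieldNullable, nullable);
    }

    // Assumed behavior after the fix: a nullable record forces a nullable field,
    // and the original field nullability is not restored by a later copy(false).
    RowModel copyAfterFix(boolean nullable) {
        return new RowModel(nullable || fieldNullable, nullable);
    }

    // Renders the type in the same notation as the matrix above.
    @Override
    public String toString() {
        return "ROW<f0 INT" + (fieldNullable ? "" : " NOT NULL") + ">"
                + (recordNullable ? "" : " NOT NULL");
    }
}
```

For r = new RowModel(false, false), r.copyCurrent(true).copyCurrent(false) renders as the original type, while r.copyAfterFix(true).copyAfterFix(false) renders as ROW<f0 INT> NOT NULL, which is the round trip that no longer returns r in the first row of the matrix.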

 
----
[1] CALCITE-2464
[2] Idempotence means DataTypes.ROW(...).notNull().nullable() (can be repeated 
many times) equals DataTypes.ROW(...)


was (Author: qingyue):
The issue is described in the previous comment. However, aligning the behavior 
between Table API & SQL might be a breaking change to the current design.

1. Currently, a RowType respects any nullability specified at both the attribute 
level and the record level. After the fix, {*}a row type whose attributes are 
NOT NULL but whose record is nullable{*} (e.g. ROW<f0 INT NOT NULL>) {*}would no 
longer be supported{*}. See the following matrix.
||User-specified Row Type with Nullability||Currently Supported||Planned Support||
|ROW<f0 INT NOT NULL> NOT NULL|Y|Y|
|ROW<f0 INT> NOT NULL|Y|Y|
|ROW<f0 INT NOT NULL>|Y|{color:#ff0000}N. To align the behavior with SQL, where ROW<f0 INT NOT NULL> is implicitly rewritten to ROW<f0 INT> by Calcite[1].{color}|
|ROW<f0 INT>|Y|Y|

2. Currently, AbstractDataType (the interface) has two methods, *nullable()* and 
*notNull()*, to change the underlying logical type's nullability. Although the 
JavaDoc does not state it explicitly, switching back and forth between these two 
methods currently preserves the row type (idempotence[2]), and this property may 
be broken after the fix. See the following matrix.
||User-specified Row Type with Nullability||Current Behavior
1. copy(boolean) only changes the record-level nullability
2. preserves idempotence||After the Fix
1. copy(boolean) changes the record-level nullability, and if nullable == 
true, the attribute's nullability is also set to true
2. might lose idempotence in some cases||
|r ROW<f0 INT NOT NULL> NOT NULL|Col A2| |
|r ROW<f0 INT> NOT NULL| | |
|r ROW<f0 INT NOT NULL>| | |
|r ROW<f0 INT>| | |

 
----
[1] CALCITE-2464
[2] Idempotence means DataTypes.ROW(...).notNull().nullable() (can be repeated 
many times) equals DataTypes.ROW(...)

> Coalesce on nested fields with different nullabilities will get wrong plan
> --------------------------------------------------------------------------
>
>                 Key: FLINK-31830
>                 URL: https://issues.apache.org/jira/browse/FLINK-31830
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.14.6
>            Reporter: lincoln lee
>            Assignee: Jane Chan
>            Priority: Major
>         Attachments: image-2023-06-09-15-06-01-322.png, 
> image-2023-06-09-15-21-13-720.png
>
>
> A test case similar to FLINK-31829, which only changes the nullable field `a.np` to 
> not null, produces a wrong plan in 1.14.x (reported by a community user):
> {code}
>   @Test
>   def testCoalesceOnNestedColumns(): Unit = {
>     val tEnv = util.tableEnv
>     val tableDescriptor = TableDescriptor.forConnector("datagen")
>         .schema(Schema.newBuilder
>             .column("id", DataTypes.INT.notNull)
>             .column("a",
>               DataTypes.ROW(DataTypes.FIELD("np", DataTypes.INT.notNull())).nullable)
>             .build)
>         .build
>     tEnv.createTemporaryTable("t1", tableDescriptor)
>     tEnv.createTemporaryTable("t2", tableDescriptor)
>     val res = tEnv.executeSql(
>       "EXPLAIN SELECT a.id, COALESCE(a.a.np, b.a.np) c1, IFNULL(a.a.np, b.a.np) c2 " +
>         "FROM t1 a left JOIN t2 b ON a.id=b.id where a.a is null or a.a.np is null")
>     res.print()
>   }
> == Abstract Syntax Tree ==
> LogicalProject(id=[$0], c1=[CAST($1.np):INTEGER], c2=[IFNULL($1.np, $3.np)])
> +- LogicalFilter(condition=[OR(IS NULL($1), IS NULL(CAST($1.np):INTEGER))])
>    +- LogicalJoin(condition=[=($0, $2)], joinType=[left])
>       :- LogicalTableScan(table=[[default_catalog, default_database, t1]])
>       +- LogicalTableScan(table=[[default_catalog, default_database, t2]])
> {code}
> The top project in the AST is wrong: in `LogicalProject(id=[$0], 
> c1=[CAST($1.np):INTEGER], c2=[IFNULL($1.np, $3.np)])`, the expression 
> `c1=[CAST($1.np):INTEGER]`, which corresponds to `COALESCE(a.a.np, b.a.np) c1`, 
> is incorrect.
> However, this works fine when using SQL DDL to create the tables:
> {code}
>   @Test
>   def testCoalesceOnNestedColumns2(): Unit = {
>     val tEnv = util.tableEnv
>     tEnv.executeSql(
>       s"""
>          |create temporary table t1 (
>          |  id int not null,
>          |  a row<np int not null>
>          |) with (
>          | 'connector' = 'datagen'
>          |)
>          |""".stripMargin)
>     tEnv.executeSql(
>       s"""
>          |create temporary table t2 (
>          |  id int not null,
>          |  a row<np int not null>
>          |) with (
>          | 'connector' = 'datagen'
>          |)
>          |""".stripMargin)
>     val res = tEnv.executeSql(
>       "EXPLAIN SELECT a.id, COALESCE(a.a.np, b.a.np) c1, IFNULL(a.a.np, b.a.np) c2 " +
>         "FROM t1 a left JOIN t2 b ON a.id=b.id where a.a is null or a.a.np is null")
>     res.print()
>   }
> {code}
> Starting from 1.15, COALESCE is a new built-in function and the AST looks 
> correct in 1.15+, whereas before 1.15 it was rewritten as `case when`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
