[ 
https://issues.apache.org/jira/browse/FLINK-31830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17732005#comment-17732005
 ] 

Jane Chan edited comment on FLINK-31830 at 6/29/23 7:35 AM:
------------------------------------------------------------

The issue is described in the previous comment. However, aligning the behavior 
between Table API & SQL might be a breaking change to the current design. Since 
RowType(extends LogicalType), and FieldsDataType(extends DataType) are all 
public APIs, I think we need a discussion before rushing to the fix. 
h4. TL;DR

1. The nullability of row type created via SQL and Table API is inconsistent.
ROW<f0 INT NOT NULL> is not a valid type for SQL, but Table API accepts it.

2. Table API should align the behavior to SQL. 
2.1 RowType.copy(boolean nullable) should also set the inner fields to null if 
nullable is true.
2.2 RowType's constructor should also check nullability.
2.3 FieldsDataType.nullable() should also set the inner fields to null.

3. But this alignment will cause incompatible changes on public API.
3.1
{code:java}
DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT().notNull())).notNull() 
cannot create 
ROW<INT NOT NULL>NOT NULL because the default nullability is true.
{code}
3.2 Row type will lose idempotence for chained call 
notNull().nullable().notNull()

4. Suggest introducing a Builder to DataTypes.ROW to fix 3.1
h4. Behavior Change

1. Currently, a RowType respects any nullability specification on the attribute 
and record levels. But after the fix, {*}a row type with attributes not null 
but record nullable{*}(e.g. ROW<f0 INT NOT NULL>) {*}should not get supported 
anymore{*}. See the following matrix.
||User-specified Row Type with Nullability||Current Supported||Plan to Support 
||
|ROW<f0 INT NOT NULL> NOT NULL|Y|Y|
|ROW<f0 INT> NOT NULL|Y|Y|
|ROW<f0 INT NOT NULL>|Y|{color:#ff0000}N. {color}
{color:#ff0000}To align the behavior with SQL, where{color}
{color:#ff0000}ROW<f0 INT NOT NULL> is implicitly rewritten to {color}
{color:#ff0000}ROW<f0 INT> by Calcite[1].{color}|
|ROW<f0 INT>|Y|Y|

 

2. Currently, AbstractDataType(the interface) has two methods, *nullable()* and 
*notNull(),* to change the underlying logical type's nullability and get a 
reconfigured DataType instance. Although it is not explicitly stated in the 
JavaDoc, the current fact is that the conversion between these two methods can 
maintain the idempotence of the row type[2], and this feature may be broken 
after the fix.

Current Behavior 
1. copy(boolean) only changes the record's level of nullability
2. preserve idempotence

After the Fix
1. copy(boolean) changes the record's level nullability, and if nullable == 
true, the attribute's nullability is also set to true
2. might lose idempotence in some cases

See the following matrix.
||User-specified Row Type with Nullability||  Current Behavior|| After the Fix||
|r ROW<f0 INT NOT NULL> NOT NULL|r.nullable() => ROW<f0 INT NOT NULL>
 
r.notNull() => r
 
r.nullable().notNull() => r|r.nullable() => ROW<f0 INT>
 
r.notNull() => r
 
{color:#ff0000}r.nullable().notNull() => ROW<f0 INT> NOT NULL{color}|
|r ROW<f0 INT> NOT NULL|r.nullable() => ROW<f0 INT>
 
r.notNull() => r
 
r.nullable().notNull()=> r|r.nullable() => ROW<f0 INT>
 
r.notNull() => r
 
r.nullable().notNull() => r|
|r ROW<f0 INT NOT NULL>|r.nullable() => r
 
r.notNull() => ROW<f0 INT NOT NULL> NOT NULL
 
r.notNull().nullable() =>r|since ROW<f0 INT NOT NULL> is not considered a valid 
type anymore, ROW<f0 INT NOT NULL> will be first interpreted as ROW<f0 INT> 
 
r.nullable() => ROW<f0 INT>
 
r.notNull() => ROW<f0 INT> NOT NULL
 
r.notNull().nullable() => ROW<f0 INT>|
|r ROW<f0 INT>|r.nullable() => r
 
r.notNull() => ROW<f0 INT> NOT NULL
 
r.notNull().nullable() => r|r.nullable() => r
 
r.notNull() => ROW<f0 INT> NOT NULL
 
r.notNull().nullable() => r|

Since the default nullability for LogicalType is true, it's often the case to 
use DataTypes.ROW(...).notNull() to create `ROW<...> NOT NULL`. While we 
correct the behavior for DataTypes.ROW(...), it will not be possible to create 
a `ROW<...> NOT NULL` using DataTypes.ROW(...).notNull().
h4. Possible Plan

I think the core contradiction lies in the fact that, on the one hand, the 
{{notNull()}} and {{nullable()}} of {{FieldsDataType}} need to be consistent 
with the implementation of other classes that implement the interface 
AbstractDataType, such as {{{}AtomicDataType{}}}, which can continuously create 
reconfigured instances through {{notNull()}} and {{{}nullable(){}}}. On the 
other hand, the {{nullable()}} method of {{FieldsDataType}} needs to follow 
Calcite's rules and rewrite the nullability of the internal fields together.

One possible way is to introduce a method to DataTypes that can specify 
nullability when constructing RowType.
{code:java}
// interpreted as ROW<f0 INT NOT NULL> NOT NULL
DataTypes.ROW(false, DataTypes.FIELD("f0", DataTypes.INT().notNull()))

// We may only allow calls like 
DataType rowType = DataTypes.ROW(DataTypes.FIELD("f0", 
DataTypes.INT().nullable()))
{code}
Another way is to introduce a builder like
{code:java}
 // interpreted as ROW<f0 INT NOT NULL> NOT NULL
DataTypes.ROW.Builder builder = 
    DataTypes.ROW.newBuidler()
        .field(DataTypes.FIELD("f0", DataTypes.INT().notNull())
        .notNull()
DataType rowType = builder.build();{code}
and adapt the current DataTypes.ROW() to use the builder.
----
h4. Ref

[1] CALCITE-2464

[2] Idempotence means DataTypes.ROW(...).notNull().nullable() (can be repeated 
many times) equals DataTypes.ROW(...)

[3] 
[FLIP-37|https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System]

 

I slightly prefer the builder pattern, but it would be great if you could share 
some suggestions. Cc [~lincoln.86xy] [~twalthr]  [~snuyanzin] 

 


was (Author: qingyue):
The issue is described in the previous comment. However, aligning the behavior 
between Table API & SQL might be a breaking change to the current design. Since 
RowType(extends LogicalType), and FieldsDataType(extends DataType) are all 
public APIs, I think we need a discussion before rushing to the fix. 
h4. Behavior Change

1. Currently, a RowType respects any nullability specification on the attribute 
and record levels. But after the fix, {*}a row type with attributes not null 
but record nullable{*}(e.g. ROW<f0 INT NOT NULL>) {*}should not get supported 
anymore{*}. See the following matrix.
||User-specified Row Type with Nullability||Current Supported||Plan to Support 
||
|ROW<f0 INT NOT NULL> NOT NULL|Y|Y|
|ROW<f0 INT> NOT NULL|Y|Y|
|ROW<f0 INT NOT NULL>|Y|{color:#ff0000}N. {color}
{color:#ff0000}To align the behavior with SQL, where{color}
{color:#ff0000}ROW<f0 INT NOT NULL> is implicitly rewritten to {color}
{color:#ff0000}ROW<f0 INT> by Calcite[1].{color}|
|ROW<f0 INT>|Y|Y|

 

2. Currently, AbstractDataType(the interface) has two methods, *nullable()* and 
*notNull(),* to change the underlying logical type's nullability and get a 
reconfigured DataType instance. Although it is not explicitly stated in the 
JavaDoc, the current fact is that the conversion between these two methods can 
maintain the idempotence of the row type[2], and this feature may be broken 
after the fix.

Current Behavior 
1. copy(boolean) only changes the record's level of nullability
2. preserve idempotence

After the Fix
1. copy(boolean) changes the record's level nullability, and if nullable == 
true, the attribute's nullability is also set to true
2. might lose idempotence in some cases

See the following matrix.
||User-specified Row Type with Nullability||  Current Behavior|| After the Fix||
|r ROW<f0 INT NOT NULL> NOT NULL|r.nullable() => ROW<f0 INT NOT NULL>
 
r.notNull() => r
 
r.nullable().notNull() => r|r.nullable() => ROW<f0 INT>
 
r.notNull() => r
 
{color:#ff0000}r.nullable().notNull() => ROW<f0 INT> NOT NULL{color}|
|r ROW<f0 INT> NOT NULL|r.nullable() => ROW<f0 INT>
 
r.notNull() => r
 
r.nullable().notNull()=> r|r.nullable() => ROW<f0 INT>
 
r.notNull() => r
 
r.nullable().notNull() => r|
|r ROW<f0 INT NOT NULL>|r.nullable() => r
 
r.notNull() => ROW<f0 INT NOT NULL> NOT NULL
 
r.notNull().nullable() =>r|since ROW<f0 INT NOT NULL> is not considered a valid 
type anymore, ROW<f0 INT NOT NULL> will be first interpreted as ROW<f0 INT> 
 
r.nullable() => ROW<f0 INT>
 
r.notNull() => ROW<f0 INT> NOT NULL
 
r.notNull().nullable() => ROW<f0 INT>|
|r ROW<f0 INT>|r.nullable() => r
 
r.notNull() => ROW<f0 INT> NOT NULL
 
r.notNull().nullable() => r|r.nullable() => r
 
r.notNull() => ROW<f0 INT> NOT NULL
 
r.notNull().nullable() => r|

Since the default nullability for LogicalType is true, it's often the case to 
use DataTypes.ROW(...).notNull() to create `ROW<...> NOT NULL`. While we 
correct the behavior for DataTypes.ROW(...), it will not be possible to create 
a `ROW<...> NOT NULL` using DataTypes.ROW(...).notNull().
h4. Possible Plan

I think the core contradiction lies in the fact that, on the one hand, the 
{{notNull()}} and {{nullable()}} of {{FieldsDataType}} need to be consistent 
with the implementation of other classes that implement the interface 
AbstractDataType, such as {{{}AtomicDataType{}}}, which can continuously create 
reconfigured instances through {{notNull()}} and {{{}nullable(){}}}. On the 
other hand, the {{nullable()}} method of {{FieldsDataType}} needs to follow 
Calcite's rules and rewrite the nullability of the internal fields together.

One possible way is to introduce a method to DataTypes that can specify 
nullability when constructing RowType.
{code:java}
// interpreted as ROW<f0 INT NOT NULL> NOT NULL
DataTypes.ROW(false, DataTypes.FIELD("f0", DataTypes.INT().notNull()))

// We may only allow calls like 
DataType rowType = DataTypes.ROW(DataTypes.FIELD("f0", 
DataTypes.INT().nullable()))
{code}
Another way is to introduce a builder like
{code:java}
 // interpreted as ROW<f0 INT NOT NULL> NOT NULL
DataTypes.ROW.Builder builder = 
    DataTypes.ROW.newBuidler()
        .field(DataTypes.FIELD("f0", DataTypes.INT().notNull())
        .notNull()
DataType rowType = builder.build();{code}
and adapt the current DataTypes.ROW() to use the builder.
----
h4. Ref

[1] CALCITE-2464

[2] Idempotence means DataTypes.ROW(...).notNull().nullable() (can be repeated 
many times) equals DataTypes.ROW(...)

[3] 
[FLIP-37|https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System]

 

I slightly prefer the builder pattern, but it would be great if you could share 
some suggestions. Cc [~lincoln.86xy] [~twalthr]  [~snuyanzin] 

 

> Coalesce on nested fields with different nullabilities will get wrong plan
> --------------------------------------------------------------------------
>
>                 Key: FLINK-31830
>                 URL: https://issues.apache.org/jira/browse/FLINK-31830
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.14.6
>            Reporter: lincoln lee
>            Assignee: Jane Chan
>            Priority: Major
>         Attachments: image-2023-06-09-15-06-01-322.png, 
> image-2023-06-09-15-21-13-720.png
>
>
> A test case similar to FLINK-31829, only changes the nullable field `a.np` to 
> not null, will get a wrong plan in 1.14.x (reported from the community user):
> {code}
>   @Test
>   def testCoalesceOnNestedColumns(): Unit = {
>     val tEnv = util.tableEnv
>     val tableDescriptor = TableDescriptor.forConnector("datagen")
>         .schema(Schema.newBuilder
>             .column("id", DataTypes.INT.notNull)
>             .column("a", DataTypes.ROW(DataTypes.FIELD("np", 
> DataTypes.INT.notNull())).nullable)
>             .build)
>         .build
>     tEnv.createTemporaryTable("t1", tableDescriptor)
>     tEnv.createTemporaryTable("t2", tableDescriptor)
>     val res = tEnv.executeSql("EXPLAIN SELECT a.id, COALESCE(a.a.np, b.a.np) 
> c1, IFNULL(a.a.np, b.a.np) c2 FROM t1 a left JOIN t2 b ON a.id=b.id where a.a 
> is null or a.a.np is null")
>     res.print()
> }  
> == Abstract Syntax Tree ==
> LogicalProject(id=[$0], c1=[CAST($1.np):INTEGER], c2=[IFNULL($1.np, $3.np)])
> +- LogicalFilter(condition=[OR(IS NULL($1), IS NULL(CAST($1.np):INTEGER))])
>    +- LogicalJoin(condition=[=($0, $2)], joinType=[left])
>       :- LogicalTableScan(table=[[default_catalog, default_database, t1]])
>       +- LogicalTableScan(table=[[default_catalog, default_database, t2]])
> {code}
> the top project in the ast is wrong:  `LogicalProject(id=[$0], 
> c1=[CAST($1.np):INTEGER], c2=[IFNULL($1.np, $3.np)])`, the 
> `c1=[CAST($1.np):INTEGER]` relate to `COALESCE(a.a.np, b.a.np) c1` is 
> incorrect,
> but this works fine when using sql ddl to create tables
> {code}
>   @Test
>   def testCoalesceOnNestedColumns2(): Unit = {
>     val tEnv = util.tableEnv
>     tEnv.executeSql(
>       s"""
>          |create temporary table t1 (
>          |  id int not null,
>          |  a row<np int not null>
>          |) with (
>          | 'connector' = 'datagen'
>          |)
>          |""".stripMargin)
>     tEnv.executeSql(
>       s"""
>          |create temporary table t2 (
>          |  id int not null,
>          |  a row<np int not null>
>          |) with (
>          | 'connector' = 'datagen'
>          |)
>          |""".stripMargin)
>     val res = tEnv.executeSql(
>       "EXPLAIN SELECT a.id, COALESCE(a.a.np, b.a.np) c1, IFNULL(a.a.np, 
> b.a.np) c2 FROM t1 a left JOIN t2 b ON a.id=b.id where a.a is null or a.a.np 
> is null")
>     res.print()
>   }
> {code}
> from 1.15, the coalesce will be a new builtin function, and the ast looks 
> correct in version 1.15+, while before 1.15 it was rewritten as `case when`



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to