[
https://issues.apache.org/jira/browse/FLINK-31830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17732005#comment-17732005
]
Jane Chan edited comment on FLINK-31830 at 6/26/23 3:54 PM:
------------------------------------------------------------
The issue is described in the previous comment. However, aligning the behavior
of the Table API with SQL might be a breaking change to the current design.
Since RowType (extends LogicalType) and FieldsDataType (extends DataType) are
both public APIs, I think we need a discussion before rushing into a fix.
h4. Behavior Change
1. Currently, a RowType respects any nullability specification at both the
attribute level and the record level. After the fix, {*}a row type whose
attributes are not null but whose record is nullable{*} (e.g. ROW<f0 INT NOT
NULL>) {*}would no longer be supported{*}. See the following matrix.
||User-specified Row Type with Nullability||Currently Supported||Plan to Support||
|ROW<f0 INT NOT NULL> NOT NULL|Y|Y|
|ROW<f0 INT> NOT NULL|Y|Y|
|ROW<f0 INT NOT NULL>|Y|{color:#ff0000}N. {color}
{color:#ff0000}To align the behavior with SQL, where{color}
{color:#ff0000}ROW<f0 INT NOT NULL> is implicitly rewritten to {color}
{color:#ff0000}ROW<f0 INT> by Calcite[1].{color}|
|ROW<f0 INT>|Y|Y|
2. Currently, AbstractDataType (the interface) has two methods, *nullable()*
and *notNull()*, that change the underlying logical type's nullability and
return a reconfigured DataType instance. Although the JavaDoc does not state it
explicitly, toggling between these two methods currently preserves the
idempotence of the row type[2], and this property may be broken after the fix.
Current Behavior
1. copy(boolean) only changes the record-level nullability
2. idempotence is preserved
After the Fix
1. copy(boolean) changes the record-level nullability, and if nullable ==
true, the attributes' nullability is also set to true
2. idempotence might be lost in some cases
See the following matrix.
||User-specified Row Type with Nullability|| Current Behavior|| After the Fix||
|r ROW<f0 INT NOT NULL> NOT NULL|r.nullable() => ROW<f0 INT NOT NULL>
r.notNull() => r
r.nullable().notNull() => r|r.nullable() => ROW<f0 INT>
r.notNull() => r
{color:#ff0000}r.nullable().notNull() => ROW<f0 INT> NOT NULL{color}|
|r ROW<f0 INT> NOT NULL|r.nullable() => ROW<f0 INT>
r.notNull() => r
r.nullable().notNull() => r|r.nullable() => ROW<f0 INT>
r.notNull() => r
r.nullable().notNull() => r|
|r ROW<f0 INT NOT NULL>|r.nullable() => r
r.notNull() => ROW<f0 INT NOT NULL> NOT NULL
r.notNull().nullable() => r|since ROW<f0 INT NOT NULL> is not considered a
valid type anymore, ROW<f0 INT NOT NULL> will be first interpreted as ROW<f0
INT>
r.nullable() => ROW<f0 INT>
r.notNull() => ROW<f0 INT> NOT NULL
r.notNull().nullable() => ROW<f0 INT>|
|r ROW<f0 INT>|r.nullable() => r
r.notNull() => ROW<f0 INT> NOT NULL
r.notNull().nullable() => r|r.nullable() => r
r.notNull() => ROW<f0 INT> NOT NULL
r.notNull().nullable() => r|
Since the default nullability of LogicalType is true, DataTypes.ROW(...).notNull()
is commonly used to create `ROW<...> NOT NULL`. Once we correct the behavior of
DataTypes.ROW(...), it will no longer be possible to create a
`ROW<...> NOT NULL` that keeps NOT NULL attributes using
DataTypes.ROW(...).notNull().
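The two matrices above can be replayed with a small toy model. This is only a sketch in plain Java: ToyRowType, copyCurrent and copyAfterFix are hypothetical names, not Flink's RowType or FieldsDataType, and a single INT attribute f0 stands in for all attributes. The "after the fix" rule is my reading of the matrix, not an existing implementation.

```java
/** Toy model of a row type with a single INT field f0; NOT Flink's RowType. */
final class ToyRowType {
    final boolean fieldNullable; // nullability of attribute f0
    final boolean rowNullable;   // nullability of the record itself

    ToyRowType(boolean fieldNullable, boolean rowNullable) {
        this.fieldNullable = fieldNullable;
        this.rowNullable = rowNullable;
    }

    /** Current behavior: only the record-level flag changes. */
    ToyRowType copyCurrent(boolean nullable) {
        return new ToyRowType(fieldNullable, nullable);
    }

    /**
     * Assumed behavior after the fix: a nullable record implies nullable
     * attributes. A nullable receiver is first read as having nullable
     * attributes, mirroring the third row of the matrix.
     */
    ToyRowType copyAfterFix(boolean nullable) {
        boolean newFieldNullable = nullable || rowNullable || fieldNullable;
        return new ToyRowType(newFieldNullable, nullable);
    }

    @Override
    public String toString() {
        return "ROW<f0 INT" + (fieldNullable ? "" : " NOT NULL") + ">"
                + (rowNullable ? "" : " NOT NULL");
    }

    public static void main(String[] args) {
        ToyRowType r = new ToyRowType(false, false); // ROW<f0 INT NOT NULL> NOT NULL
        // nullable().notNull() under the current semantics returns r unchanged.
        System.out.println(r.copyCurrent(true).copyCurrent(false));
        // Under the assumed new semantics the NOT NULL on f0 is lost.
        System.out.println(r.copyAfterFix(true).copyAfterFix(false));
    }
}
```

The first line printed is ROW<f0 INT NOT NULL> NOT NULL and the second is ROW<f0 INT> NOT NULL, which is the red cell in the matrix: once the record has been made nullable, the original attribute nullability cannot be recovered by calling notNull() again.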
h4. Possible Plan
I think the core contradiction lies in the fact that, on the one hand, the
{{notNull()}} and {{nullable()}} of {{FieldsDataType}} need to be consistent
with the implementation of other classes that implement the interface
AbstractDataType, such as {{{}AtomicDataType{}}}, which can continuously create
reconfigured instances through {{notNull()}} and {{{}nullable(){}}}. On the
other hand, the {{nullable()}} method of {{FieldsDataType}} needs to follow
Calcite's rules and rewrite the nullability of the internal fields together.
One option is to add a method to DataTypes that specifies the nullability when
constructing a RowType.
{code:java}
// interpreted as ROW<f0 INT NOT NULL> NOT NULL
DataTypes.ROW(false, DataTypes.FIELD("f0", DataTypes.INT().notNull()))
// We may only allow calls like
DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT().nullable()))
{code}
Another way is to introduce a builder like
{code:java}
// interpreted as ROW<f0 INT NOT NULL> NOT NULL
// (proposed API; build() is assumed to return the finished DataType)
DataType rowType =
    DataTypes.ROW.newBuilder()
        .field(DataTypes.FIELD("f0", DataTypes.INT().notNull()))
        .notNull()
        .build();
{code}
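To make the builder idea a bit more concrete, here is a minimal sketch in plain Java. ToyRowBuilder and its methods are hypothetical and do not exist in Flink; the point is only that the record-level nullability is fixed once at build time, so no later nullable()/notNull() toggling is needed. The rule that a nullable record gets nullable attributes is assumed from the discussion above.

```java
/** Hypothetical builder for a one-field toy row; NOT an existing Flink API. */
final class ToyRowBuilder {
    private boolean fieldNullable = true; // default nullability is true
    private boolean rowNullable = true;

    /** Declares the nullability of the single attribute f0. */
    ToyRowBuilder field(boolean nullable) {
        this.fieldNullable = nullable;
        return this;
    }

    /** Marks the record itself as NOT NULL. */
    ToyRowBuilder notNull() {
        this.rowNullable = false;
        return this;
    }

    /** Renders the type; a nullable record gets nullable attributes (assumed rule). */
    String build() {
        boolean effectiveFieldNullable = rowNullable || fieldNullable;
        return "ROW<f0 INT" + (effectiveFieldNullable ? "" : " NOT NULL") + ">"
                + (rowNullable ? "" : " NOT NULL");
    }

    public static void main(String[] args) {
        // Attribute and record are both declared NOT NULL, so both are kept.
        System.out.println(new ToyRowBuilder().field(false).notNull().build());
        // The record stays nullable, so the attribute is rewritten to nullable.
        System.out.println(new ToyRowBuilder().field(false).build());
    }
}
```

Because both flags are known when build() runs, the result does not depend on the order of the field(...) and notNull() calls.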
----
h4. Ref
[1] CALCITE-2464
[2] Idempotence means DataTypes.ROW(...).notNull().nullable() (can be repeated
many times) equals DataTypes.ROW(...)
[3] [FLIP-37|https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System]
> Coalesce on nested fields with different nullabilities will get wrong plan
> --------------------------------------------------------------------------
>
> Key: FLINK-31830
> URL: https://issues.apache.org/jira/browse/FLINK-31830
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.14.6
> Reporter: lincoln lee
> Assignee: Jane Chan
> Priority: Major
> Attachments: image-2023-06-09-15-06-01-322.png,
> image-2023-06-09-15-21-13-720.png
>
>
> A test case similar to the one in FLINK-31829, which only changes the
> nullable field `a.np` to not null, produces a wrong plan in 1.14.x (reported
> by a community user):
> {code}
> @Test
> def testCoalesceOnNestedColumns(): Unit = {
>   val tEnv = util.tableEnv
>   val tableDescriptor = TableDescriptor.forConnector("datagen")
>     .schema(Schema.newBuilder
>       .column("id", DataTypes.INT.notNull)
>       .column("a", DataTypes.ROW(DataTypes.FIELD("np", DataTypes.INT.notNull())).nullable)
>       .build)
>     .build
>   tEnv.createTemporaryTable("t1", tableDescriptor)
>   tEnv.createTemporaryTable("t2", tableDescriptor)
>   val res = tEnv.executeSql(
>     "EXPLAIN SELECT a.id, COALESCE(a.a.np, b.a.np) c1, IFNULL(a.a.np, b.a.np) c2 " +
>       "FROM t1 a left JOIN t2 b ON a.id=b.id where a.a is null or a.a.np is null")
>   res.print()
> }
> == Abstract Syntax Tree ==
> LogicalProject(id=[$0], c1=[CAST($1.np):INTEGER], c2=[IFNULL($1.np, $3.np)])
> +- LogicalFilter(condition=[OR(IS NULL($1), IS NULL(CAST($1.np):INTEGER))])
> +- LogicalJoin(condition=[=($0, $2)], joinType=[left])
> :- LogicalTableScan(table=[[default_catalog, default_database, t1]])
> +- LogicalTableScan(table=[[default_catalog, default_database, t2]])
> {code}
> The top project in the AST is wrong: `LogicalProject(id=[$0],
> c1=[CAST($1.np):INTEGER], c2=[IFNULL($1.np, $3.np)])`. The
> `c1=[CAST($1.np):INTEGER]` that corresponds to `COALESCE(a.a.np, b.a.np) c1`
> is incorrect,
> but this works fine when using SQL DDL to create the tables:
> {code}
> @Test
> def testCoalesceOnNestedColumns2(): Unit = {
>   val tEnv = util.tableEnv
>   tEnv.executeSql(
>     s"""
>        |create temporary table t1 (
>        |  id int not null,
>        |  a row<np int not null>
>        |) with (
>        |  'connector' = 'datagen'
>        |)
>        |""".stripMargin)
>   tEnv.executeSql(
>     s"""
>        |create temporary table t2 (
>        |  id int not null,
>        |  a row<np int not null>
>        |) with (
>        |  'connector' = 'datagen'
>        |)
>        |""".stripMargin)
>   val res = tEnv.executeSql(
>     "EXPLAIN SELECT a.id, COALESCE(a.a.np, b.a.np) c1, IFNULL(a.a.np, b.a.np) c2 " +
>       "FROM t1 a left JOIN t2 b ON a.id=b.id where a.a is null or a.a.np is null")
>   res.print()
> }
> {code}
> From 1.15 on, COALESCE is a new built-in function and the AST looks correct
> in 1.15+, whereas before 1.15 it was rewritten as `case when`.