[ 
https://issues.apache.org/jira/browse/FLINK-31830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730846#comment-17730846
 ] 

Jane Chan edited comment on FLINK-31830 at 6/26/23 2:25 AM:
------------------------------------------------------------

It takes some time to reason out the cause, and sorry for the late update. 
h4. 1. Identify the problem

The issue has been reproduced using Flink release-1.14.6 (which depends on 
calcite-1.26).
h4. 2. Why SQL gets a correct plan, while API doesn't

First, the resolved schema differs. You can verify this by the following code 
snippet.
{code:scala}
 @Test
  def testSchema(): Unit = {
    // create temporary table t1
    val tableDescriptor = TableDescriptor.forConnector("datagen")
      .schema(Schema.newBuilder
        .column("id", DataTypes.INT.notNull)
        .column("a", DataTypes.ROW(DataTypes.FIELD("np", 
DataTypes.INT.notNull())).nullable)
        .build)
      .build
    tableEnv.createTemporaryTable("t1", tableDescriptor)

    // create temporary table t2
    tableEnv.executeSql(
      s"""
         |create temporary table t2 (
         |  id int not null,
         |  a row<np int not null>
         |) with (
         | 'connector' = 'datagen'
         |)
         |""".stripMargin)

    val catalogManager = 
tableEnv.asInstanceOf[StreamTableEnvironmentImpl].getCatalogManager
    val result1 =
      catalogManager.getTable(ObjectIdentifier.of("default_catalog", 
"default_database", "t1"))
    val result2 =
      catalogManager.getTable(ObjectIdentifier.of("default_catalog", 
"default_database", "t2"))
    println(result1.get().getResolvedSchema)
    println(result2.get().getResolvedSchema)
  }
{code}
The result will be
{code:sql}
--result1
(
  `id` INT NOT NULL,
  `a` ROW<`np` INT NOT NULL>
)

--result2  
(
  `id` INT NOT NULL,
  `a` ROW<`np` INT> -- changed by 
org.apache.calcite.sql.SqlDataTypeSpec#fixUpNullability
)
{code}
You can tell from the print result that the nullability specified by the user 
does not get respect. However, this is a by-design behavior for Calcite. The 
community has an in-depth discussion CALCITE-2464 on the semantics of setting 
nullability for structured type.

TL;DR

!image-2023-06-09-15-06-01-322.png|width=844,height=219!

This feature is introduced in Calcite 1.19.

 

As a result, the DDL `a` ROW<`np` INT NOT NULL> will be rewritten to `a` 
ROW<`np` INT> during the SQL-to-operation conversion.

(Please check org.apache.calcite.sql.SqlDataTypeSpec#fixNullability for more 
details).

 

As for the plan, I assume based on the schema produced by API
{code:java}
( `id` INT NOT NULL, `a` ROW<`np` INT NOT NULL> ) {code}
the optimization rules work as expected.

 

The filter condition after the left outer join is
{code:java}
where a.a is null or a.a.np is null {code}
and can be reduced and pushdown as 
{code:java}
where a.a is null {code}
since the nullability of a.np is always false.

 

And RemoveUnreachableCoalesceArgumentsRule matches the following case
{code:java}
COALESCE(a.a.np, b.a.np) c1 {code}
because a.a.np is never nullable, so the invocation of coalesce is reduced. 

!image-2023-06-09-15-21-13-720.png|width=892,height=575!

 
h4. 3. Conclusion

So in a nutshell, we should align the structured type nullability created 
through API with SQL. Last but not least, we should improve the document and 
add a description of the nullability of structured type, o.w. it might not be 
straightforward for users to understand.
h4.  


was (Author: qingyue):
It takes some time to reason out the cause, and sorry for the late update. 
h4. 1. Identify the problem

The issue has been reproduced using Flink release-1.14.6 (which depends on 
calcite-1.26).
h4. 2. Why SQL gets a correct plan, while API doesn't

First, the resolved schema differs. You can verify this by the following code 
snippet.
{code:scala}
 @Test
  def testSchema(): Unit = {
    // create temporary table t1
    val tableDescriptor = TableDescriptor.forConnector("datagen")
      .schema(Schema.newBuilder
        .column("id", DataTypes.INT.notNull)
        .column("a", DataTypes.ROW(DataTypes.FIELD("np", 
DataTypes.INT.notNull())).nullable)
        .build)
      .build
    tableEnv.createTemporaryTable("t1", tableDescriptor)

    // create temporary table t2
    tableEnv.executeSql(
      s"""
         |create temporary table t2 (
         |  id int not null,
         |  a row<np int not null>
         |) with (
         | 'connector' = 'datagen'
         |)
         |""".stripMargin)

    val catalogManager = 
tableEnv.asInstanceOf[StreamTableEnvironmentImpl].getCatalogManager
    val result1 =
      catalogManager.getTable(ObjectIdentifier.of("default_catalog", 
"default_database", "t1"))
    val result2 =
      catalogManager.getTable(ObjectIdentifier.of("default_catalog", 
"default_database", "t2"))
    println(result1.get().getResolvedSchema)
    println(result2.get().getResolvedSchema)
  }
{code}
The result will be
{code:sql}
--result1
(
  `id` INT NOT NULL,
  `a` ROW<`np` INT NOT NULL>
)

--result2  
(
  `id` INT NOT NULL,
  `a` ROW<`np` INT> -- changed by 
org.apache.calcite.sql.SqlDataTypeSpec#fixNullability
)
{code}
You can tell from the print result that the nullability specified by the user 
does not get respect. However, this is a by-design behavior for Calcite. The 
community has an in-depth discussion CALCITE-2464 on the semantics of setting 
nullability for structured type.

TL;DR

!image-2023-06-09-15-06-01-322.png|width=844,height=219!

This feature is introduced in Calcite 1.19.

 

As a result, the DDL `a` ROW<`np` INT NOT NULL> will be rewritten to `a` 
ROW<`np` INT> during the SQL-to-operation conversion.

(Please check org.apache.calcite.sql.SqlDataTypeSpec#fixNullability for more 
details).

 

As for the plan, I assume based on the schema produced by API
{code:java}
( `id` INT NOT NULL, `a` ROW<`np` INT NOT NULL> ) {code}
the optimization rules work as expected.

 

The filter condition after the left outer join is
{code:java}
where a.a is null or a.a.np is null {code}
and can be reduced and pushdown as 
{code:java}
where a.a is null {code}
since the nullability of a.np is always false.

 

And RemoveUnreachableCoalesceArgumentsRule matches the following case
{code:java}
COALESCE(a.a.np, b.a.np) c1 {code}
because a.a.np is never nullable, so the invocation of coalesce is reduced. 

!image-2023-06-09-15-21-13-720.png|width=892,height=575!

 
h4. 3. Conclusion

So in a nutshell, we should align the structured type nullability created 
through API with SQL. Last but not least, we should improve the document and 
add a description of the nullability of structured type, o.w. it might not be 
straightforward for users to understand.
h4.  

> Coalesce on nested fields with different nullabilities will get wrong plan
> --------------------------------------------------------------------------
>
>                 Key: FLINK-31830
>                 URL: https://issues.apache.org/jira/browse/FLINK-31830
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.14.6
>            Reporter: lincoln lee
>            Assignee: Jane Chan
>            Priority: Major
>         Attachments: image-2023-06-09-15-06-01-322.png, 
> image-2023-06-09-15-21-13-720.png
>
>
> A test case similar to FLINK-31829, only changes the nullable field `a.np` to 
> not null, will get a wrong plan in 1.14.x (reported from the community user):
> {code}
>   @Test
>   def testCoalesceOnNestedColumns(): Unit = {
>     val tEnv = util.tableEnv
>     val tableDescriptor = TableDescriptor.forConnector("datagen")
>         .schema(Schema.newBuilder
>             .column("id", DataTypes.INT.notNull)
>             .column("a", DataTypes.ROW(DataTypes.FIELD("np", 
> DataTypes.INT.notNull())).nullable)
>             .build)
>         .build
>     tEnv.createTemporaryTable("t1", tableDescriptor)
>     tEnv.createTemporaryTable("t2", tableDescriptor)
>     val res = tEnv.executeSql("EXPLAIN SELECT a.id, COALESCE(a.a.np, b.a.np) 
> c1, IFNULL(a.a.np, b.a.np) c2 FROM t1 a left JOIN t2 b ON a.id=b.id where a.a 
> is null or a.a.np is null")
>     res.print()
> }  
> == Abstract Syntax Tree ==
> LogicalProject(id=[$0], c1=[CAST($1.np):INTEGER], c2=[IFNULL($1.np, $3.np)])
> +- LogicalFilter(condition=[OR(IS NULL($1), IS NULL(CAST($1.np):INTEGER))])
>    +- LogicalJoin(condition=[=($0, $2)], joinType=[left])
>       :- LogicalTableScan(table=[[default_catalog, default_database, t1]])
>       +- LogicalTableScan(table=[[default_catalog, default_database, t2]])
> {code}
> the top project in the ast is wrong:  `LogicalProject(id=[$0], 
> c1=[CAST($1.np):INTEGER], c2=[IFNULL($1.np, $3.np)])`, the 
> `c1=[CAST($1.np):INTEGER]` relate to `COALESCE(a.a.np, b.a.np) c1` is 
> incorrect,
> but this works fine when using sql ddl to create tables
> {code}
>   @Test
>   def testCoalesceOnNestedColumns2(): Unit = {
>     val tEnv = util.tableEnv
>     tEnv.executeSql(
>       s"""
>          |create temporary table t1 (
>          |  id int not null,
>          |  a row<np int not null>
>          |) with (
>          | 'connector' = 'datagen'
>          |)
>          |""".stripMargin)
>     tEnv.executeSql(
>       s"""
>          |create temporary table t2 (
>          |  id int not null,
>          |  a row<np int not null>
>          |) with (
>          | 'connector' = 'datagen'
>          |)
>          |""".stripMargin)
>     val res = tEnv.executeSql(
>       "EXPLAIN SELECT a.id, COALESCE(a.a.np, b.a.np) c1, IFNULL(a.a.np, 
> b.a.np) c2 FROM t1 a left JOIN t2 b ON a.id=b.id where a.a is null or a.a.np 
> is null")
>     res.print()
>   }
> {code}
> from 1.15, the coalesce will be a new builtin function, and the ast looks 
> correct in version 1.15+, while before 1.15 it was rewritten as `case when`



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to