[
https://issues.apache.org/jira/browse/FLINK-31830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730846#comment-17730846
]
Jane Chan commented on FLINK-31830:
-----------------------------------
It took some time to reason out the cause; sorry for the late update.
h4. 1. Identify the problem
The issue has been reproduced using Flink release-1.14.6 (which depends on Calcite 1.26).
h4. 2. Why SQL gets a correct plan while the Table API doesn't
First, the resolved schemas differ. You can verify this with the following code snippet.
{code:scala}
@Test
def testSchema(): Unit = {
  // create temporary table t1 via the Table API
  val tableDescriptor = TableDescriptor.forConnector("datagen")
    .schema(Schema.newBuilder
      .column("id", DataTypes.INT.notNull)
      .column("a", DataTypes.ROW(DataTypes.FIELD("np", DataTypes.INT.notNull())).nullable)
      .build)
    .build
  tableEnv.createTemporaryTable("t1", tableDescriptor)

  // create temporary table t2 via SQL DDL
  tableEnv.executeSql(
    s"""
       |create temporary table t2 (
       |  id int not null,
       |  a row<np int not null>
       |) with (
       |  'connector' = 'datagen'
       |)
       |""".stripMargin)

  val catalogManager = tableEnv.asInstanceOf[StreamTableEnvironmentImpl].getCatalogManager
  val result1 = catalogManager.getTable(
    ObjectIdentifier.of("default_catalog", "default_database", "t1"))
  val result2 = catalogManager.getTable(
    ObjectIdentifier.of("default_catalog", "default_database", "t2"))
  println(result1.get().getResolvedSchema)
  println(result2.get().getResolvedSchema)
}
{code}
The result will be
{code:sql}
-- result1
(
  `id` INT NOT NULL,
  `a` ROW<`np` INT NOT NULL>
)
-- result2
(
  `id` INT NOT NULL,
  `a` ROW<`np` INT>  -- changed by org.apache.calcite.sql.SqlDataTypeSpec#fixNullability
)
{code}
The printed results show that the nullability specified by the user is not respected. However, this is by-design behavior in Calcite. The community had an in-depth discussion (CALCITE-2464) on the semantics of setting nullability for structured types.
TL;DR
!image-2023-06-09-15-06-01-322.png|width=844,height=219!
This behavior was introduced in Calcite 1.19. As a result, the DDL type `a` ROW<`np`
INT NOT NULL> is rewritten to `a` ROW<`np` INT> during SQL-to-operation
conversion (see org.apache.calcite.sql.SqlDataTypeSpec#fixNullability for details).
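For intuition, here is a minimal stand-alone Scala sketch of what such a rewrite does. This is a hypothetical model for illustration only, not Calcite's actual implementation; the {{Type}}/{{IntType}}/{{RowType}} classes below are made up.

{code:scala}
// Toy type model: every type carries a nullability flag; ROW carries its fields.
sealed trait Type { def nullable: Boolean }
case class IntType(nullable: Boolean) extends Type
case class RowType(fields: Map[String, Type], nullable: Boolean) extends Type

// If the row itself is nullable, a NOT NULL constraint on its fields cannot be
// guaranteed (reading a field of a NULL row yields NULL), so the fields are
// forced to be nullable as well.
def forceNullable(t: Type): Type = t match {
  case IntType(_)      => IntType(nullable = true)
  case RowType(fs, _)  => RowType(fs.map { case (n, f) => n -> forceNullable(f) }, nullable = true)
}

def fixNullability(t: Type): Type = t match {
  case RowType(fs, true) => RowType(fs.map { case (n, f) => n -> forceNullable(f) }, nullable = true)
  case other             => other
}
{code}

Applied to a nullable {{ROW<np INT NOT NULL>}}, this model yields {{ROW<np INT>}}, mirroring the result2 schema printed above; a non-nullable row is left untouched.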
As for the plan: given the schema produced by the Table API,
( `id` INT NOT NULL, `a` ROW<`np` INT NOT NULL> )
the optimization rules work as expected.
The filter condition after the join
where a.a is null or a.a.np is null
can be reduced and pushed down as
where a.a is null
because np is declared NOT NULL, so a.a.np can be null only when a.a itself is null.
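To see why the two predicates are equivalent under that schema, here is a minimal Scala sketch, modelling the nullable row as an {{Option}} (no Flink API involved; names are made up for illustration):

{code:scala}
// `a` is a nullable ROW whose single field `np` is declared INT NOT NULL.
// Model: Some(row) when the row is present, None when the row is NULL.
case class RowA(np: Int) // np itself can never be null

def npIsNull(a: Option[RowA]): Boolean = a.map(_.np).isEmpty // a.np IS NULL

def original(a: Option[RowA]): Boolean = a.isEmpty || npIsNull(a) // a IS NULL OR a.np IS NULL
def reduced(a: Option[RowA]): Boolean  = a.isEmpty               // a IS NULL
{code}

The two predicates agree for every possible input, so the planner may legally rewrite one to the other.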
RemoveUnreachableCoalesceArgumentsRule then matches the following case
COALESCE(a.a.np, b.a.np) c1
because a.a.np is never nullable, so the COALESCE invocation is reduced to its first argument.
!image-2023-06-09-15-21-13-720.png|width=892,height=575!
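The same reasoning drives the COALESCE reduction; a minimal sketch, again modelling SQL NULL as {{Option}} rather than using Flink code:

{code:scala}
// COALESCE(x, y): the first non-NULL argument, or NULL if both are NULL.
def coalesce(x: Option[Int], y: Option[Int]): Option[Int] = x.orElse(y)

// When the planner derives that the first argument is never NULL (here: np is
// declared NOT NULL in the resolved schema), the second argument is unreachable,
// and COALESCE(x, y) collapses to x regardless of y:
val x = Some(1)
assert(coalesce(x, Some(2)) == x)
assert(coalesce(x, None) == x)
{code}

That collapse is exactly why the plan shows a bare `CAST($1.np):INTEGER` for c1 instead of a two-argument coalesce.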
h4. 3. Conclusion
In a nutshell, we should align the nullability of structured types created through the
Table API with that of SQL DDL. Last but not least, we should improve the documentation
and describe the nullability semantics of structured types; otherwise the behavior may
not be straightforward for users to understand.
> Coalesce on nested fields with different nullabilities will get wrong plan
> --------------------------------------------------------------------------
>
> Key: FLINK-31830
> URL: https://issues.apache.org/jira/browse/FLINK-31830
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.14.6
> Reporter: lincoln lee
> Assignee: Jane Chan
> Priority: Major
> Attachments: image-2023-06-09-15-06-01-322.png,
> image-2023-06-09-15-21-13-720.png
>
>
> A test case similar to FLINK-31829, which only changes the nullable field `a.np` to
> not null, gets a wrong plan in 1.14.x (reported by a community user):
> {code}
> @Test
> def testCoalesceOnNestedColumns(): Unit = {
>   val tEnv = util.tableEnv
>   val tableDescriptor = TableDescriptor.forConnector("datagen")
>     .schema(Schema.newBuilder
>       .column("id", DataTypes.INT.notNull)
>       .column("a", DataTypes.ROW(DataTypes.FIELD("np", DataTypes.INT.notNull())).nullable)
>       .build)
>     .build
>   tEnv.createTemporaryTable("t1", tableDescriptor)
>   tEnv.createTemporaryTable("t2", tableDescriptor)
>   val res = tEnv.executeSql(
>     "EXPLAIN SELECT a.id, COALESCE(a.a.np, b.a.np) c1, IFNULL(a.a.np, b.a.np) c2 FROM t1 a left JOIN t2 b ON a.id=b.id where a.a is null or a.a.np is null")
>   res.print()
> }
>
> == Abstract Syntax Tree ==
> LogicalProject(id=[$0], c1=[CAST($1.np):INTEGER], c2=[IFNULL($1.np, $3.np)])
> +- LogicalFilter(condition=[OR(IS NULL($1), IS NULL(CAST($1.np):INTEGER))])
>    +- LogicalJoin(condition=[=($0, $2)], joinType=[left])
>       :- LogicalTableScan(table=[[default_catalog, default_database, t1]])
>       +- LogicalTableScan(table=[[default_catalog, default_database, t2]])
> {code}
> The top project in the AST is wrong: in `LogicalProject(id=[$0],
> c1=[CAST($1.np):INTEGER], c2=[IFNULL($1.np, $3.np)])`, the
> `c1=[CAST($1.np):INTEGER]` corresponding to `COALESCE(a.a.np, b.a.np) c1` is
> incorrect. However, this works fine when the tables are created via SQL DDL:
> {code}
> @Test
> def testCoalesceOnNestedColumns2(): Unit = {
>   val tEnv = util.tableEnv
>   tEnv.executeSql(
>     s"""
>        |create temporary table t1 (
>        |  id int not null,
>        |  a row<np int not null>
>        |) with (
>        |  'connector' = 'datagen'
>        |)
>        |""".stripMargin)
>   tEnv.executeSql(
>     s"""
>        |create temporary table t2 (
>        |  id int not null,
>        |  a row<np int not null>
>        |) with (
>        |  'connector' = 'datagen'
>        |)
>        |""".stripMargin)
>   val res = tEnv.executeSql(
>     "EXPLAIN SELECT a.id, COALESCE(a.a.np, b.a.np) c1, IFNULL(a.a.np, b.a.np) c2 FROM t1 a left JOIN t2 b ON a.id=b.id where a.a is null or a.a.np is null")
>   res.print()
> }
> {code}
> Since 1.15, COALESCE is a dedicated built-in function and the AST looks
> correct in 1.15+; before 1.15 it was rewritten as `case when`.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)