[ https://issues.apache.org/jira/browse/FLINK-36043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17873230#comment-17873230 ]

lincoln lee commented on FLINK-36043:
-------------------------------------

[~catyee] [~yunta] Intuitively, this looks like a possible optimization point, but in
practice it is not that simple.

To give a counterexample with the `coalesce` function, take these two rows, where
(a, b) is the upsert key:
a, b
0, 1
null, 1

After applying `coalesce(a, 0), b`, we get:
0, 1
0, 1

Clearly, this no longer satisfies the semantics of an upsertKey: two distinct input keys now share the same output key (0, 1).
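
To make the counterexample concrete, here is a minimal Python sketch (a model of the SQL semantics, not Flink code). It represents SQL `NULL` as `None`, applies `coalesce(a, 0)` to the key column and checks whether the key is still unique. The helpers `coalesce` and `is_unique_key` are illustrative names only.

```python
from collections import Counter


def coalesce(value, default):
    """Mimic two-argument SQL COALESCE: return default when value is NULL (None)."""
    return default if value is None else value


def is_unique_key(rows):
    """Return True if no key tuple appears more than once."""
    return all(n == 1 for n in Counter(rows).values())


# Input rows keyed by (a, b); None models SQL NULL.
rows = [(0, 1), (None, 1)]
projected = [(coalesce(a, 0), b) for a, b in rows]

print(is_unique_key(rows))       # True: (0, 1) and (NULL, 1) are distinct keys
print(projected)                 # [(0, 1), (0, 1)]
print(is_unique_key(projected))  # False: both rows collapse onto key (0, 1)
```
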
I've considered similar derivation optimizations, but on reflection I found
that the cases where the derivation can actually be extended are extremely limited.
Deriving an upsertKey works like deriving a primary key: it has to satisfy a
functional dependency (see
[https://opentextbc.ca/dbdesign01/chapter/chapter-11-functional-dependencies/]).
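
One way to phrase the requirement is that the projected key columns must functionally determine the original key (projected key -> original key). If that holds, distinct input keys can never be merged. The sketch below checks this dependency on a sample of rows. It only tests one data instance, whereas a planner would have to prove it for the expression itself, which is why the derivation is so limited. `fd_holds` is a hypothetical helper, not a Flink API, and the `a + 1` case is an illustrative contrast, not something the planner is known to derive.

```python
def fd_holds(pairs):
    """Check the functional dependency lhs -> rhs over a list of (lhs, rhs) pairs.

    The dependency holds if every lhs value maps to exactly one rhs value.
    """
    seen = {}
    for lhs, rhs in pairs:
        if lhs in seen and seen[lhs] != rhs:
            return False
        seen[lhs] = rhs
    return True


def coalesce(value, default):
    return default if value is None else value


keys = [(0, 1), (None, 1), (5, 2)]

# Pair each projected key with the original key it was computed from.
identity = [((a, b), (a, b)) for a, b in keys]
shifted = [(((a + 1) if a is not None else None, b), (a, b)) for a, b in keys]
coalesced = [((coalesce(a, 0), b), (a, b)) for a, b in keys]

print(fd_holds(identity))   # True: plain column references keep the key
print(fd_holds(shifted))    # True on this sample: a + 1 never merges two keys (NULL stays NULL)
print(fd_holds(coalesced))  # False: (0, 1) maps back to both (0, 1) and (NULL, 1)
```
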

 

For your example, though, a workaround is to rewrite the SQL so that
`coalesce` is applied before the GROUP BY:

{code}

Flink SQL> explain insert into sink_table
> select
>   col1,
>   col2,
>   count(col3) as col3_cnt,
>   max(col4) as col4_max
> from (select coalesce(col1, 0) col1, col2, col3, col4 from source_table)
> group by col1,col2;

{code}
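Why the rewrite helps can be sketched in plain Python. This is a simplified model, not Flink's runtime. Grouping on the already-coalesced column yields exactly one row per sink key. Coalescing after the aggregation can emit two rows for the same sink key, which is the conflict that SinkMaterializer exists to reconcile. The sample rows and the helper `group_count` are hypothetical.

```python
from collections import defaultdict


def coalesce(value, default):
    return default if value is None else value


def group_count(rows, key_fn):
    """GROUP BY key_fn(row) with COUNT(col3), skipping NULL col3 like SQL COUNT(col)."""
    counts = defaultdict(int)
    for row in rows:
        counts[key_fn(row)] += 0 if row[2] is None else 1
    return dict(counts)


# (col1, col2, col3); None models SQL NULL.
rows = [(0, 1, 10), (None, 1, 20), (None, 1, 30)]

# Original query: group by (col1, col2), then apply coalesce in the Calc on top.
after = group_count(rows, lambda r: (r[0], r[1]))
after_rows = [((coalesce(c1, 0), c2), cnt) for (c1, c2), cnt in after.items()]

# Workaround: coalesce first, then group by the coalesced col1.
before = group_count(rows, lambda r: (coalesce(r[0], 0), r[1]))

print(after_rows)  # [((0, 1), 1), ((0, 1), 2)]: two rows for sink key (0, 1)
print(before)      # {(0, 1): 3}: one row per sink key
```

The two variants are not equivalent. The rewrite merges the `NULL` and `0` groups into a single aggregate, which is the only result a sink keyed on the coalesced value can hold without conflicting rows.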

 

> [SQL] Unexpected SinkMaterializer operator generated when use coalesce 
> function on upsert key fields
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36043
>                 URL: https://issues.apache.org/jira/browse/FLINK-36043
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.20.0
>            Reporter: Yuan Kui
>            Priority: Major
>         Attachments: image-2024-08-13-16-40-27-929.png
>
>
> As designed, the SinkMaterializer operator should not be generated when the
> upsert keys are the same as the primary keys. Example:
> {code:java}
> -- source table
> create table source_table (
>    col1 int,
>    col2 int,
>    col3 int,
>    col4 int
> ) with (
>    'connector' = 'datagen',
>    'rows-per-second'='10'
> );
> -- sink table
> create table sink_table(
>    col1 int,
>    col2 int,
>    col3_cnt bigint,
>    col4_max int,
>    primary key(col1, col2) not enforced
> ) with (
>    'connector' = 'blackhole'
> );
> -- sql
> insert into sink_table
> select
>   col1,
>   col2,
>   count(col3) as col3_cnt,
>   max(col4) as col4_max
> from source_table
> group by col1,col2;{code}
> It works well, and the execution plan has no SinkMaterializer operator:
> {code:java}
> == Optimized Execution Plan ==
> Sink(table=[default_catalog.default_database.sink_table], fields=[col1, col2, 
> col3_cnt, col4_max])
> +- GroupAggregate(groupBy=[col1, col2], select=[col1, col2, COUNT(col3) AS 
> col3_cnt, MAX(col4) AS col4_max])
>    +- Exchange(distribution=[hash[col1, col2]])
>       +- TableSourceScan(table=[[default_catalog, default_database, 
> source_table]], fields=[col1, col2, col3, col4])  {code}
> However, if we apply the coalesce function to the upsert keys, such as:
> {code:java}
> insert into sink_table
> select
>   -- use coalesce
>   coalesce(col1, 0) as col1, 
>   col2,
>   count(col3) as col3_cnt,
>   max(col4) as col4_max
> from source_table
> group by col1,col2; {code}
> the SinkMaterializer operator will be generated:
> {code:java}
> == Optimized Execution Plan ==
> Sink(table=[default_catalog.default_database.sink_table], fields=[col1, col2, 
> col3_cnt, col4_max], upsertMaterialize=[true])
> +- Calc(select=[coalesce(col1, 0) AS col1, col2, col3_cnt, col4_max])
>    +- GroupAggregate(groupBy=[col1, col2], select=[col1, col2, COUNT(col3) AS 
> col3_cnt, MAX(col4) AS col4_max])
>       +- Exchange(distribution=[hash[col1, col2]])
>          +- TableSourceScan(table=[[default_catalog, default_database, 
> source_table]], fields=[col1, col2, col3, col4]) {code}
> Changing `coalesce(col1, 0)` to `if(col1 is null, 0, col1)` leads to the
> same problem.
>  
> The code for determining whether a SinkMaterializer operator should be 
> generated is in 
> `org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.SatisfyUpdateKindTraitVisitor#analyzeUpsertMaterializeStrategy`.
> If I set a breakpoint at line 881 like this:
> !image-2024-08-13-16-40-27-929.png!
> I found that changeLogUpsertKeys is empty, which makes the 'whether to generate
> SinkMaterializer' check always true.
> Is that by design or a bug?
>  
>  
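
The decision described in the report can be modeled roughly as follows. This is a heavily simplified sketch, not the code of `analyzeUpsertMaterializeStrategy`, which also takes other factors into account. It assumes the input is an updating stream and that materialization is skipped only when some derived upsert key is contained in the sink's primary key. Under that assumption, an empty `changeLogUpsertKeys` set always forces a SinkMaterializer. The function name `needs_sink_materializer` and the subset comparison are assumptions for illustration.

```python
def needs_sink_materializer(sink_pk, upsert_keys):
    """Simplified model: materialize unless some upsert key is contained in the sink PK.

    sink_pk: set of column names in the sink's primary key.
    upsert_keys: list of column-name sets derived for the sink's (updating) input;
                 None or an empty list means no upsert key could be derived.
    """
    if not upsert_keys:
        return True
    return not any(uk <= sink_pk for uk in upsert_keys)


pk = {"col1", "col2"}
print(needs_sink_materializer(pk, [{"col1", "col2"}]))  # False: GROUP BY col1, col2 keeps the key
print(needs_sink_materializer(pk, []))                  # True: coalesce on top of the aggregate drops the key
```
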



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
