[
https://issues.apache.org/jira/browse/FLINK-36043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yuan Kui updated FLINK-36043:
-----------------------------
Description:
As designed, the SinkMaterializer operator should not be generated when the
upsert keys are the same as the primary keys. Example:
{code:java}
-- source table
create table source_table (
col1 int,
col2 int,
col3 int,
col4 int
) with (
'connector' = 'datagen',
'rows-per-second'='10'
);
-- sink table
create table sink_table(
col1 int,
col2 int,
col3_cnt bigint,
col4_max int,
primary key(col1, col2) not enforced
) with (
'connector' = 'blackhole'
);
-- sql
insert into sink_table
select
col1,
col2,
count(col3) as col3_cnt,
max(col4) as col4_max
from source_table
group by col1,col2;{code}
It works well, and the execution plan has no SinkMaterializer operator:
{code:java}
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.sink_table], fields=[col1, col2, col3_cnt, col4_max])
+- GroupAggregate(groupBy=[col1, col2], select=[col1, col2, COUNT(col3) AS col3_cnt, MAX(col4) AS col4_max])
   +- Exchange(distribution=[hash[col1, col2]])
      +- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[col1, col2, col3, col4]) {code}
However, if we apply the coalesce function to an upsert key column, such as:
{code:java}
insert into sink_table
select
-- use coalesce
coalesce(col1, 0) as col1,
col2,
count(col3) as col3_cnt,
max(col4) as col4_max
from source_table
group by col1,col2; {code}
then the SinkMaterializer operator is generated:
{code:java}
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.sink_table], fields=[col1, col2, col3_cnt, col4_max], upsertMaterialize=[true])
+- Calc(select=[coalesce(col1, 0) AS col1, col2, col3_cnt, col4_max])
   +- GroupAggregate(groupBy=[col1, col2], select=[col1, col2, COUNT(col3) AS col3_cnt, MAX(col4) AS col4_max])
      +- Exchange(distribution=[hash[col1, col2]])
         +- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[col1, col2, col3, col4]) {code}
Replacing `coalesce(col1, 0)` with `if(col1 is null, 0, col1)` produces the same
problem.
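One possible explanation, stated here as an assumption and not as confirmed planner behavior: the upsert key of the GroupAggregate may only be carried through the Calc when every key column is projected as a plain field reference, so wrapping col1 in a function call would drop the key. The toy model below illustrates that idea only. `UpsertKeyProjectionSketch`, `EXPRESSION`, and `projectKey` are hypothetical names, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Toy model (not Flink code): does an upsert key survive a projection? */
public class UpsertKeyProjectionSketch {

    /** Marks an output column computed by an expression such as coalesce(col1, 0). */
    public static final int EXPRESSION = -1;

    /**
     * Maps an input key (input column indexes) to output column indexes.
     * projections[i] is the input column that output column i references directly,
     * or EXPRESSION when output column i is computed.
     * Assumption: the key survives only if every key column is a plain reference.
     */
    public static Optional<List<Integer>> projectKey(List<Integer> inputKey, int[] projections) {
        List<Integer> outputKey = new ArrayList<>();
        for (int keyColumn : inputKey) {
            int found = -1;
            for (int out = 0; out < projections.length; out++) {
                if (projections[out] == keyColumn) {
                    found = out;
                    break;
                }
            }
            if (found < 0) {
                // The key column is only reachable through an expression: key is lost.
                return Optional.empty();
            }
            outputKey.add(found);
        }
        return Optional.of(outputKey);
    }

    public static void main(String[] args) {
        List<Integer> groupKey = List.of(0, 1);      // (col1, col2) of the GroupAggregate
        int[] plain = {0, 1, 2, 3};                  // select col1, col2, col3_cnt, col4_max
        int[] withCoalesce = {EXPRESSION, 1, 2, 3};  // select coalesce(col1, 0), col2, ...
        System.out.println(projectKey(groupKey, plain).isPresent());        // true
        System.out.println(projectKey(groupKey, withCoalesce).isPresent()); // false
    }
}
```

Under this model the key (col1, col2) is preserved by the first query and lost by the second, which would match the empty upsert keys observed below.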
The code for determining whether a SinkMaterializer operator should be
generated is in
`org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.SatisfyUpdateKindTraitVisitor#analyzeUpsertMaterializeStrategy`.
Setting a breakpoint at line 881, as shown here:
!image-2024-08-13-16-40-27-929.png!
I found that changeLogUpsertKeys is empty, which makes the 'whether to generate
SinkMaterializer' check always evaluate to true.
Is this by design or a bug?
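To make the observation concrete, here is a simplified model of the check as I understand it from the description above. It is not the Flink source. `UpsertMaterializeSketch` and `needsMaterialize` are hypothetical names, and the real method may compare keys differently.

```java
import java.util.List;
import java.util.Set;

/** Simplified model (not the Flink source) of the SinkMaterializer decision. */
public class UpsertMaterializeSketch {

    /**
     * Returns true when a SinkMaterializer would be needed under this model,
     * i.e. when no derived upsert key equals the sink primary key.
     * An empty upsertKeys list therefore always yields true.
     */
    public static boolean needsMaterialize(List<Set<String>> upsertKeys, Set<String> primaryKey) {
        return upsertKeys.stream().noneMatch(key -> key.equals(primaryKey));
    }

    public static void main(String[] args) {
        Set<String> primaryKey = Set.of("col1", "col2");
        // First query: the upsert key (col1, col2) matches the primary key.
        System.out.println(needsMaterialize(List.of(Set.of("col1", "col2")), primaryKey)); // false
        // Second query: no upsert key was derived, so the check is always true.
        System.out.println(needsMaterialize(List.of(), primaryKey)); // true
    }
}
```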
> [SQL] Unexpected SinkMaterializer operator generated when use coalesce
> function on upsert key fields
> ----------------------------------------------------------------------------------------------------
>
> Key: FLINK-36043
> URL: https://issues.apache.org/jira/browse/FLINK-36043
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.20.0
> Reporter: Yuan Kui
> Priority: Major
> Attachments: image-2024-08-13-16-40-27-929.png
>