[
https://issues.apache.org/jira/browse/HUDI-8996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Geser Dugarov updated HUDI-8996:
--------------------------------
Description:
{code:sql}
CREATE TABLE hudi_debug (
    id INT,
    part INT,
    desc STRING,
    PRIMARY KEY (id) NOT ENFORCED
) PARTITIONED BY (`part`)
WITH (
    'connector' = 'hudi',
    'path' = '.../hudi_debug',
    'compaction.schedule.enabled' = 'false',
    'compaction.async.enabled' = 'false',
    'clean.async.enabled' = 'false',
    'write.tasks' = '1',
    'read.tasks' = '1',
    'table.type' = 'MERGE_ON_READ',
    'write.operation' = 'upsert',
    'index.global.enabled' = 'true'
);
{code}
{code:sql}
INSERT INTO hudi_debug VALUES
(1,100,'aaa'),
(2,200,'bbb');
{code}
Then I upsert into the existing table:
{code:sql}
INSERT INTO hudi_debug VALUES
(1,111,'aaa_new'),
(2,200,'bbb_new');
{code}
From the code for {code:java}BucketAssignFunction::processRecord{code} I expect
that a delete record will be generated, and
{code:sql}
SELECT * FROM hudi_debug;
{code}
will give
{noformat}
id  part  desc
2   200   bbb_new
1   111   aaa_new
{noformat}
But I got:
{noformat}
id  part  desc
2   200   bbb_new
1   111   aaa_new
1   100   aaa
{noformat}
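The expectation above can be illustrated with a minimal sketch. It is not Hudi's implementation: the class {{GlobalIndexSketch}}, the type {{Routed}} and the method {{process}} are hypothetical names invented for this illustration. It only models what the report expects from a global index, namely that a key whose partition changes produces a delete for the old partition followed by an upsert into the new one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a global index. None of these names
// come from Hudi; they exist only to illustrate the expected routing.
class GlobalIndexSketch {

    /** One record sent to the writer: an upsert or a delete for a given partition. */
    static final class Routed {
        final int id;
        final int part;
        final boolean delete;

        Routed(int id, int part, boolean delete) {
            this.id = id;
            this.part = part;
            this.delete = delete;
        }

        @Override
        public String toString() {
            return (delete ? "DELETE" : "UPSERT") + "(id=" + id + ", part=" + part + ")";
        }
    }

    // Record key -> partition in which that key is currently stored.
    private final Map<Integer, Integer> index = new HashMap<>();

    /** Routes one incoming row and returns the records the writer should receive. */
    List<Routed> process(int id, int newPart) {
        List<Routed> out = new ArrayList<>();
        Integer oldPart = index.get(id);
        if (oldPart != null && oldPart.intValue() != newPart) {
            // The key moved to another partition: remove the stale copy first.
            out.add(new Routed(id, oldPart, true));
        }
        out.add(new Routed(id, newPart, false));
        index.put(id, newPart);
        return out;
    }

    public static void main(String[] args) {
        GlobalIndexSketch sketch = new GlobalIndexSketch();
        // First INSERT from the report.
        sketch.process(1, 100);
        sketch.process(2, 200);
        // Second INSERT from the report.
        System.out.println(sketch.process(1, 111)); // [DELETE(id=1, part=100), UPSERT(id=1, part=111)]
        System.out.println(sketch.process(2, 200)); // [UPSERT(id=2, part=200)]
    }
}
```

In this model the row {{(1, 100, 'aaa')}} is removed before {{(1, 111, 'aaa_new')}} is written, which is what the expected result requires. The actual result suggests that this delete is never emitted or never applied.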
was:
```SQL
CREATE TABLE hudi_debug (
id INT,
part INT,
desc STRING,
PRIMARY KEY (id) NOT ENFORCED
) PARTITIONED BY (`part`)
WITH (
'connector' = 'hudi',
'path' = '.../hudi_debug',
'compaction.schedule.enabled'='false',
'compaction.async.enabled'='false',
'clean.async.enabled'='false',
'write.tasks'='1',
'read.tasks'='1',
'table.type' = 'MERGE_ON_READ',
'write.operation' = 'upsert',
'index.global.enabled' = 'true'
);
INSERT INTO hudi_debug VALUES
(1,100,'aaa'),
(2,200,'bbb');
```
Then I upsert into the existing table:
```SQL
INSERT INTO hudi_debug VALUES
(1,111,'aaa_new'),
(2,200,'bbb_new');
```
From the code for `BucketAssignFunction::processRecord` I expect that a delete
record will be generated, and
```SQL
SELECT * FROM hudi_debug;
```
will give
```text
id  part  desc
2   200   bbb_new
1   111   aaa_new
```
But I got:
```text
id  part  desc
2   200   bbb_new
1   111   aaa_new
1   100   aaa
```
> No delete records for Flink upsert if partition path changed
> ------------------------------------------------------------
>
> Key: HUDI-8996
> URL: https://issues.apache.org/jira/browse/HUDI-8996
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Geser Dugarov
> Assignee: Geser Dugarov
> Priority: Major