Reo-LEI opened a new issue #2918:
URL: https://github.com/apache/iceberg/issues/2918
### Duplicate Rows Problem
Recently, I have been trying to use Flink SQL to synchronize MySQL to Iceberg. Saving
MySQL/PostgreSQL data to Iceberg is a very common case in the construction of data
warehouses. But I got duplicate rows which have the same primary key in the Iceberg
table but not in the MySQL table. I will illustrate the reason for this problem as
follows.
Assume we have a sample MySQL table which has just one record:
id | data
---|---
1| aaa
And we define an ETL job with Flink SQL as follows:
```
CREATE TABLE `mysql_table` (
  `id` INT,
  `data` STRING
) WITH (
  'connector' = 'mysql-cdc',
  'database-name' = 'xxx',
  ...
);

CREATE TABLE `iceberg_table` (
  `id` INT,
  `data` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'iceberg',
  'catalog-type' = 'hive',
  'catalog-name' = 'iceberg',
  'catalog-database' = 'xxx',
  'format.version' = '2',
  ...
);

INSERT INTO `iceberg_table`
SELECT `id`, `data` FROM `mysql_table`;
```
If we don't set the default parallelism of the job, we will get a job graph
like this:
```
FlinkCDCSource ---forward---> Filter ---forward---> IcebergStreamWriter ---forward---> IcebergFilesCommitter
```
That will work fine without duplicate rows, because every operator's parallelism is
1 and all CDC data will be routed to the single IcebergStreamWriter.
But once we set the default parallelism of the job with `SET
table.exec.resource.default-parallelism=3`, we will get another job graph as
follows:
```
                              +---> Filter ---forward---> IcebergStreamWriter-1 ---+
FlinkCDCSource ---rebalance---+---> Filter ---forward---> IcebergStreamWriter-2 ---+---rebalance---> IcebergFilesCommitter
                              +---> Filter ---forward---> IcebergStreamWriter-3 ---+
```
Notice that the `Filter` operator's parallelism follows the job default parallelism
and increases to 3. For the Iceberg `FlinkSink`, because `writeParallelism` is
not set, the `IcebergStreamWriter` parallelism follows the
`rowDataInput` (`Filter`) parallelism of 3.
The `FlinkCDCSource` parallelism will always be 1, because binlog data needs
to be sent serially. Since Flink uses `rebalance` as the default shuffle
strategy, the CDC data will be rebalanced across the three `Filter` subtasks and
then emitted to different `IcebergStreamWriter` subtasks.
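For the DataStream API, this corresponds to the `writeParallelism` option on the `FlinkSink` builder. Below is a minimal sketch, assuming the builder methods shown (`forRowData`, `tableLoader`, `equalityFieldColumns`, `writeParallelism`, `append`) are available in the Iceberg version in use: setting `writeParallelism(1)` would pin a single writer (a throughput-limiting workaround), while leaving it unset lets the writer follow the `Filter` parallelism as described above.

```
import java.util.Collections;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

// Hedged sketch: shows how writeParallelism controls the number of
// IcebergStreamWriter subtasks; builder methods are assumptions based on the
// Iceberg Flink connector of this era, not a prescription.
public class PinnedWriterSink {

  public static void appendSink(DataStream<RowData> rowDataInput, TableLoader tableLoader) {
    FlinkSink.forRowData(rowDataInput)
        .tableLoader(tableLoader)
        // primary key of the example table; delete records match on these columns
        .equalityFieldColumns(Collections.singletonList("id"))
        // if omitted, the writer parallelism follows rowDataInput (the Filter), i.e. 3
        .writeParallelism(1)
        .append();
  }
}
```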
At this time, if we insert one row and update it as follows, we will get a
duplicate row.
```
INSERT INTO `mysql_table`(id, data) VALUES(2, 'bbb');
UPDATE `mysql_table` SET data = 'xxx' WHERE id = 2;
```
MySQL:
id | data
---|---
1| aaa
2| xxx
Iceberg:
id | data
---|---
1| aaa
2| bbb
2| xxx
We get the duplicate row `<2, 'bbb'>` because the changelog data is
rebalanced across different `IcebergStreamWriter` subtasks.
For the first `INSERT` statement, we get a +I record, which is sent
to `IcebergStreamWriter-1`.
For the second `UPDATE` statement, we get a -U record and a +U record. The -U
record is sent to `IcebergStreamWriter-2` and the +U record is
rebalanced to `IcebergStreamWriter-3`.
Because `IcebergStreamWriter-2` has not received any `INSERT` data before, its
`insertedRowMap` is empty, so the pos-delete (-U) is ignored and is
not written to a delete file.
Finally we get a duplicate row `<2, 'bbb'>`.
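To make the record flow concrete, here is a small sketch of the three changelog records produced by the two statements above and where each one lands. This is not Iceberg code; it only assumes Flink's `RowData`/`RowKind` changelog types and the `(id INT, data STRING)` layout of the example table.

```
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;

// Hedged sketch of the changelog records emitted by the mysql-cdc source for
// the INSERT and UPDATE above, expressed with Flink's RowData/RowKind types.
public class ChangelogRecords {

  public static RowData[] recordsForIdTwo() {
    return new RowData[] {
        // INSERT INTO mysql_table VALUES (2, 'bbb')        -> +I, lands on IcebergStreamWriter-1
        GenericRowData.ofKind(RowKind.INSERT, 2, StringData.fromString("bbb")),
        // UPDATE mysql_table SET data = 'xxx' WHERE id = 2 -> -U, lands on IcebergStreamWriter-2
        GenericRowData.ofKind(RowKind.UPDATE_BEFORE, 2, StringData.fromString("bbb")),
        //                                                  -> +U, lands on IcebergStreamWriter-3
        GenericRowData.ofKind(RowKind.UPDATE_AFTER, 2, StringData.fromString("xxx"))
    };
  }
}
```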
### Disorder Problem
The reason for the duplicate rows is as described above, but when I tried to fix
it by adding a keyBy before the `IcebergStreamWriter`, I ran into another problem.
For Flink SQL, Flink appends a `Filter` operator before calling
`IcebergTableSink.getSinkRuntimeProvider` to prevent null data from being sent
downstream. So, when we get the `rowDataInput` from Flink in `FlinkSink`,
`rowDataInput` always contains a `Filter` operator whose parallelism follows
the job default parallelism as above.
In our case, if we just keyBy the `equalityFields` before adding the
`IcebergStreamWriter`, we get the job graph as follows:
```
                              +---> Filter ---+      +---> IcebergStreamWriter-1 ---+
FlinkCDCSource ---rebalance---+---> Filter ---+-hash-+---> IcebergStreamWriter-2 ---+---rebalance---> IcebergFilesCommitter
                              +---> Filter ---+      +---> IcebergStreamWriter-3 ---+
```
Notice that the CDC data coming from `FlinkCDCSource` is rebalanced to the `Filter`
operators immediately. Even though all CDC data with the same key is emitted to the same
`IcebergStreamWriter`, the rebalance step can reorder the CDC data (e.g. the same
IcebergStreamWriter may receive the +U before the -U), and we end up with a lot
of strange results.
### Goal
In summary, I think Iceberg should ensure that rows with the same primary
key are updated rather than duplicated when we sink CDC/UPSERT data to
Iceberg.
I am trying to fix this problem by chaining the `Source` and `Filter` operators
and adding a keyBy before the `IcebergStreamWriter` when using Flink SQL.
Finally we can get a correct job graph like this:
```
                                             +---> IcebergStreamWriter-1 ---+
FlinkCDCSource ---forward---> Filter ---hash-+---> IcebergStreamWriter-2 ---+---rebalance---> IcebergFilesCommitter
                                             +---> IcebergStreamWriter-3 ---+
```
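Below is a minimal sketch of the keying step this proposal would add: hash the changelog stream by the equality fields right after the chained `Source`/`Filter`, so every record of a given primary key reaches the same `IcebergStreamWriter` subtask in binlog order. This is a hypothetical helper, not the actual `FlinkSink` change, and it assumes the equality field is the single `id` column at field position 0.

```
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;

public class EqualityFieldDistribution {

  // Hypothetical helper: route all changelog records of one primary key to the
  // same downstream writer subtask via hash (keyBy) partitioning.
  public static DataStream<RowData> keyByEqualityFields(DataStream<RowData> rowDataInput) {
    return rowDataInput.keyBy(new KeySelector<RowData, Integer>() {
      @Override
      public Integer getKey(RowData row) {
        return row.getInt(0); // assumes the equality field `id` is at position 0
      }
    });
  }
}
```

Because the `Source` and `Filter` are chained with a forward connection, records stay in binlog order up to the hash, so the -U of a key can never overtake its earlier +I.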