Reo-LEI opened a new issue #2918:
URL: https://github.com/apache/iceberg/issues/2918


   ### Duplicate Rows Problem
   
   Recently, I have been trying to use Flink SQL to synchronize MySQL tables to Iceberg. Syncing MySQL/PostgreSQL data into Iceberg is a very common case when building data warehouses. However, I got duplicate rows with the same primary key in the Iceberg table that do not exist in the MySQL table. I will illustrate the cause of this problem below.
   
   Assume we have a sample MySQL table with just one record:
   id | data
   ---|---
   1| aaa
   
   And we define an ETL job in Flink SQL as follows:
   ```
   CREATE TABLE `mysql_table` (
       `id` INT,
       `data` STRING
   )
   WITH (
       'connector' = 'mysql-cdc',
       'database-name' = 'xxx',
       ...
   );
   
   CREATE TABLE `iceberg_table` (
       `id` INT,
       `data` STRING,
       PRIMARY KEY(`id`) NOT ENFORCED
   )
   WITH (
       'connector' = 'iceberg',
       'catalog-type' = 'hive',
       'catalog-name' = 'iceberg',
       'catalog-database' = 'xxx',
       'format-version' = '2',
       ...
   );
   
   INSERT INTO `iceberg_table`
   SELECT `id`, `data` FROM `mysql_table`;
   ```
   
   If we don't set the default parallelism of the job, we will get a job graph like this:
   ```
   FlinkCDCSource ---forward---> Filter ---forward---> IcebergStreamWriter ---forward---> IcebergFilesCommitter
   ```
   That works fine and produces no duplicate rows, because every operator's parallelism is 1 and all CDC data is forwarded to the single `IcebergStreamWriter`.
   
   But once we set the default parallelism of the job with `SET table.exec.resource.default-parallelism=3`, we get another job graph as follows:
   ```
                                 +---> Filter ---forward---> IcebergStreamWriter-1 ---+
   FlinkCDCSource ---rebalance---+---> Filter ---forward---> IcebergStreamWriter-2 ---+---rebalance---> IcebergFilesCommitter
                                 +---> Filter ---forward---> IcebergStreamWriter-3 ---+
   ```
   Notice that the `Filter` operator's parallelism follows the job default parallelism and increases to 3. For Iceberg's `FlinkSink`, because `writeParallelism` is not set, the `IcebergStreamWriter` parallelism follows the `rowDataInput` (`Filter`) parallelism and becomes 3 as well.
   The `FlinkCDCSource` parallelism will always be 1, because binlog data has to be read serially. Since Flink uses `rebalance` as the default shuffle strategy when parallelism changes, the CDC data is rebalanced across three different `Filter` subtasks and then emitted to different `IcebergStreamWriter` subtasks.
   
   At this point, if we insert one row and then update it as follows, we will get a duplicate row.
   ```
   INSERT INTO `mysql_table`(id, data) VALUES(2, 'bbb');
   UPDATE `mysql_table` SET data = 'xxx' WHERE id = 2;
   ```
   MySQL:
   id | data
   ---|---
   1| aaa
   2| xxx
   
   Iceberg:
   id | data
   ---|---
   1| aaa
   2| bbb
   2| xxx
   
   We get the duplicate row `<2, 'bbb'>` because the changelog records are rebalanced to different `IcebergStreamWriter` subtasks.
   For the first `INSERT` statement, we get a +I record, which is sent to `IcebergStreamWriter-1`.
   For the second `UPDATE` statement, we get a -U record and a +U record. The -U record is sent to `IcebergStreamWriter-2`, and the +U record is rebalanced to `IcebergStreamWriter-3`.
   Because `IcebergStreamWriter-2` has not received any insert for this key before, its `insertedRowMap` is empty, so the position delete for the -U record is ignored and never written to a delete file.
   Finally, we end up with the duplicate row `<2, 'bbb'>`.
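
   To make the routing concrete, here is a toy model of the round-robin behaviour of `rebalance`: with three writer subtasks and a partitioner that simply cycles through them, the three changelog records of key `2` land on three different writers. The cycling counter is an illustration, not Flink internals.
   ```java
   // Toy model: round-robin assignment of the changelog records of key 2 across
   // three writer subtasks, mimicking what `rebalance` does in this example.
   import java.util.Arrays;
   import java.util.List;
   
   public class RebalanceToyModel {
     public static void main(String[] args) {
       List<String> changelog = Arrays.asList("+I(2, 'bbb')", "-U(2, 'bbb')", "+U(2, 'xxx')");
       int numWriters = 3;
       int next = 0; // round-robin cursor, advanced once per record
       for (String record : changelog) {
         System.out.println(record + " -> IcebergStreamWriter-" + (next % numWriters + 1));
         next++;
       }
       // Prints: +I to writer 1, -U to writer 2, +U to writer 3.
       // No single writer sees both the insert and the matching delete, so the
       // -U on writer 2 finds an empty insertedRowMap and is dropped.
     }
   }
   ```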
   
   ### Disorder Problem
   The cause of the duplicate rows is explained above, but when I tried to fix it by adding a keyBy before the `IcebergStreamWriter`, I ran into another problem.
   For Flink SQL, Flink appends a `Filter` operator before calling `IcebergTableSink.getSinkRuntimeProvider` to prevent null records from being sent downstream. So when we receive the `rowDataInput` in `FlinkSink`, it always already ends with a `Filter` operator whose parallelism follows the job default parallelism, as shown above.
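
   For anyone reproducing this, the mismatch is easy to confirm from the `DataStream` handles before the sink is wired up; a minimal sketch, assuming access to both streams (the variable names are illustrative):
   ```java
   // Sketch only: print the parallelism of the CDC source and of the filtered input
   // that FlinkSink receives; with the SET above they differ (1 vs. 3).
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.table.data.RowData;
   
   public class ParallelismCheck {
     public static void log(DataStream<RowData> cdcSource, DataStream<RowData> rowDataInput) {
       System.out.println("source parallelism = " + cdcSource.getParallelism());
       System.out.println("writer input (Filter) parallelism = " + rowDataInput.getParallelism());
     }
   }
   ```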
   
   In our case, if we simply keyBy the `equalityFields` before adding the `IcebergStreamWriter`, we get the following job graph:
   ```
                                 +---> Filter ---+      +---> IcebergStreamWriter-1 ---+
   FlinkCDCSource ---rebalance---+---> Filter ---+-hash-+---> IcebergStreamWriter-2 ---+---rebalance---> IcebergFilesCommitter
                                 +---> Filter ---+      +---> IcebergStreamWriter-3 ---+
   ```
   Notice that the CDC data coming from `FlinkCDCSource` is still rebalanced to the `Filter` operators first. Even though all CDC records with the same key are now emitted to the same `IcebergStreamWriter`, the earlier rebalance can reorder records of the same key (e.g. a writer may receive the +U before the -U), and we end up with a lot of strange results.
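
   The keyBy I experimented with is essentially the following; a minimal sketch, assuming a single `INT` equality field at position 0 of the `RowData`. By itself it does not solve the problem, because the records were already scattered by the rebalance into `Filter` before the keyBy runs:
   ```java
   // Sketch only: route all changelog records with the same primary key to the same
   // IcebergStreamWriter subtask. The key position (0) and type (int) are assumptions.
   import org.apache.flink.api.java.functions.KeySelector;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.table.data.RowData;
   
   public class KeyByEqualityField {
     public static DataStream<RowData> keyByPrimaryKey(DataStream<RowData> rowDataInput) {
       return rowDataInput.keyBy((KeySelector<RowData, Integer>) row -> row.getInt(0));
     }
   }
   ```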
   
   ### Goal
   In summary, I think Iceberg should guarantee that rows with the same primary key are updated rather than duplicated when we sink CDC/UPSERT data into Iceberg.
   I am trying to fix this by chaining the `Source` and `Filter` operators and adding a keyBy before the `IcebergStreamWriter` when Flink SQL is used.
   With that, we can get a correct job graph like this:
   ```
                                                +---> IcebergStreamWriter-1 ---+
   FlinkCDCSource ---forward---> Filter ---hash-+---> IcebergStreamWriter-2 ---+---rebalance---> IcebergFilesCommitter
                                                +---> IcebergStreamWriter-3 ---+
   ```
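
   A rough sketch of the kind of change I have in mind inside `FlinkSink`: hash-distribute the filtered input by the equality field columns before attaching the writers, so the source and `Filter` stay chained while every key is owned by exactly one writer. The helper name and the way the key is read are hypothetical, not existing Iceberg code:
   ```java
   // Rough sketch of the proposed distribution step; the helper name and the
   // key extraction are hypothetical, not existing Iceberg code.
   import java.util.List;
   import org.apache.flink.api.java.functions.KeySelector;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.table.data.RowData;
   
   public class DistributeByEqualityFields {
     // Hash-partition the writer input by the equality field positions so that every
     // changelog record of one primary key goes to the same IcebergStreamWriter subtask.
     public static DataStream<RowData> distribute(
         DataStream<RowData> filteredInput, List<Integer> equalityFieldPositions) {
       return filteredInput.keyBy((KeySelector<RowData, Integer>) row -> {
         int hash = 0;
         for (int pos : equalityFieldPositions) {
           // Assumes integer key columns for brevity; a real implementation would read
           // each field according to its logical type.
           hash = 31 * hash + Integer.hashCode(row.getInt(pos));
         }
         return hash;
       });
     }
   }
   ```
   With the `Source` and `Filter` chained (both at parallelism 1) and this hash distribution in front of the writers, records of the same key reach a single writer in binlog order, which avoids both the duplicate-row and the disorder problems described above.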

