Reo-LEI opened a new pull request #2898:
URL: https://github.com/apache/iceberg/pull/2898
For CDC data, we usually just synchronize the data into Iceberg, and we can use Flink SQL to do this.
Consider using Flink SQL to define a source backed by a CDC connector and a sink backed by the Iceberg connector, and then simply `INSERT INTO` the CDC data from the source into the sink (a rough sketch of such a setup follows the graph below). We end up with a job graph like this:
```
FlinkCDCSource ---forward---> Filter ---forward---> IcebergStreamWriter ---forward---> IcebergFilesCommitter
```
This works fine, because every operator has parallelism 1, so all CDC data is routed to the single `IcebergStreamWriter`.
But once we set a default parallelism for the job, for example 3, we get a different job graph:
```
                              +---> Filter ---forward---> IcebergStreamWriter ---+
FlinkCDCSource ---rebalance---+---> Filter ---forward---> IcebergStreamWriter ---+---rebalance---> IcebergFilesCommitter
                              +---> Filter ---forward---> IcebergStreamWriter ---+
```
Now the CDC data will be distributed across three different
`IcebergStreamWriter`s, because we have not keyed the stream by the primary key. If we insert
one row and then update it as follows, we end up with a duplicate row:
```
// first field is id(primary key), second field is value
INSERT <1, aaa>
UPDATE <1, bbb>
```
Because the change log is rebalanced across different `IcebergStreamWriter`s,
the first writer gets the +I record, the second writer gets the -U record, and the last
writer gets the +U record. Since the second writer's `insertedRowMap` is empty,
the pos-delete (-U) is ignored, and we end up with a duplicate row.
I try to fix this by adding a keyBy on the primary key before `IcebergStreamWriter`, and by
chaining the `Source` and `Filter` operators when Flink SQL is used.
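As a rough illustration of the keyBy idea (not the actual patch; the tuple layout and primary-key position are assumptions for this sketch), hash-partitioning the change log by primary key guarantees that the +I/-U/+U records for the same key all reach the same writer subtask:
```
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByPrimaryKeySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // Stand-in for the CDC change log: (id, change) pairs from the example above.
        DataStream<Tuple2<Integer, String>> changeLog = env.fromElements(
                Tuple2.of(1, "+I aaa"),
                Tuple2.of(1, "-U aaa"),
                Tuple2.of(1, "+U bbb"));

        // keyBy on the primary key (field 0): all changes for id=1 are routed to
        // the same downstream subtask instead of being rebalanced across writers.
        KeyedStream<Tuple2<Integer, String>, Integer> keyed = changeLog.keyBy(t -> t.f0);

        keyed.print();
        env.execute("keyBy-by-primary-key sketch");
    }
}
```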