Reo-LEI edited a comment on issue #2918:
URL: https://github.com/apache/iceberg/issues/2918#issuecomment-903086302
@stevenzwu @kbendick Thanks for the feedback and for letting me know your concerns. I think I can answer your questions here.
Although I encountered this problem while using FlinkSQL to sync CDC data to
Iceberg, I think we should consider the more general situation of writing
CDC/upsert stream data to an Iceberg table, such as a user-defined source that
produces a CDC/upsert stream, or Flink table retract/upsert data, and not only
in Flink SQL but also in Flink streaming jobs. So I'd like to discuss all of
those situations.
### Discussion of the current Iceberg behavior
Suppose we have a `ChangelogStream` that needs to be written to an Iceberg
table. Since the `IcebergStreamWriter` parallelism can be configured via
`writeParallelism`, we get the following parallelism variables:
```
x: changelog-stream-parallelism
y: job-default-parallelism
z: iceberg-writer-parallelism
```
#### For FlinkSQL
**Case1:** x == y == z
```
ChangelogStream-x ---forward---> Filter-y ---forward---> IcebergStreamWriter-z ---forward---> IcebergFilesCommitter
```
**Case2**: x != y == z
```
ChangelogStream-1 ---+             +---> Filter-1 ---forward---> IcebergStreamWriter-1 ---+
       ...           +-rebalance---+        ...                           ...             +-rebalance---> IcebergFilesCommitter
ChangelogStream-x ---+             +---> Filter-y ---forward---> IcebergStreamWriter-z ---+
```
**Case3**: x == y != z
```
                                                       +---> IcebergStreamWriter-1 ---+
ChangelogStream-x ---forward---> Filter-y --rebalance--+              ...             +-rebalance---> IcebergFilesCommitter
                                                       +---> IcebergStreamWriter-z ---+
```
**Case4**: x != y != z
```
ChangelogStream-1 ---+             +---> Filter-1 ---+             +---> IcebergStreamWriter-1 ---+
       ...           +-rebalance---+       ...       +-rebalance---+              ...             +-rebalance---> IcebergFilesCommitter
ChangelogStream-x ---+             +---> Filter-y ---+             +---> IcebergStreamWriter-z ---+
```
In the cases above, except for Case 1, there is at least one `rebalance`
between `ChangelogStream` and `IcebergStreamWriter`. It will distribute
changelog records that share the same primary key to different
`IcebergStreamWriter` subtasks, or reorder them, as I said before. In the end
we get duplicate rows or other strange results (especially in Case 4).
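As a toy illustration of why this happens (plain Java simulating the routing, not the actual Flink implementation; all names here are mine): `rebalance` distributes records round-robin, so consecutive changelog records for the *same* primary key land on *different* writer subtasks, and no single writer sees the full +I/-U/+U history for that key.

```java
import java.util.*;

public class RebalanceDemo {
    // rebalance = round-robin: record i goes to subtask (i % parallelism),
    // ignoring the record's key entirely.
    static Map<Integer, List<String>> rebalance(List<String> records, int parallelism) {
        Map<Integer, List<String>> subtasks = new TreeMap<>();
        for (int i = 0; i < records.size(); i++) {
            subtasks.computeIfAbsent(i % parallelism, k -> new ArrayList<>())
                    .add(records.get(i));
        }
        return subtasks;
    }

    public static void main(String[] args) {
        // A changelog for one primary key: an insert followed by an update
        // (encoded as a -U/+U pair, as in Flink's changelog model).
        List<String> changelog = List.of("+I key=1", "-U key=1", "+U key=1");

        // With writer parallelism 2, subtask 0 receives {+I, +U} and subtask 1
        // receives {-U}: the retraction can never cancel the insert it belongs to.
        System.out.println(rebalance(changelog, 2));
    }
}
```

Running this shows the three records for `key=1` split across two subtasks, which is exactly how duplicate rows arise in the writers.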
#### For FlinkStreaming
The discussion for Flink streaming jobs is the same as for FlinkSQL; just
replace `Filter` with `Map` (the `Map` operator comes from Iceberg's
`FlinkSink` and is used to convert the input records to the `RowData` type).
### Solution
In general, we cannot control the parallelism of the `ChangelogStream`, and we
don't know how users will adjust the job and writer parallelism to handle
higher traffic. In my opinion, Iceberg should ensure correct results in every
case, no matter how the user configures the parallelism of the job or its operators.
So in #2898, I add a `keyBy` between `Filter`/`Map` and `IcebergStreamWriter`,
which guarantees that all records with the same primary key are distributed to
the same `IcebergStreamWriter` subtask. I also try to set the `Filter`/`Map`
parallelism to follow its input and chain it to its input operator, to prevent
the reordering problem.
Finally, in all of the parallelism cases above, we get a correct DAG and can
guarantee correct results:
```
ChangelogStream-1 ---forward---> Filter-1 ---+        +---> IcebergStreamWriter-1   ---+
       ...                          ...      +-hash---+              ...               +-rebalance---> IcebergFilesCommitter
ChangelogStream-x ---forward---> Filter-x ---+        +---> IcebergStreamWriter-y/z ---+
```
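By contrast, a `keyBy` routes each record by a hash of its key, so every record for a given primary key reaches the same subtask in its original order, at any parallelism. A minimal sketch of that routing rule (again plain Java mimicking a key-hash partitioner, not Flink's real implementation; the record encoding is mine):

```java
import java.util.*;

public class KeyByDemo {
    // keyBy: the subtask is derived purely from the key's hash, never from the
    // arrival position, so same-key records always share one subtask.
    static Map<Integer, List<String>> keyBy(List<String> records, int parallelism) {
        Map<Integer, List<String>> subtasks = new TreeMap<>();
        for (String record : records) {
            String key = record.split("=")[1];  // extract the primary key, e.g. "1"
            int subtask = Math.floorMod(key.hashCode(), parallelism);
            subtasks.computeIfAbsent(subtask, k -> new ArrayList<>()).add(record);
        }
        return subtasks;
    }

    public static void main(String[] args) {
        // The same +I/-U/+U changelog as before: all three records for key=1
        // now land on a single subtask, in order, so the writer can apply them correctly.
        System.out.println(keyBy(List.of("+I key=1", "-U key=1", "+U key=1"), 2));
    }
}
```

This is the correctness property the `hash` edge in the DAG above provides, independent of how x, y, and z are configured.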
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]