Reo-LEI edited a comment on issue #2918:
URL: https://github.com/apache/iceberg/issues/2918#issuecomment-903086302


   @stevenzwu @kbendick Thanks for the feedback and for letting me know your 
concerns. I think I can answer your questions here.
   
   Although I encountered this problem when using FlinkSQL to sync CDC data to 
Iceberg, I think we should consider the more general situation of writing 
CDC/upsert stream data to an Iceberg table, such as a user-defined source that 
produces CDC/upsert stream data, or Flink table retract/upsert data, and not 
only in Flink SQL but also in Flink streaming. So I'd like to discuss all of 
these situations.
   
   ### Discussion of current iceberg
   Suppose we have a `ChangelogStream` that needs to be written to an Iceberg 
table. Since the `IcebergStreamWriter` parallelism can be configured by 
`writeParallelism`, we have the following parallelism variables:
   ```
   x: changelog-stream-parallelism
   y: job-default-parallelism
   z: iceberg-writer-parallelism
   ```
   #### For FlinkSQL
   **Case1**: x == y == z
   ```       
   ChangelogStream-x ---forward---> Filter-y ---forward---> IcebergStreamWriter-z ---forward---> IcebergFilesCommitter
   ```
   
   **Case2**: x != y == z    
   ```            
   ChangelogStream-1 ---+           +---> Filter-1 ---forward---> IcebergStreamWriter-1 ---+
          ...           +-rebalance-+        ...                                           +-rebalance---> IcebergFilesCommitter
   ChangelogStream-x ---+           +---> Filter-y ---forward---> IcebergStreamWriter-z ---+
   ```
   
   **Case3**: x == y != z               
   ``` 
                                                          +---> IcebergStreamWriter-1 ---+
   ChangelogStream-x ---forward---> Filter-y ---rebalance-+                              +-rebalance---> IcebergFilesCommitter
                                                          +---> IcebergStreamWriter-z ---+
   ```
   
   **Case4**: x != y != z                
   ```
   ChangelogStream-1 ---+           +---> Filter-1 ---+           +---> IcebergStreamWriter-1 ---+
          ...           +-rebalance-+        ...      +-rebalance-+            ...               +-rebalance---> IcebergFilesCommitter
   ChangelogStream-x ---+           +---> Filter-y ---+           +---> IcebergStreamWriter-z ---+
   ```
   
   In the cases above, except for Case 1, there is at least one `rebalance` 
between `ChangelogStream` and `IcebergStreamWriter`. That causes changelog 
records with the same primary key to be distributed to different 
`IcebergStreamWriter` instances, or to arrive out of order, as I said before. 
In the end, we get duplicate rows or other strange results (especially in 
Case 4).
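
To make the `rebalance` problem concrete, here is a minimal plain-Java sketch (it does not use Flink). It assumes a simplified round-robin model in which record `i` goes to writer `i % parallelism`; the class name `RebalanceDemo` and the method `writersPerKey` are hypothetical names used only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class RebalanceDemo {
    /**
     * Simplified model of a rebalance edge (an assumption for illustration):
     * record i is sent to writer i % parallelism, regardless of its key.
     * Returns, for each primary key, the set of writers that received it.
     */
    static Map<String, Set<Integer>> writersPerKey(List<String> keys, int parallelism) {
        Map<String, Set<Integer>> result = new LinkedHashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            result.computeIfAbsent(keys.get(i), k -> new LinkedHashSet<>())
                  .add(i % parallelism);
        }
        return result;
    }

    public static void main(String[] args) {
        // Three changelog records for id=1 (e.g. insert, update-before,
        // update-after) followed by one record for id=2.
        List<String> keys = List.of("id=1", "id=1", "id=1", "id=2");
        // With two writers, the records for id=1 end up on both of them.
        System.out.println(writersPerKey(keys, 2));
    }
}
```

Once the records for one key are split across writers like this, no single writer sees the full change history of that key, which is how the duplicate rows described above can appear.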
   
   #### For FlinkStreaming
   The discussion for FlinkStreaming is the same as for FlinkSQL; just replace 
`Filter` with `Map` (the `Map` operator comes from the Iceberg `FlinkSink`, 
and is used to convert input records to the `RowData` type).
   
   ### Solution
   In general, we cannot control the parallelism of `ChangelogStream`, and we 
don't know how users will adjust the job and writer parallelism to deal with 
higher traffic. In my opinion, Iceberg should ensure correct results in all 
cases, no matter how the user configures the parallelism of the job or the 
operators.
   
   So in #2898, I add a `keyBy` between `Filter/Map` and `IcebergStreamWriter`, 
which ensures that all records with the same primary key are distributed to the 
same `IcebergStreamWriter`. I am also trying to make the `Filter/Map` 
parallelism follow its input and to chain it to its input operator, to prevent 
the disorder problem.
   
   Finally, in all of the parallelism cases above, we get the correct DAG and 
ensure correct results:
   ```
   ChangelogStream-1 ---forward---> Filter-1 ---+      +---> IcebergStreamWriter-1   ---+
         ...                          ...       +-hash-+             ...                +-rebalance---> IcebergFilesCommitter
   ChangelogStream-x ---forward---> Filter-x ---+      +---> IcebergStreamWriter-y/z ---+
   ```
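
The effect of the `hash` edge in this DAG can be sketched in plain Java (again without Flink). This is only an illustration of key-based routing: the class `KeyedRoutingDemo` and the method `writerFor` are hypothetical, and the modulo-of-`hashCode` rule is a simplifying assumption, not the key assignment that Flink's `keyBy` actually uses internally.

```java
import java.util.List;

class KeyedRoutingDemo {
    /**
     * Chooses a writer from the primary key alone (simplified assumption:
     * hashCode modulo parallelism). Equal keys always map to the same
     * writer, no matter where they appear in the stream.
     */
    static int writerFor(String primaryKey, int writerParallelism) {
        // floorMod keeps the index non-negative even for negative hash codes.
        return Math.floorMod(primaryKey.hashCode(), writerParallelism);
    }

    public static void main(String[] args) {
        List<String> changelogKeys = List.of("id=1", "id=2", "id=1", "id=3", "id=1");
        for (String key : changelogKeys) {
            System.out.println(key + " -> IcebergStreamWriter-" + writerFor(key, 4));
        }
    }
}
```

Because the target writer depends only on the key, every change for a given primary key is handled by one writer, independent of how the upstream and writer parallelism are configured.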
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


