Reo-LEI commented on pull request #2898:
URL: https://github.com/apache/iceberg/pull/2898#issuecomment-890355891


   > I think this may be avoided by setting `write.distribution-mode` to `hash`.
   > https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java#L322
   
   Actually, setting `write.distribution-mode = hash` cannot resolve this, because CDC data should be distributed by the primary key, not by the partition fields. For example, consider an Iceberg table with equality fields (dt, hour, id) and partition fields (dt, hour). With `write.distribution-mode = hash`, the CDC data will be keyed by (dt, hour), so we cannot ensure that CDC records for the same id are sent to the same `IcebergStreamWriter`.
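   To illustrate why the key choice matters, here is a toy sketch of hash routing (plain Java, not the Flink API; the field values, writer count, and method names are all hypothetical):
   ```java
   import java.util.Objects;
   
   // Toy sketch of how keyBy picks a parallel writer: hash the chosen key
   // fields, then take the result modulo the writer count.
   public class KeyByRouting {
       static final int WRITERS = 3; // hypothetical IcebergStreamWriter parallelism
   
       // Keying by the partition fields (dt, hour): an update that moves a row
       // to another hour routes the old-row change and the new-row change to
       // potentially different writers.
       static int routeByPartition(String dt, String hour) {
           return Math.floorMod(Objects.hash(dt, hour), WRITERS);
       }
   
       // Keying by the primary key keeps every change for one id on one writer,
       // regardless of which partition the row currently falls into.
       static int routeById(long id) {
           return Math.floorMod(Long.hashCode(id), WRITERS);
       }
   
       public static void main(String[] args) {
           // An update that moves id=42 from hour 10 to hour 11:
           System.out.println(routeByPartition("2021-08-01", "10")); // old-row writer
           System.out.println(routeByPartition("2021-08-01", "11")); // may be a different writer
           System.out.println(routeById(42L));                       // same writer either way
       }
   }
   ```
   Flink's real keyBy goes through key-group hashing rather than a plain modulo, but the routing consequence is the same.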
   
   The second reason is that, for Flink SQL, `rowDataInput` contains a `FlinkCDCSource` operator and a `Filter` operator. The `FlinkCDCSource` operator has parallelism 1, while the `Filter` operator uses the job's default parallelism. If we only keyBy the `rowDataInput` output into `IcebergStreamWriter`, we get a job graph like this:
   ```
                                 +---> Filter ---+      +---> IcebergStreamWriter ---+
   FlinkCDCSource ---rebalance---+---> Filter ---+-hash-+---> IcebergStreamWriter ---+---rebalance---> IcebergFilesCommitter
                                 +---> Filter ---+      +---> IcebergStreamWriter ---+
   ```
   As a result, the CDC data will be rebalanced across the `Filter` operators. That can put CDC data out of order (e.g. the same `IcebergStreamWriter` may receive +U before -U), and we will get some strange results. So we need to keep the `Filter` at the same parallelism as its input operator.
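   The reordering hazard can be seen with a small simulation (plain Java, not the Flink API; the record strings and channel count are made up):
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Toy simulation of a round-robin rebalance edge: the -U/+U pair of one
   // CDC update lands on two different parallel Filter channels, and since
   // those channels run concurrently, +U may reach the keyed writer before -U.
   public class RebalanceReorder {
       // Distribute records round-robin across `channels` parallel instances,
       // the way a rebalance edge does.
       static List<List<String>> rebalance(List<String> input, int channels) {
           List<List<String>> out = new ArrayList<>();
           for (int i = 0; i < channels; i++) {
               out.add(new ArrayList<>());
           }
           for (int i = 0; i < input.size(); i++) {
               out.get(i % channels).add(input.get(i));
           }
           return out;
       }
   
       public static void main(String[] args) {
           // The retract/update pair for id=1, emitted in order by the CDC source:
           List<String> cdc = List.of("-U(id=1)", "+U(id=1)");
           // -U goes to Filter 0 and +U to Filter 1, so their relative order is
           // no longer guaranteed once the two channels race toward the writer.
           System.out.println(rebalance(cdc, 2));
       }
   }
   ```
   Pinning the `Filter` to the same parallelism as its input keeps the edge chained (forward) instead of rebalanced, so the pair stays on one channel in order.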


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]