Reo-LEI commented on pull request #2898: URL: https://github.com/apache/iceberg/pull/2898#issuecomment-890355891
> I think this may be avoided by setting `write.distribution-mode` to `hash`.
> https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java#L322

Actually, setting `write.distribution-mode = hash` cannot resolve this, because CDC data should be distributed by primary key, not by partition fields. For example, suppose an Iceberg table has equalityFields (dt, hour, id) and partition fields (dt, hour). With `write.distribution-mode = hash`, the CDC data will be keyed by (dt, hour), and we cannot ensure that CDC data with the same id is sent to the same `IcebergStreamWriter`.

The second reason is that, for FlinkSQL, `rowDataInput` contains a `FlinkCDCSource` operator and a `Filter` operator. The `FlinkCDCSource` operator has parallelism 1, while the `Filter` operator uses the job's default parallelism. If we only keyBy the output of `rowDataInput` to `IcebergStreamWriter`, we get a job graph as follows:

```
                              +---> Filter ---+      +---> IcebergStreamWriter ---+
FlinkCDCSource ---rebalance---+---> Filter ---+-hash-+---> IcebergStreamWriter ---+---rebalance---> IcebergFilesCommitter
                              +---> Filter ---+      +---> IcebergStreamWriter ---+
```

As a result, the CDC data is rebalanced across the `Filter` subtasks. That puts the CDC data out of order (e.g. the same `IcebergStreamWriter` may receive +U before -U), and we get some strange results. So we need to keep the `Filter` at the same parallelism as its input operator.
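To make the ordering problem concrete, here is a minimal toy simulation in plain Python. It does not use Flink or Iceberg; `arrival_order`, `apply_changelog`, and the per-subtask delays are hypothetical names and values chosen for illustration. It assumes that rebalance hands records out round-robin and that one `Filter` subtask is slower than the others, and it models the writer as a naive keyed upsert, which is only a rough stand-in for what `IcebergStreamWriter` actually does.

```python
# Toy model: names and delays below are illustrative assumptions,
# not Flink or Iceberg APIs.

def arrival_order(events, n_filters, filter_delay):
    """Return the order in which one downstream writer sees `events`.

    Each event is round-robined (rebalance) to one of `n_filters` subtasks;
    subtask f adds `filter_delay[f]` ticks before forwarding the event.
    """
    timed = []
    for seq, event in enumerate(events):
        subtask = seq % n_filters            # round-robin, as rebalance does
        timed.append((seq + filter_delay[subtask], seq, event))
    timed.sort(key=lambda t: (t[0], t[1]))   # writer consumes by arrival time
    return [event for _, _, event in timed]


def apply_changelog(events):
    """Naive keyed upsert: +I/+U put the row, -U/-D delete by key."""
    table = {}
    for kind, key, value in events:
        if kind in ("+I", "+U"):
            table[key] = value
        else:
            table.pop(key, None)
    return table


# Changelog for a single key (id=1): insert "a", then update "a" -> "b".
EVENTS = [("+I", 1, "a"), ("-U", 1, "a"), ("+U", 1, "b")]

# Filter chained at parallelism 1: source order is preserved.
in_order = arrival_order(EVENTS, n_filters=1, filter_delay=[0])
# Three Filter subtasks, the second one lagging by 3 ticks.
shuffled = arrival_order(EVENTS, n_filters=3, filter_delay=[0, 3, 0])

print(in_order, apply_changelog(in_order))
print(shuffled, apply_changelog(shuffled))
```

In this sketch the in-order run ends with the row `{1: "b"}`, while the rebalanced run delivers +U before -U and ends with an empty table, even though all three records hash to the same writer. That is the kind of strange result described above, under the stated assumptions.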
