pvary opened a new pull request, #6528:
URL: https://github.com/apache/iceberg/pull/6528

   Currently the Flink Iceberg commits are identified by the `table` and the 
`JobID`. This becomes a problem when a single job has multiple sinks writing 
to the same table.
   
   Let's imagine the following sequence of events:
   - On `notifyCheckpointComplete(99)`, Sink1 commits the changes for checkpoint 
`99` to the Iceberg table, and marks `99` as the latest committed checkpoint
   - On `notifyCheckpointComplete(99)`, Sink2 commits the changes for checkpoint 
`99` to the Iceberg table, and marks `99` as the latest committed checkpoint
   - A new checkpoint is created with the id `100`
   - On `notifyCheckpointComplete(100)`, Sink1 commits the changes for checkpoint 
`100` to the Iceberg table, and marks `100` as the latest committed checkpoint
   - Job is cancelled
   - Job is restarted
   - Sink1 sees that the latest committed checkpoint is `100`, so there is 
nothing to do
   - Sink2 also sees that the latest committed checkpoint is `100`, so it does 
nothing. This is wrong, because Sink2 never committed its changes for 
checkpoint `100` to the table
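   The failure can be reproduced with a small self-contained model of the 
recovery lookup. This is a hypothetical sketch, not the Iceberg implementation: 
the class, its methods, and the summary key names (`flink.job-id`, 
`flink.max-committed-checkpoint-id`) are assumptions chosen for illustration. 
Each commit appends a snapshot summary that records only the job id, so on 
restart both sinks get the same answer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the pre-fix recovery check. Each commit appends a
// snapshot whose summary stores only the job id and the checkpoint id.
class JobIdOnlyRecovery {
  // Illustrative summary keys; assumed, not quoted from the Iceberg sources.
  static final String JOB_ID = "flink.job-id";
  static final String MAX_CHECKPOINT = "flink.max-committed-checkpoint-id";

  // Snapshot summaries in commit order, oldest first.
  final List<Map<String, String>> snapshots = new ArrayList<>();

  void commit(String jobId, long checkpointId) {
    Map<String, String> summary = new HashMap<>();
    summary.put(JOB_ID, jobId);
    summary.put(MAX_CHECKPOINT, Long.toString(checkpointId));
    snapshots.add(summary);
  }

  // Scan newest to oldest and return the checkpoint id of the first snapshot
  // written by this job, or -1 if the job never committed.
  long maxCommittedCheckpoint(String jobId) {
    for (int i = snapshots.size() - 1; i >= 0; i--) {
      Map<String, String> summary = snapshots.get(i);
      if (jobId.equals(summary.get(JOB_ID))) {
        return Long.parseLong(summary.get(MAX_CHECKPOINT));
      }
    }
    return -1L;
  }

  public static void main(String[] args) {
    JobIdOnlyRecovery table = new JobIdOnlyRecovery();
    String job = "job-1";
    table.commit(job, 99);  // Sink1 commits checkpoint 99
    table.commit(job, 99);  // Sink2 commits checkpoint 99
    table.commit(job, 100); // Sink1 commits checkpoint 100; then the job is cancelled
    // After the restart both sinks ask the same question and get the same
    // answer, so Sink2 skips its checkpoint-100 changes even though they were
    // never committed.
    System.out.println("Sink1 sees " + table.maxCommittedCheckpoint(job));
    System.out.println("Sink2 sees " + table.maxCommittedCheckpoint(job));
  }
}
```

   Both lines print `100`: keyed only by job id, the lookup cannot tell which 
sink wrote the newest snapshot.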
   
   To fix this, an additional identifier, `operatorUniqueId`, is added to the 
snapshot summary of each commit, and this operator id is also checked on 
recovery.
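   A sketch of the fixed lookup under the same assumptions as a hypothetical 
model: each snapshot summary also records an operator id (stored here under an 
assumed `flink.operator-id` key, with a value such as the `operatorUniqueId` 
described above), and recovery matches on both the job id and the operator id. 
The names are illustrative, not the actual Iceberg code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the fixed recovery check: commits are identified by
// the job id AND the operator id of the sink that wrote them.
class JobAndOperatorRecovery {
  // Illustrative summary keys; assumed, not quoted from the Iceberg sources.
  static final String JOB_ID = "flink.job-id";
  static final String OPERATOR_ID = "flink.operator-id";
  static final String MAX_CHECKPOINT = "flink.max-committed-checkpoint-id";

  // Snapshot summaries in commit order, oldest first.
  final List<Map<String, String>> snapshots = new ArrayList<>();

  void commit(String jobId, String operatorId, long checkpointId) {
    Map<String, String> summary = new HashMap<>();
    summary.put(JOB_ID, jobId);
    summary.put(OPERATOR_ID, operatorId);
    summary.put(MAX_CHECKPOINT, Long.toString(checkpointId));
    snapshots.add(summary);
  }

  // Scan newest to oldest and return the checkpoint id of the first snapshot
  // written by this job AND this operator, or -1 if there is none.
  long maxCommittedCheckpoint(String jobId, String operatorId) {
    for (int i = snapshots.size() - 1; i >= 0; i--) {
      Map<String, String> summary = snapshots.get(i);
      if (jobId.equals(summary.get(JOB_ID))
          && operatorId.equals(summary.get(OPERATOR_ID))) {
        return Long.parseLong(summary.get(MAX_CHECKPOINT));
      }
    }
    return -1L;
  }

  public static void main(String[] args) {
    JobAndOperatorRecovery table = new JobAndOperatorRecovery();
    String job = "job-1";
    table.commit(job, "sink1", 99);  // Sink1 commits checkpoint 99
    table.commit(job, "sink2", 99);  // Sink2 commits checkpoint 99
    table.commit(job, "sink1", 100); // Sink1 commits checkpoint 100; job cancelled
    // After the restart each sink only sees its own commits.
    System.out.println("Sink1 sees " + table.maxCommittedCheckpoint(job, "sink1"));
    System.out.println("Sink2 sees " + table.maxCommittedCheckpoint(job, "sink2"));
  }
}
```

   Here Sink1 sees `100` while Sink2 sees `99`, so Sink2 can detect that its 
changes for checkpoint `100` are still uncommitted.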
   
   The PR adds two unit tests covering this scenario.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

