pvary opened a new pull request, #6528: URL: https://github.com/apache/iceberg/pull/6528
Currently, Flink Iceberg commits are identified by the `table` and the `JobID`. This can become problematic when the same job has multiple sinks writing to the same table. Consider the following sequence of events:
- Sink1 `notifyCheckpointComplete(99)` commits the checkpoint's changes to the Iceberg table, and marks `99` as the latest checkpoint which was committed
- Sink2 `notifyCheckpointComplete(99)` commits the checkpoint's changes to the Iceberg table, and marks `99` as the latest checkpoint which was committed
- A new checkpoint is created with the id `100`
- Sink1 `notifyCheckpointComplete(100)` commits the checkpoint's changes to the Iceberg table, and marks `100` as the latest checkpoint which was committed
- Job is cancelled
- Job is restarted
- Sink1 sees that the latest checkpoint which is committed is `100`, so nothing to do
- Sink2 sees that the latest checkpoint which is committed is `100`, so nothing to do
- This is wrong, as Sink2 never committed the relevant changes to the table

To fix this, another identifier, `operatorUniqueId`, is added to the snapshot summary and used when checking for the commits; this operator ID is also checked on recovery.

The PR contains 2 additional unit tests to check the scenario.
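The sequence above can be replayed in a small, self-contained simulation. This is only an illustrative sketch and not the code from the PR: the snapshot history is modeled as a plain list of summary maps, and the class name, method names, and summary keys (`job-id`, `operator-id`, `max-committed-checkpoint-id`) are hypothetical stand-ins chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative model only: the table's snapshot history is a list of summary maps.
final class CommitLookupSketch {
  // Hypothetical summary keys, assumed for this sketch.
  static final String JOB_ID = "job-id";
  static final String OPERATOR_ID = "operator-id";
  static final String MAX_CHECKPOINT_ID = "max-committed-checkpoint-id";

  // Records a commit by appending a snapshot summary to the history.
  static void commit(List<Map<String, String>> history, String jobId,
                     String operatorId, long checkpointId) {
    history.add(Map.of(
        JOB_ID, jobId,
        OPERATOR_ID, operatorId,
        MAX_CHECKPOINT_ID, Long.toString(checkpointId)));
  }

  // Lookup keyed by job only: every sink in the job sees the same answer.
  static long maxCommittedByJob(List<Map<String, String>> history, String jobId) {
    long max = -1L;
    for (Map<String, String> summary : history) {
      if (jobId.equals(summary.get(JOB_ID))) {
        max = Math.max(max, Long.parseLong(summary.get(MAX_CHECKPOINT_ID)));
      }
    }
    return max;
  }

  // Lookup keyed by job and operator: each sink only sees its own commits.
  static long maxCommittedByJobAndOperator(List<Map<String, String>> history,
                                           String jobId, String operatorId) {
    long max = -1L;
    for (Map<String, String> summary : history) {
      if (jobId.equals(summary.get(JOB_ID))
          && operatorId.equals(summary.get(OPERATOR_ID))) {
        max = Math.max(max, Long.parseLong(summary.get(MAX_CHECKPOINT_ID)));
      }
    }
    return max;
  }

  public static void main(String[] args) {
    List<Map<String, String>> history = new ArrayList<>();
    commit(history, "job", "sink1", 99L);   // Sink1 commits checkpoint 99
    commit(history, "job", "sink2", 99L);   // Sink2 commits checkpoint 99
    commit(history, "job", "sink1", 100L);  // Sink1 commits checkpoint 100
    // The job is cancelled here, before Sink2 commits checkpoint 100.

    // Job-only lookup: Sink2 would wrongly conclude that 100 is already committed.
    System.out.println(maxCommittedByJob(history, "job"));
    // Job-and-operator lookup: Sink2 sees that it still has to commit 100.
    System.out.println(maxCommittedByJobAndOperator(history, "job", "sink2"));
  }
}
```

With the job-only lookup, both sinks are told that checkpoint `100` is committed. With the operator-aware lookup, Sink2 gets `99` and can recommit the pending changes for checkpoint `100` on recovery.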
