I forgot to mention that a fix must also be made to Flink 2.0.1, which is still unreleased.
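For anyone following along, here is a minimal, purely illustrative Java sketch of the workaround discussed in the thread below: tag the committables emitted at end of input with last_checkpoint_id + 1 instead of Long.MAX_VALUE. The class and method names are made up for the example; this is not the actual Iceberg or Flink code.

    // Illustrative sketch only (hypothetical names, not the Iceberg/Flink code).
    // Shows the idea of using lastCheckpointId + 1 rather than Long.MAX_VALUE
    // for committables emitted at end of input.
    public class FinalCheckpointIdExample {

        // Highest checkpoint id observed during the job.
        private long lastCheckpointId = 0L;

        public void onCheckpoint(long checkpointId) {
            lastCheckpointId = Math.max(lastCheckpointId, checkpointId);
        }

        // Checkpoint id to attach to the end-of-input committable.
        public long endOfInputCheckpointId() {
            // Old approach: return Long.MAX_VALUE;
            // After FLINK-37605 such committables are skipped on shutdown,
            // so the final commit never happens in batch pipelines.
            return lastCheckpointId + 1;
        }

        public static void main(String[] args) {
            FinalCheckpointIdExample example = new FinalCheckpointIdExample();
            example.onCheckpoint(3L);
            System.out.println(example.endOfInputCheckpointId()); // prints 4
        }
    }

The point of the sketch is simply that Long.MAX_VALUE now falls outside the range the committer considers on shutdown, while last_checkpoint_id + 1 stays within it.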
On Wed, Sep 17, 2025 at 10:55 AM Maximilian Michels <m...@apache.org> wrote:
>
> I've created a JIRA to track this issue for the upcoming 1.19.4 and
> 1.20.3 releases: https://issues.apache.org/jira/browse/FLINK-38370.
>
> On Wed, Sep 17, 2025 at 10:52 AM Maximilian Michels <m...@apache.org> wrote:
> >
> > Hi,
> >
> > When working on https://github.com/apache/iceberg/pull/13714, Steven
> > and I noticed a behavior change in the CommitterOperator in Flink
> > 2.1.0, which is also present in Flink 1.20.2 and 1.19.3.
> >
> > I don't see a problem with pushing this change to Flink 2.1.0, since
> > it is a minor release. However, I noticed that 1.19.3 and 1.20.2 are
> > now subject to data loss with Iceberg.
> >
> > What is the issue?
> > ==============
> >
> > In a nutshell, the CommitterOperator (V2 Sink) no longer commits all
> > pending committables for batch pipelines, but only those up to
> > checkpoint id current_checkpoint_id + 1. This is caused specifically
> > by this line:
> > https://github.com/apache/flink/blob/5ad464d8156b5094e0aba7712e4aa22b9f44e84d/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L154
> >
> > In Iceberg, we use `Long.MAX_VALUE` for the final checkpoint id:
> > https://github.com/apache/iceberg/blob/ee90c10e39cec0ccceb9425e03a3e0b5690daf3b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java#L79.
> > This value has also been used by other connectors in Flink. The
> > behavior was changed in
> > https://issues.apache.org/jira/browse/FLINK-37605 /
> > https://github.com/apache/flink/pull/26433.
> >
> > Since committables using `Long.MAX_VALUE` won't be committed, the
> > Iceberg table won't be updated on shutdown, which means it will not
> > contain any data! That results in potential data loss.
> >
> > How to fix this issue?
> > ================
> >
> > For Flink 2.1.0+, we can generate committables with `checkpoint =
> > last_checkpoint_id + 1`. We already took this approach in
> > https://github.com/apache/iceberg/pull/13714.
> >
> > That is fine for the upcoming Iceberg version for Flink 2.1.0, but
> > users who use the current Iceberg version with 1.19.2 or 1.20.1 will
> > be in for a surprise when they upgrade to the latest patch release,
> > 1.19.3 or 1.20.2: they will experience data loss in batch pipelines.
> > While we may want to keep the other changes in FLINK-37605, I think
> > we need to include a fix in the Flink 1.19.4 and Flink 1.20.3
> > releases to ensure we commit all pending committables in batch
> > pipelines.
> >
> > I'm curious to hear if you think there is another way to resolve this.
> >
> > Beyond this issue
> > =============
> >
> > The overarching question in all of this is: why are we pushing these
> > kinds of invasive changes to patch releases? I think the change could
> > have used another pair of eyes to assess the potential impact.
> >
> > Another thing we could have done is run the Iceberg tests during
> > release testing, which would have uncovered this bug. I'll definitely
> > check that for the next release.
> >
> > Cheers and thanks,
> > Max