I forgot to mention that the fix must also go into Flink 2.0.1,
which is still unreleased.


On Wed, Sep 17, 2025 at 10:55 AM Maximilian Michels <m...@apache.org> wrote:
>
> I've created a JIRA to track this issue for the upcoming 1.19.4 and
> 1.20.3 releases: https://issues.apache.org/jira/browse/FLINK-38370.
>
> On Wed, Sep 17, 2025 at 10:52 AM Maximilian Michels <m...@apache.org> wrote:
> >
> > Hi,
> >
> > When working on https://github.com/apache/iceberg/pull/13714, Steven
> > and I noticed a behavior change in the CommitterOperator in Flink
> > 2.1.0, which is also present in Flink 1.20.2 and 1.19.3.
> >
> > I don't see a problem pushing this change to Flink 2.1.0, since it is
> > a minor release. However, I noticed that 1.19.3 and 1.20.2 are now
> > subject to data loss with Iceberg.
> >
> > What is the issue?
> > ==============
> >
> > In a nutshell, the CommitterOperator (V2 Sink) no longer commits all
> > pending committables for batch pipelines, but only those for
> > current_checkpoint_id+1. This is caused specifically by this line:
> > https://github.com/apache/flink/blob/5ad464d8156b5094e0aba7712e4aa22b9f44e84d/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L154
> >
> > In Iceberg, we use `Long.MAX_VALUE` for the final checkpoint id:
> > https://github.com/apache/iceberg/blob/ee90c10e39cec0ccceb9425e03a3e0b5690daf3b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java#L79.
> > This value was generally used by other connectors in Flink. This
> > behavior was changed in
> > https://issues.apache.org/jira/browse/FLINK-37605 /
> > https://github.com/apache/flink/pull/26433.
> >
> > Since committables using `Long.MAX_VALUE` won't be committed, the
> > Iceberg table won't be updated on shutdown, which means it will not
> > contain any data! That results in potential data loss.
> >
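
To make the failure mode concrete, here is a minimal toy model of it. It is a sketch under assumptions, not Flink's actual CommitterOperator code: PendingCommittables, commitUpTo and remaining are hypothetical names, committables are plain strings, and the bound is assumed to cover every id at or below last_checkpoint_id + 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy model only: PendingCommittables and its methods are hypothetical names,
// not Flink's CommitterOperator API.
class PendingCommittables {
    // Committables grouped by the checkpoint id they were tagged with.
    private final NavigableMap<Long, List<String>> pending = new TreeMap<>();

    void add(long checkpointId, String committable) {
        pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(committable);
    }

    // Commits everything tagged with an id <= upToCheckpointId and returns it.
    List<String> commitUpTo(long upToCheckpointId) {
        NavigableMap<Long, List<String>> ready = pending.headMap(upToCheckpointId, true);
        List<String> committed = new ArrayList<>();
        ready.values().forEach(committed::addAll);
        ready.clear(); // clearing the view also removes the entries from 'pending'
        return committed;
    }

    int remaining() {
        return pending.values().stream().mapToInt(List::size).sum();
    }
}

class CommitBoundSketch {
    public static void main(String[] args) {
        long lastCheckpointId = 0L; // illustrative value, an assumption
        PendingCommittables committables = new PendingCommittables();
        committables.add(Long.MAX_VALUE, "final-batch-result");

        // End of input: only ids up to lastCheckpointId + 1 are committed.
        List<String> committed = committables.commitUpTo(lastCheckpointId + 1);
        System.out.println("committed=" + committed + " remaining=" + committables.remaining());
    }
}
```

Running it prints `committed=[] remaining=1`: the committable tagged with `Long.MAX_VALUE` stays pending forever, which is the situation described above.
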
> > How to fix this issue?
> > ================
> >
> > For Flink 2.1.0+, we can generate committables with `checkpoint =
> > last_checkpoint_id + 1`. We already took this approach in
> > https://github.com/apache/iceberg/pull/13714.
> >
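
The connector-side workaround can be sketched as follows. Only the rule `checkpoint = last_checkpoint_id + 1` comes from this thread; EndOfInputCheckpoint, idFor and wouldBeCommitted are hypothetical names, the committer bound is an assumed model, and this is not the code from the Iceberg PR.

```java
// Hypothetical helper names; only the "last_checkpoint_id + 1" rule comes from the thread.
final class EndOfInputCheckpoint {
    private EndOfInputCheckpoint() {}

    // Checkpoint id to tag end-of-input committables with.
    static long idFor(long lastCheckpointId) {
        return Math.addExact(lastCheckpointId, 1L); // throws instead of silently overflowing
    }

    // Assumed model of the committer bound: ids up to lastCheckpointId + 1 are committed.
    static boolean wouldBeCommitted(long committableCheckpointId, long lastCheckpointId) {
        return committableCheckpointId <= idFor(lastCheckpointId);
    }
}

class EndOfInputCheckpointDemo {
    public static void main(String[] args) {
        long lastCheckpointId = 7L; // illustrative value, an assumption

        // Old convention: tagged with Long.MAX_VALUE, falls outside the bound.
        System.out.println(EndOfInputCheckpoint.wouldBeCommitted(Long.MAX_VALUE, lastCheckpointId));

        // New convention: tagged with lastCheckpointId + 1, falls inside the bound.
        System.out.println(EndOfInputCheckpoint.wouldBeCommitted(
                EndOfInputCheckpoint.idFor(lastCheckpointId), lastCheckpointId));
    }
}
```

Under these assumptions the demo prints `false` for the `Long.MAX_VALUE` convention and `true` for `last_checkpoint_id + 1`.
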
> > That is fine for the upcoming Iceberg version for Flink 2.1.0, but
> > users running the current Iceberg version with 1.19.2 or 1.20.1 will
> > be in for a surprise when they upgrade to the latest patch release,
> > 1.19.3 or 1.20.2: they will experience data loss in batch pipelines.
> > While we may want to keep the other changes in FLINK-37605, I think we
> > need to include a fix in the Flink 1.19.4 and 1.20.3 releases to
> > ensure we commit all pending committables in batch pipelines.
> >
> > I'm curious to hear if you think there is another way to resolve this.
> >
> > Beyond this issue
> > =============
> >
> > The overarching question of all of this is, why are we pushing these
> > types of invasive changes to patch releases? I think the change could
> > have used another pair of eyes to assess the potential impact.
> >
> > Another thing which we could have done is to run the Iceberg tests
> > during release testing, which would have uncovered this bug. I'll
> > definitely check that for the next release.
> >
> > Cheers and thanks,
> > Max