In the V1 sink interface, there is a GlobalCommitter for Iceberg. With the
V2 sink interface, GlobalCommitter has been deprecated in favor of
WithPostCommitTopology. I thought the post-commit stage was mainly meant for
asynchronous maintenance (like compaction).

Are we supposed to do something similar to the GlobalCommittingSinkAdapter?
It seems like a temporary transition plan for bridging V1 sinks to the V2
interfaces.

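private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
        implements WithPostCommitTopology<InputT, CommT> {

    // Appends a global committer after the regular (parallel) committers:
    // every committable is forwarded to a single GlobalCommitterAdapter
    // instance, which wraps the V1 GlobalCommitter.
    @Override
    public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
        StandardSinkTopologies.addGlobalCommitter(
                committables,
                GlobalCommitterAdapter::new,
                () -> sink.getCommittableSerializer().get());
    }
}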


In the Iceberg PR [1] for adopting the new sink interface, Liwei used the
"global" partitioner to force all committables to go to a single committer
task (task 0). This effectively turns the parallel committers into a global
committer in disguise. It is a little odd, and it may prompt questions about
why the other committer tasks never receive any messages. It would also rule
out adding a small-file compaction stage after the commit in the future.
Hence, I am asking what the right approach is to achieve global committer
behavior.
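To make the concern about idle committer tasks concrete, here is a plain-Java model with no Flink dependency. The class and method names are hypothetical. globalRoute only imitates the documented behavior of Flink's global() partitioning (all records go to the first instance of the downstream operator), and roundRobinRoute is a simplified stand-in for an even spread such as rebalance(); neither is Flink's actual partitioner API.

```java
import java.util.Arrays;
import java.util.function.IntBinaryOperator;

/**
 * Hypothetical model (not Flink code) of how committables are spread
 * across parallel committer subtasks under two routing strategies.
 */
public class CommitterRoutingModel {

    /** Imitates "global" partitioning: every committable goes to subtask 0. */
    static int globalRoute(int committableIndex, int numSubtasks) {
        return 0;
    }

    /** Simplified even spread for comparison (similar in spirit to rebalance()). */
    static int roundRobinRoute(int committableIndex, int numSubtasks) {
        return committableIndex % numSubtasks;
    }

    /** Counts how many committables each committer subtask receives. */
    static int[] countPerSubtask(int numCommittables, int numSubtasks, IntBinaryOperator router) {
        int[] counts = new int[numSubtasks];
        for (int i = 0; i < numCommittables; i++) {
            counts[router.applyAsInt(i, numSubtasks)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Assume 8 writer subtasks each emit one committable per checkpoint,
        // and the committer stage runs with parallelism 4.
        int[] global = countPerSubtask(8, 4, CommitterRoutingModel::globalRoute);
        int[] spread = countPerSubtask(8, 4, CommitterRoutingModel::roundRobinRoute);
        System.out.println("global:      " + Arrays.toString(global));
        System.out.println("round-robin: " + Arrays.toString(spread));
    }
}
```

Running main prints [8, 0, 0, 0] for the global routing and [2, 2, 2, 2] for the round-robin routing: with global partitioning, committer tasks 1 to 3 exist but never see a committable.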

Thanks,
Steven

[1] https://github.com/apache/iceberg/pull/4904/files#r946975047
