Hi Yun,

Thanks a lot for the reply!
While we can add the global committer in WithPostCommitTopology, the semantics are weird: the Commit stage doesn't actually commit anything to the Iceberg table, and the PostCommit stage is where the Iceberg commit happens.

I just took a quick look at the Delta Lake Flink sink. It still uses the V1 sink interface [1], and I think it might run into the same issue when switching to the V2 sink interface. For data lake storage (like Iceberg and Delta Lake), or any storage with a global transactional commit, it would be more natural to have a built-in concept of (and support for) a GlobalCommitter in the sink V2 interface.

Thanks,
Steven

[1] https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java

On Wed, Sep 7, 2022 at 2:15 AM Yun Gao <yungao...@aliyun.com.invalid> wrote:
> Hi Steven, Liwei,
> Very sorry for missing this mail and responding so late.
> I think the initial thought is indeed to use `WithPostCommitTopology` as
> a replacement for the original GlobalCommitter, and currently the adapter of
> Sink v1 on top of Sink v2 [1] also maps the GlobalCommitter in the Sink V1
> interface onto an implementation of `WithPostCommitTopology`.
> Since `WithPostCommitTopology` supports arbitrary subgraphs, it seems to
> me it could support both the global committer and small file compaction?
> We might have a `WithPostCommitTopology` implementation like
> DataStream ds = add global committer;
> if (enable file compaction) {
>     build the compaction subgraph from ds
> }
> Best,
> Yun
> [1]
> https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
> ------------------------------------------------------------------
> From: Steven Wu <stevenz...@gmail.com>
> Send Time: 2022 Aug. 17 (Wed.) 07:30
> To: dev <dev@flink.apache.org>; hililiwei <hilili...@gmail.com>
> Subject: Re: Sink V2 interface replacement for GlobalCommitter
> > Plus, it will disable the future capability of small file compaction
> > stage post commit.
> I should clarify this comment: if we use `WithPostCommitTopology`
> for the global committer, we would lose the capability of using the
> post-commit stage for small file compaction.
> On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <stevenz...@gmail.com> wrote:
> >
> > In the V1 sink interface, there is a GlobalCommitter for Iceberg. In the
> > V2 sink interface, GlobalCommitter has been deprecated in favor of
> > WithPostCommitTopology. I thought the post-commit stage was mainly for
> > async maintenance (like compaction).
> >
> > Are we supposed to do something similar to the GlobalCommittingSinkAdapter?
> > It seems like a temporary transition plan for bridging v1 sinks to v2
> > interfaces.
> >
> >   private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
> >       implements WithPostCommitTopology<InputT, CommT> {
> >
> >     @Override
> >     public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
> >       StandardSinkTopologies.addGlobalCommitter(
> >           committables,
> >           GlobalCommitterAdapter::new,
> >           () -> sink.getCommittableSerializer().get());
> >     }
> >   }
> >
> > In the Iceberg PR [1] for adopting the new sink interface, Liwei used the
> > "global" partitioner to force all committables to go to a single committer
> > task 0. This effectively creates a global committer disguised as parallel
> > committers. It is a little weird, and it can also raise questions about
> > why the other committer tasks are not getting any messages. Plus, it will
> > disable the future capability of a small file compaction stage post commit.
> > Hence, I am asking what the right approach is to achieve global committer
> > behavior.
> >
> > Thanks,
> > Steven
> >
> > [1] https://github.com/apache/iceberg/pull/4904/files#r946975047
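The sketch below is a minimal, Flink-free illustration of the two shapes discussed in this thread. The first is a single-parallelism global committer that turns every writer subtask's committables for a checkpoint into one atomic table commit. The second is Yun's idea of building an optional compaction stage downstream of that commit. It is only a model under stated assumptions. `Committable`, `Table`, `GlobalCommitter`, `onCheckpointComplete`, and `compactIfNeeded` are hypothetical stand-ins, not Flink, Iceberg, or Delta Lake APIs. A real sink would wire equivalent logic through `WithPostCommitTopology` and DataStream operators instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

/**
 * Hypothetical model of "global committer + optional post-commit compaction".
 * None of these types are Flink, Iceberg, or Delta Lake APIs.
 */
public class GlobalCommitSketch {

    /** Committable emitted by one writer subtask for one checkpoint, e.g. a data file. */
    static final class Committable {
        final long checkpointId;
        final String dataFile;
        Committable(long checkpointId, String dataFile) { this.checkpointId = checkpointId; this.dataFile = dataFile; }
    }

    /** Toy table whose state only changes through atomic snapshot commits. */
    static final class Table {
        final List<String> liveFiles = new ArrayList<>();
        int snapshots = 0;

        /** Appends all given files as ONE new snapshot. */
        void append(List<String> files) {
            liveFiles.addAll(files);
            snapshots++;
        }

        /** Replaces every live file with one rewritten file, as one new snapshot. */
        void rewriteAll(String compactedFile) {
            liveFiles.clear();
            liveFiles.add(compactedFile);
            snapshots++;
        }
    }

    /** Runs with parallelism 1: collects committables from ALL writer subtasks. */
    static final class GlobalCommitter {
        private final Table table;
        private final TreeMap<Long, List<String>> pending = new TreeMap<>();
        private final BiConsumer<Table, Long> postCommit; // null means no downstream stage

        GlobalCommitter(Table table, BiConsumer<Table, Long> postCommit) {
            this.table = table;
            this.postCommit = postCommit;
        }

        void receive(Committable c) {
            pending.computeIfAbsent(c.checkpointId, id -> new ArrayList<>()).add(c.dataFile);
        }

        /** Commits every pending checkpoint up to and including checkpointId, in order. */
        void onCheckpointComplete(long checkpointId) {
            Map<Long, List<String>> ready = pending.headMap(checkpointId, true);
            for (Map.Entry<Long, List<String>> e : ready.entrySet()) {
                table.append(e.getValue()); // one table commit per checkpoint
                if (postCommit != null) {
                    postCommit.accept(table, e.getKey()); // runs only after the commit
                }
            }
            ready.clear(); // headMap is a live view, so this drops the committed entries
        }
    }

    /** Hypothetical compaction stage: rewrite once more than maxFiles files are live. */
    static void compactIfNeeded(Table table, long checkpointId, int maxFiles) {
        if (table.liveFiles.size() > maxFiles) {
            table.rewriteAll("compacted-" + checkpointId);
        }
    }

    /** Simulates 3 writer subtasks over 2 checkpoints. */
    static Table run(boolean enableCompaction) {
        Table table = new Table();
        BiConsumer<Table, Long> compaction = (t, id) -> compactIfNeeded(t, id, 4);
        GlobalCommitter committer = new GlobalCommitter(table, enableCompaction ? compaction : null);
        int parallelism = 3;
        for (long ckpt = 1; ckpt <= 2; ckpt++) {
            for (int subtask = 0; subtask < parallelism; subtask++) {
                committer.receive(new Committable(ckpt, "ckpt" + ckpt + "-subtask" + subtask));
            }
            committer.onCheckpointComplete(ckpt);
        }
        return table;
    }

    public static void main(String[] args) {
        Table plain = run(false);
        System.out.println(plain.snapshots + " snapshots, " + plain.liveFiles.size() + " files");
        Table compacted = run(true);
        System.out.println(compacted.snapshots + " snapshots, " + compacted.liveFiles);
    }
}
```

Running `main` prints `2 snapshots, 6 files` and then `3 snapshots, [compacted-2]`. Without compaction, each checkpoint produces exactly one table snapshot, no matter how many writer subtasks contributed files. With compaction enabled, the rewrite fires only after the global commit for checkpoint 2 has landed, which is the ordering Yun's pseudocode describes (global committer first, compaction subgraph built from its output).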