Martijn, thanks a lot for chiming in!

Here are my concerns with adding GlobalCommitter in the PostCommitTopology:

1. When we use TwoPhaseCommittingSink, we would need to create a noop/dummy
   committer. The actual Iceberg/DeltaLake commits would happen in the
   PostCommit stage, but the PostCommit stage should be doing some work
   after the commit (not for the commit).
2. GlobalCommitter is marked as @deprecated. It will be removed at some
   point. What then?
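
To make the shape in point 1 concrete, here is a minimal sketch in plain Java. It does not use the real Flink or Iceberg APIs: `NoopCommitter` and `TableCommitStage` are hypothetical stand-ins for the Commit stage and a single-instance post-commit stage, and the checkpoint-id replay guard is an assumption about how such a stage might stay idempotent.

```java
import java.util.ArrayList;
import java.util.List;

/** Runs one checkpoint's committables through both (hypothetical) stages. */
class PostCommitSketch {
    public static void main(String[] args) {
        NoopCommitter commitStage = new NoopCommitter();
        TableCommitStage postCommitStage = new TableCommitStage();

        List<String> dataFiles = List.of("file-a.parquet", "file-b.parquet");
        List<String> forwarded = commitStage.commit(dataFiles);

        postCommitStage.commitAll(1L, forwarded);
        // Replaying the same checkpoint (e.g. after a failover) must not commit twice.
        postCommitStage.commitAll(1L, forwarded);

        System.out.println("commit stage committed: " + commitStage.committedCount());
        System.out.println("table snapshots: " + postCommitStage.snapshots());
    }
}

/** Stand-in for the Commit stage: it commits nothing and only forwards. */
final class NoopCommitter {
    private int committed = 0; // never incremented: this stage does no real work

    List<String> commit(List<String> committables) {
        return new ArrayList<>(committables);
    }

    int committedCount() {
        return committed;
    }
}

/** Stand-in for a single-instance post-commit stage doing the real table commit. */
final class TableCommitStage {
    private final List<List<String>> snapshots = new ArrayList<>();
    private long lastCommittedCheckpointId = -1L;

    void commitAll(long checkpointId, List<String> committables) {
        if (checkpointId <= lastCommittedCheckpointId) {
            return; // already committed: skip the replay
        }
        snapshots.add(List.copyOf(committables)); // one table snapshot per checkpoint
        lastCommittedCheckpointId = checkpointId;
    }

    List<List<String>> snapshots() {
        return snapshots;
    }
}
```

In this sketch the stage named "commit" ends up with nothing committed, while the stage named "post commit" owns the only real commit, which is the naming mismatch point 1 is about.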

Thanks,
Steven

On Fri, Sep 9, 2022 at 1:20 PM Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:

> Thanks Martijn,
> I'm actually trying to run our V1 Delta connector on Flink 1.15 using
> SinkV1Adapter with GlobalCommitterOperator.
>
> Having said that, I might have found a potential issue with
> GlobalCommitterOperator, checkpointing and failover recovery [1].
> For "normal" scenarios it does look good though.
>
> Regards,
> Krzysztof Chmielewski
>
> [1] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
>
> On Fri, Sep 9, 2022 at 20:49, Martijn Visser <martijnvis...@apache.org> wrote:
>
> > Hi all,
> >
> > A couple of bits from when work was being done on the new sink: V1 is
> > completely simulated as V2 [1]. V2 is strictly more expressive.
> >
> > If there's a desire to stick to the `GlobalCommitter` interface, have a
> > look at the StandardSinkTopologies. Or you can just add your own more
> > fitting PostCommitTopology. The important part to remember is that this
> > topology is lagging one checkpoint behind in terms of fault-tolerance: it
> > only receives data once the committer has committed on
> > notifyCheckpointComplete. Thus, the global committer needs to be
> > idempotent and able to restore the actual state on recovery. That
> > limitation is coming in from Flink's checkpointing behaviour and applies
> > to both V1 and V2. GlobalCommitterOperator is abstracting these issues
> > along with handling retries (so commits that happen much later). So it's
> > probably a good place to start just with the standard topology.
> >
> > Best regards,
> >
> > Martijn
> >
> > [1] https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370
> >
> > On Thu, Sep 8, 2022 at 20:51, Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
> >
> > > Hi,
> > > Krzysztof Chmielewski [1] from the Delta-Flink connector open source
> > > community here [2].
> > >
> > > I totally agree with Steven on this. Sink V1's GlobalCommitter is
> > > exactly what the Flink-Delta Sink needs, since it is the place where
> > > we do the actual commit to the Delta Log, which should be done from a
> > > single place/instance.
> > >
> > > Currently I'm evaluating V2 for our connector, and having, as Steven
> > > described it, a "more natural, built-in concept/support of
> > > GlobalCommitter in the sink v2 interface" would be greatly appreciated.
> > >
> > > Cheers,
> > > Krzysztof Chmielewski
> > >
> > > [1] https://github.com/kristoffSC
> > > [2] https://github.com/delta-io/connectors/tree/master/flink
> > >
> > > On Thu, Sep 8, 2022 at 19:51, Steven Wu <stevenz...@gmail.com> wrote:
> > >
> > > > Hi Yun,
> > > >
> > > > Thanks a lot for the reply!
> > > >
> > > > While we can add the global committer in the WithPostCommitTopology,
> > > > the semantics are weird. The Commit stage actually didn't commit
> > > > anything to the Iceberg table, and the PostCommit stage is where the
> > > > Iceberg commit happens.
> > > >
> > > > I just took a quick look at the DeltaLake Flink sink. It still uses
> > > > the V1 sink interface [1]. I think it might have the same issue when
> > > > switching to the V2 sink interface.
> > > >
> > > > For data lake storages (like Iceberg, DeltaLake) or any storage with
> > > > global transactional commit, it would be more natural to have a
> > > > built-in concept/support of GlobalCommitter in the sink v2 interface.
> > > >
> > > > Thanks,
> > > > Steven
> > > >
> > > > [1] https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
> > > >
> > > > On Wed, Sep 7, 2022 at 2:15 AM Yun Gao <yungao...@aliyun.com.invalid> wrote:
> > > >
> > > > > Hi Steven, Liwei,
> > > > >
> > > > > Very sorry for missing this mail and responding so late.
> > > > >
> > > > > I think the initial thought is indeed to use `WithPostCommitTopology`
> > > > > as a replacement of the original GlobalCommitter, and currently the
> > > > > adapter of Sink v1 on top of Sink v2 also maps the GlobalCommitter in
> > > > > the Sink V1 interface onto an implementation of
> > > > > `WithPostCommitTopology`.
> > > > >
> > > > > Since `WithPostCommitTopology` supports an arbitrary subgraph, it
> > > > > seems to me it could support both the global committer and small file
> > > > > compaction? We might have a `WithPostCommitTopology` implementation
> > > > > like:
> > > > >
> > > > >     DataStream ds = add global committer;
> > > > >     if (enable file compaction) {
> > > > >         build the compaction subgraph from ds
> > > > >     }
> > > > >
> > > > > Best,
> > > > > Yun
> > > > >
> > > > > [1] https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
> > > > >
> > > > > ------------------------------------------------------------------
> > > > > From: Steven Wu <stevenz...@gmail.com>
> > > > > Send Time: 2022 Aug. 17 (Wed.) 07:30
> > > > > To: dev <dev@flink.apache.org>; hililiwei <hilili...@gmail.com>
> > > > > Subject: Re: Sink V2 interface replacement for GlobalCommitter
> > > > >
> > > > > > Plus, it will disable the future capability of small file
> > > > > > compaction stage post commit.
> > > > >
> > > > > I should clarify this comment. If we are using the
> > > > > `WithPostCommitTopology` for the global committer, we would lose the
> > > > > capability of using the post commit stage for small files compaction.
> > > > >
> > > > > On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <stevenz...@gmail.com> wrote:
> > > > >
> > > > > > In the V1 sink interface, there is a GlobalCommitter for Iceberg.
> > > > > > With the V2 sink interface, GlobalCommitter has been deprecated in
> > > > > > favor of WithPostCommitTopology. I thought the post commit stage is
> > > > > > mainly for async maintenance (like compaction).
> > > > > >
> > > > > > Are we supposed to do something similar to the
> > > > > > GlobalCommittingSinkAdapter? It seems like a temporary transition
> > > > > > plan for bridging v1 sinks to v2 interfaces.
> > > > > >
> > > > > >     private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
> > > > > >             implements WithPostCommitTopology<InputT, CommT> {
> > > > > >         @Override
> > > > > >         public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
> > > > > >             StandardSinkTopologies.addGlobalCommitter(
> > > > > >                     committables,
> > > > > >                     GlobalCommitterAdapter::new,
> > > > > >                     () -> sink.getCommittableSerializer().get());
> > > > > >         }
> > > > > >     }
> > > > > >
> > > > > > In the Iceberg PR [1] for adopting the new sink interface, Liwei
> > > > > > used the "global" partitioner to force all committables to go to a
> > > > > > single committer task 0. It will effectively force a global
> > > > > > committer disguised in the parallel committers. It is a little
> > > > > > weird and can also lead to questions about why other committer
> > > > > > tasks are not getting any messages. Plus, it will disable the
> > > > > > future capability of small file compaction stage post commit.
> > > > > > Hence, I am asking what the right approach is to achieve global
> > > > > > committer behavior.
> > > > > >
> > > > > > Thanks,
> > > > > > Steven
> > > > > >
> > > > > > [1] https://github.com/apache/iceberg/pull/4904/files#r946975047