Martijn, thanks a lot for chiming in!

Here are my concerns with adding GlobalCommitter in the PostCommitTopology:
1. When we use TwoPhaseCommittingSink, we would need to create a no-op/dummy
committer, because the actual Iceberg/DeltaLake commit would happen in the
PostCommit stage. The PostCommit stage should be doing work after the
commit, not the commit itself.
2. GlobalCommitter is marked as @deprecated and will be removed at some
point. What then?
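
To make concern 1 concrete, here is a minimal sketch in plain Java. It is a toy model only: `Committer`, `PostCommitStage`, `Table`, and the two implementations are hypothetical stand-ins invented for illustration, not Flink's or Iceberg's actual interfaces. It shows the shape of the problem: the stage called "commit" leaves the table untouched, and the table only changes in the stage called "post commit".

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: every type here is a hypothetical stand-in, not a Flink interface.
final class NoOpCommitterSketch {

    /** Stand-in for the committer stage of a two-phase committing sink. */
    interface Committer<CommT> {
        void commit(List<CommT> committables);
    }

    /** Stand-in for a single-instance stage attached after the committer. */
    interface PostCommitStage<CommT> {
        void process(long checkpointId, List<CommT> committables);
    }

    /** Toy table: one snapshot (a list of data files) per transactional commit. */
    static final class Table {
        final List<List<String>> snapshots = new ArrayList<>();
    }

    /** The committer has nothing to do; committables are only passed along. */
    static final class NoOpCommitter implements Committer<String> {
        int calls = 0;

        @Override
        public void commit(List<String> committables) {
            calls++; // intentionally no side effect on the table
        }
    }

    /** The real table commit ends up in the stage named "post commit". */
    static final class TableCommittingPostCommitStage implements PostCommitStage<String> {
        private final Table table;

        TableCommittingPostCommitStage(Table table) {
            this.table = table;
        }

        @Override
        public void process(long checkpointId, List<String> committables) {
            table.snapshots.add(new ArrayList<>(committables));
        }
    }

    public static void main(String[] args) {
        Table table = new Table();
        NoOpCommitter committer = new NoOpCommitter();
        TableCommittingPostCommitStage postCommit = new TableCommittingPostCommitStage(table);

        List<String> dataFiles = List.of("file-a.parquet", "file-b.parquet");
        committer.commit(dataFiles);
        System.out.println("snapshots after commit stage: " + table.snapshots.size());
        postCommit.process(1L, dataFiles);
        System.out.println("snapshots after post-commit stage: " + table.snapshots.size());
    }
}
```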

Thanks,
Steven

On Fri, Sep 9, 2022 at 1:20 PM Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Thanks Martijn,
> I'm actually trying to run our V1 Delta connector on Flink 1.15 using
> SinkV1Adapter with GlobalCommitterOperator.
>
> Having said that, I might have found a potential issue with
> GlobalCommitterOperator, checkpointing and failover recovery [1].
> For "normal" scenarios it does look good though.
>
> Regards,
> Krzysztof Chmielewski
>
> [1] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
>
> On Fri, Sep 9, 2022 at 20:49 Martijn Visser <martijnvis...@apache.org>
> wrote:
>
> > Hi all,
> >
> > A couple of bits from when work was being done on the new sink: V1 is
> > completely simulated as V2 [1]. V2 is strictly more expressive.
> >
> > If there's desire to stick to the `GlobalCommitter` interface, have a
> > look at the StandardSinkTopologies. Or you can just add your own more
> > fitting PostCommitTopology. The important part to remember is that this
> > topology is lagging one checkpoint behind in terms of fault-tolerance: it
> > only receives data once the committer has committed on
> > notifyCheckpointComplete. Thus, the global committer needs to be
> > idempotent and able to restore the actual state on recovery. That
> > limitation comes from Flink's checkpointing behaviour and applies to
> > both V1 and V2. GlobalCommitterOperator abstracts these issues, along
> > with handling retries (that is, commits that happen much later). So the
> > standard topology is probably a good place to start.
> >
> > Best regards,
> >
> > Martijn
> >
> > [1]
> > https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370
> >
> > On Thu, Sep 8, 2022 at 20:51 Krzysztof Chmielewski <
> > krzysiek.chmielew...@gmail.com> wrote:
> >
> > > Hi,
> > > Krzysztof Chmielewski [1] from the Delta-Flink connector open source
> > > community here [2].
> > >
> > > I totally agree with Steven on this. The Sink V1 GlobalCommitter is
> > > exactly what the Flink-Delta sink needs, since it is the place where
> > > we do the actual commit to the Delta Log, which should be done from a
> > > single place/instance.
> > >
> > > Currently I'm evaluating V2 for our connector, and having, as Steven
> > > described it, a "more natural, built-in concept/support of
> > > GlobalCommitter in the sink v2 interface" would be greatly appreciated.
> > >
> > > Cheers,
> > > Krzysztof Chmielewski
> > >
> > > [1] https://github.com/kristoffSC
> > > [2] https://github.com/delta-io/connectors/tree/master/flink
> > >
> > > On Thu, Sep 8, 2022 at 19:51 Steven Wu <stevenz...@gmail.com> wrote:
> > >
> > > > Hi Yun,
> > > >
> > > > Thanks a lot for the reply!
> > > >
> > > > While we can add the global committer in the WithPostCommitTopology,
> > > > the semantics are weird. The Commit stage doesn't actually commit
> > > > anything to the Iceberg table, and the PostCommit stage is where the
> > > > Iceberg commit happens.
> > > >
> > > > I just took a quick look at the DeltaLake Flink sink. It still uses
> > > > the V1 sink interface [1]. I think it might have the same issue when
> > > > switching to the V2 sink interface.
> > > >
> > > > For data lake storages (like Iceberg, DeltaLake) or any storage with
> > > > global transactional commit, it would be more natural to have a
> > > > built-in concept/support of GlobalCommitter in the sink v2 interface.
> > > >
> > > > Thanks,
> > > > Steven
> > > >
> > > > [1]
> > > > https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
> > > >
> > > >
> > > > On Wed, Sep 7, 2022 at 2:15 AM Yun Gao <yungao...@aliyun.com.invalid>
> > > > wrote:
> > > >
> > > > > Hi Steven, Liwei,
> > > > > Very sorry for missing this mail and responding so late.
> > > > > I think the initial thought was indeed to use `WithPostCommitTopology`
> > > > > as a replacement for the original GlobalCommitter, and currently the
> > > > > adapter of Sink V1 on top of Sink V2 also maps the GlobalCommitter in
> > > > > the Sink V1 interface onto an implementation of
> > > > > `WithPostCommitTopology`.
> > > > > Since `WithPostCommitTopology` supports an arbitrary subgraph, it
> > > > > seems to me it could support both the global committer and small file
> > > > > compaction? We might have a `WithPostCommitTopology` implementation
> > > > > like
> > > > > DataStream ds = add global committer;
> > > > > if (enable file compaction) {
> > > > >  build the compaction subgraph from ds
> > > > > }
> > > > > Best,
> > > > > Yun
> > > > > [1]
> > > > > https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
> > > > > ------------------------------------------------------------------
> > > > > From:Steven Wu <stevenz...@gmail.com>
> > > > > Send Time:2022 Aug. 17 (Wed.) 07:30
> > > > > To:dev <dev@flink.apache.org>; hililiwei <hilili...@gmail.com>
> > > > > Subject:Re: Sink V2 interface replacement for GlobalCommitter
> > > > > > Plus, it will disable the future capability of small file
> > > > > > compaction stage post commit.
> > > > > I should clarify this comment. If we are using the
> > > > > `WithPostCommitTopology` for the global committer, we would lose the
> > > > > capability of using the post-commit stage for small file compaction.
> > > > > On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <stevenz...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > In the V1 sink interface, there is a GlobalCommitter for Iceberg.
> > > > > > With the V2 sink interface, GlobalCommitter has been deprecated in
> > > > > > favor of WithPostCommitTopology. I thought the post-commit stage is
> > > > > > mainly for async maintenance (like compaction).
> > > > > >
> > > > > > Are we supposed to do something similar to the
> > > > > > GlobalCommittingSinkAdapter? It seems like a temporary transition
> > > > > > plan for bridging V1 sinks to V2 interfaces.
> > > > > >
> > > > > > private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
> > > > > >         implements WithPostCommitTopology<InputT, CommT> {
> > > > > >     @Override
> > > > > >     public void addPostCommitTopology(
> > > > > >             DataStream<CommittableMessage<CommT>> committables) {
> > > > > >         StandardSinkTopologies.addGlobalCommitter(
> > > > > >                 committables,
> > > > > >                 GlobalCommitterAdapter::new,
> > > > > >                 () -> sink.getCommittableSerializer().get());
> > > > > >     }
> > > > > > }
> > > > > >
> > > > > >
> > > > > > In the Iceberg PR [1] for adopting the new sink interface, Liwei
> > > > > > used the "global" partitioner to force all committables to go to a
> > > > > > single committer task 0. It effectively forces a global committer
> > > > > > disguised among the parallel committers. It is a little weird and
> > > > > > can also lead to questions about why the other committer tasks are
> > > > > > not getting any messages. Plus, it will disable the future
> > > > > > capability of a small file compaction stage post commit. Hence, I am
> > > > > > asking what the right approach is to achieve global committer
> > > > > > behavior.
> > > > > >
> > > > > > Thanks,
> > > > > > Steven
> > > > > >
> > > > > > [1] https://github.com/apache/iceberg/pull/4904/files#r946975047
> > > > > >
> > > > >
> > > >
> > >
> >
>
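
The requirement quoted above, that a global committer in the post-commit topology must be idempotent and able to restore the actual state on recovery, can be sketched in plain Java. This is a toy model under stated assumptions: `Table`, `GlobalCommitter`, and the idea of recording the last committed checkpoint id alongside the data are hypothetical illustrations, not Flink's `GlobalCommitterOperator` and not any connector's real implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: hypothetical types, not Flink or Iceberg/Delta classes.
final class IdempotentGlobalCommitSketch {

    /** Toy table that records the checkpoint id together with each commit. */
    static final class Table {
        private final List<String> committedFiles = new ArrayList<>();
        private long lastCommittedCheckpointId = -1L;

        // Assumption: files and checkpoint id are written in one atomic commit.
        void commit(long checkpointId, List<String> files) {
            committedFiles.addAll(files);
            lastCommittedCheckpointId = checkpointId;
        }

        long lastCommittedCheckpointId() {
            return lastCommittedCheckpointId;
        }

        List<String> committedFiles() {
            return List.copyOf(committedFiles);
        }
    }

    /** Single-instance committer that tolerates replayed committables. */
    static final class GlobalCommitter {
        private final Table table;
        private long maxCommittedCheckpointId;

        GlobalCommitter(Table table) {
            this.table = table;
            // On (re)start, restore the actual state from the table itself.
            this.maxCommittedCheckpointId = table.lastCommittedCheckpointId();
        }

        /** Returns true if the commit was applied, false if it was a replay. */
        boolean commit(long checkpointId, List<String> files) {
            if (checkpointId <= maxCommittedCheckpointId) {
                return false; // already in the table, skip to stay idempotent
            }
            table.commit(checkpointId, files);
            maxCommittedCheckpointId = checkpointId;
            return true;
        }
    }

    public static void main(String[] args) {
        Table table = new Table();
        GlobalCommitter committer = new GlobalCommitter(table);
        committer.commit(1L, List.of("a.parquet"));
        committer.commit(2L, List.of("b.parquet"));

        // Simulated failover: a fresh instance receives checkpoint 2 again.
        GlobalCommitter recovered = new GlobalCommitter(table);
        System.out.println("replay applied: " + recovered.commit(2L, List.of("b.parquet")));
        System.out.println("next applied: " + recovered.commit(3L, List.of("c.parquet")));
        System.out.println("files: " + table.committedFiles());
    }
}
```

The design choice in this sketch is to treat the table as the source of truth for what has been committed, so a replayed committable after failover is detected by comparing checkpoint ids rather than by trusting in-memory state.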
