Hi,
Very sorry for the late reply; I was on holiday.
Many thanks for the discussion. It also reminds me of
one more piece of background on the change to the GlobalCommitter:
when we refactored the job finish process in FLIP-147 to
ensure that all records can be committed at the end of a bounded
streaming job, we had to drop support for cascading commits,
which means the cascading commit of `committer -> global committer` does not work
in all cases.
For the current issues, one possible alternative from my side is that we
could support setting the committer parallelism to 1. Would this option solve
the issue in the current scenarios? I'll also double-check whether
it can be implemented, and look into the failed tests Krzysztof encountered.
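To illustrate what a committer parallelism of 1 would buy us, here is a rough plain-Java sketch. It is not Flink API: `CommitterParallelismSketch` and `route` are hypothetical names, and the round-robin assignment of committables to committer subtasks is an assumption made only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CommitterParallelismSketch {

    /**
     * Distributes committables onto committer subtasks.
     * Assumption: the i-th committable goes to subtask (i % committerParallelism).
     */
    static Map<Integer, List<String>> route(List<String> committables, int committerParallelism) {
        Map<Integer, List<String>> perCommitter = new TreeMap<>();
        for (int i = 0; i < committables.size(); i++) {
            int committer = i % committerParallelism;
            perCommitter.computeIfAbsent(committer, k -> new ArrayList<>())
                    .add(committables.get(i));
        }
        return perCommitter;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("file-0", "file-1", "file-2", "file-3");
        // With parallelism 4, each committer subtask only sees part of the checkpoint.
        System.out.println(route(files, 4));
        // With parallelism 1, a single subtask sees every committable of the checkpoint,
        // so it could issue one table-level commit.
        System.out.println(route(files, 1));
    }
}
```

With a single committer subtask the number of entries in the returned map is 1, which is the property a table-level commit for Iceberg or Delta would rely on.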
Best,
Yun
------------------------------------------------------------------
From:Steven Wu <stevenz...@gmail.com>
Send Time:2022 Sep. 10 (Sat.) 11:31
To:dev <dev@flink.apache.org>
Cc:Yun Gao <yungao...@aliyun.com>; hililiwei <hilili...@gmail.com>
Subject:Re: Sink V2 interface replacement for GlobalCommitter
Martijn, thanks a lot for chiming in!
Here are my concerns with adding GlobalCommitter in the PostCommitTopology:
1. When we use TwoPhaseCommittingSink, we would need to create a noop/dummy 
committer. The actual Iceberg/DeltaLake commits happen in the PostCommit stage. The 
PostCommit stage should be doing some work after the commit (not for the 
commit).
2. GlobalCommitter is marked as @deprecated. It will be removed at a certain 
point. What then?
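To make concern 1 concrete, here is a rough plain-Java sketch. It is not Flink API: `noopCommit`, `PostCommitStage`, and `globalCommit` are hypothetical names made up for illustration, and the table log is just an in-memory list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NoopCommitSketch {

    /** Hypothetical "Commit" stage: forwards the committables and touches no table. */
    static List<String> noopCommit(List<String> dataFiles) {
        return new ArrayList<>(dataFiles);
    }

    /** Hypothetical "PostCommit" stage: the single place that appends to the table log. */
    static class PostCommitStage {
        final List<String> tableLog = new ArrayList<>();

        void globalCommit(long checkpointId, List<String> dataFiles) {
            tableLog.add("checkpoint-" + checkpointId + ":" + String.join(",", dataFiles));
        }
    }

    public static void main(String[] args) {
        PostCommitStage postCommit = new PostCommitStage();
        List<String> forwarded = noopCommit(Arrays.asList("file-a", "file-b"));
        // The "Commit" stage has finished, yet the table log is still empty.
        System.out.println(postCommit.tableLog.isEmpty());
        // The real commit only happens in the stage that is named "post" commit.
        postCommit.globalCommit(1L, forwarded);
        System.out.println(postCommit.tableLog);
    }
}
```

The stage called "Commit" finishes without changing the table, and the stage called "PostCommit" performs the only real commit, which is the naming mismatch I am worried about.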
Thanks,
Steven
On Fri, Sep 9, 2022 at 1:20 PM Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
Thanks Martijn,
 I'm actually trying to run our V1 Delta connector on Flink 1.15 using
 SinkV1Adapter with GlobalCommitterOperator.
 Having said that, I might have found a potential issue with
 GlobalCommitterOperator, checkpointing and failover recovery [1].
 For "normal" scenarios it does look good though.
 Regards,
 Krzysztof Chmielewski
 [1] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
 On Fri, Sep 9, 2022 at 20:49, Martijn Visser <martijnvis...@apache.org> wrote:
 > Hi all,
 >
 > A couple of bits from when work was being done on the new sink: V1 is
 > completely simulated as V2 [1]. V2 is strictly more expressive.
 >
 > If there's desire to stick to the `GlobalCommitter` interface, have a
 > look at the StandardSinkTopologies. Or you can just add your own more
 > fitting PostCommitTopology. The important part to remember is that this
 > topology is lagging one checkpoint behind in terms of fault-tolerance: it
 > only receives data once the committer committed
 > on notifyCheckpointComplete. Thus, the global committer needs to be
 > idempotent and able to restore the actual state on recovery. That
 > limitation is coming in from Flink's checkpointing behaviour and applies to
 > both V1 and V2. GlobalCommitterOperator is abstracting these issues along
 > with handling retries (so commits that happen much later). So it's probably
 > a good place to start just with the standard topology.
 >
 > Best regards,
 >
 > Martijn
 >
 > [1]
 >
 > https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370
 >
 > On Thu, Sep 8, 2022 at 20:51, Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
 >
 > > Hi,
 > > Krzysztof Chmielewski [1] from the Delta-Flink connector open source
 > > community here [2].
 > >
 > > I totally agree with Steven on this. Sink V1's GlobalCommitter is
 > > exactly what the Flink-Delta Sink needs, since it is the place where
 > > we do the actual commit to the Delta Log, which should be done from a
 > > single place/instance.
 > >
 > > Currently I'm evaluating V2 for our connector, and having, as Steven
 > > described it, a "more natural, built-in concept/support of GlobalCommitter
 > > in the sink v2 interface" would be greatly appreciated.
 > >
 > > Cheers,
 > > Krzysztof Chmielewski
 > >
 > > [1] https://github.com/kristoffSC
 > > [2] https://github.com/delta-io/connectors/tree/master/flink
 > >
 > > On Thu, Sep 8, 2022 at 19:51, Steven Wu <stevenz...@gmail.com> wrote:
 > >
 > > > Hi Yun,
 > > >
 > > > Thanks a lot for the reply!
 > > >
 > > > While we can add the global committer in the WithPostCommitTopology, the
 > > > semantics are weird. The Commit stage doesn't actually commit anything to
 > > > the Iceberg table, and the PostCommit stage is where the Iceberg commit
 > > > happens.
 > > >
 > > > I just took a quick look at the DeltaLake Flink sink. It still uses the V1
 > > > sink interface [1]. I think it might have the same issue when switching to
 > > > the V2 sink interface.
 > > >
 > > > For data lake storages (like Iceberg, DeltaLake) or any storage with a
 > > > global transactional commit, it would be more natural to have a built-in
 > > > concept/support of GlobalCommitter in the sink v2 interface.
 > > >
 > > > Thanks,
 > > > Steven
 > > >
 > > > [1] https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
 > > >
 > > >
 > > > On Wed, Sep 7, 2022 at 2:15 AM Yun Gao <yungao...@aliyun.com.invalid>
 > > > wrote:
 > > >
 > > > > Hi Steven, Liwei,
 > > > > Very sorry for missing this mail and responding so late.
 > > > > I think the initial thought was indeed to use `WithPostCommitTopology` as
 > > > > a replacement for the original GlobalCommitter, and currently the adapter
 > > > > of Sink v1 on top of Sink v2 also maps the GlobalCommitter in the Sink V1
 > > > > interface onto an implementation of `WithPostCommitTopology` [1].
 > > > > Since `WithPostCommitTopology` supports an arbitrary subgraph, it seems
 > > > > to me that it could support both a global committer and small file
 > > > > compaction? We might have a `WithPostCommitTopology` implementation like
 > > > >
 > > > >     DataStream ds = add global committer;
 > > > >     if (enable file compaction) {
 > > > >         build the compaction subgraph from ds
 > > > >     }
 > > > >
 > > > > Best,
 > > > > Yun
 > > > > [1] https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
 > > > > ------------------------------------------------------------------
 > > > > From:Steven Wu <stevenz...@gmail.com>
 > > > > Send Time:2022 Aug. 17 (Wed.) 07:30
 > > > > To:dev <dev@flink.apache.org>; hililiwei <hilili...@gmail.com>
 > > > > Subject:Re: Sink V2 interface replacement for GlobalCommitter
 > > > > > Plus, it will disable the future capability of small file compaction
 > > > > > stage post commit.
 > > > > I should clarify this comment. If we are using the `WithPostCommitTopology`
 > > > > for the global committer, we would lose the capability of using the post
 > > > > commit stage for small file compaction.
 > > > > On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <stevenz...@gmail.com> wrote:
 > > > > >
 > > > > > In the V1 sink interface, there is a GlobalCommitter for Iceberg. With the
 > > > > > V2 sink interface, GlobalCommitter has been deprecated by
 > > > > > WithPostCommitTopology. I thought the post commit stage is mainly for async
 > > > > > maintenance (like compaction).
 > > > > >
 > > > > > Are we supposed to do something similar to the GlobalCommittingSinkAdapter?
 > > > > > It seems like a temporary transition plan for bridging v1 sinks to v2
 > > > > > interfaces.
 > > > > > private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
 > > > > >         implements WithPostCommitTopology<InputT, CommT> {
 > > > > >     @Override
 > > > > >     public void addPostCommitTopology(
 > > > > >             DataStream<CommittableMessage<CommT>> committables) {
 > > > > >         StandardSinkTopologies.addGlobalCommitter(
 > > > > >                 committables,
 > > > > >                 GlobalCommitterAdapter::new,
 > > > > >                 () -> sink.getCommittableSerializer().get());
 > > > > >     }
 > > > > > }
 > > > > >
 > > > > >
 > > > > > In the Iceberg PR [1] for adopting the new sink interface, Liwei used the
 > > > > > "global" partitioner to force all committables to go to a single committer
 > > > > > task 0. It effectively forces a global committer disguised in the
 > > > > > parallel committers. It is a little weird and can also lead to questions
 > > > > > about why other committer tasks are not getting any messages. Plus, it will
 > > > > > disable the future capability of small file compaction stage post commit.
 > > > > > Hence, I am asking what is the right approach to achieve global committer
 > > > > > behavior.
 > > > > >
 > > > > > Thanks,
 > > > > > Steven
 > > > > >
 > > > > > [1] https://github.com/apache/iceberg/pull/4904/files#r946975047
 > > > > >
 > > > >
 > > >
 > >
 >
