Hi Martijn,
Could you clarify a bit what you mean by:

"The important part to remember is that this
topology is lagging one checkpoint behind in terms of fault-tolerance: it
only receives data once the committer committed"

What are the implications?

Thanks,
Krzysztof Chmielewski

On Tue, 13 Sep 2022 at 09:57, Yun Gao <yungao...@aliyun.com.invalid> wrote:

> Hi,
> Sorry for the late reply; I was on holiday.
> Thanks a lot for the discussion. It also reminds me of one more piece of
> background on the change to the GlobalCommitter:
> when we refactored the job-finishing process in FLIP-147 to
> ensure that all records could be committed at the end of a bounded
> streaming job, we had to drop support for cascading commits,
> which means that a cascading `committer -> global committer` commit does not
> work in all cases.
> For the current issue, one possible alternative from my side is
> that we could support setting the committer parallelism to 1. Would this
> option solve the issue in the current scenarios? I'll also double-check
> whether it could be implemented, and look into the failing tests Krzysztof
> ran into.
> Best,
> Yun
> ------------------------------------------------------------------
> From:Steven Wu <stevenz...@gmail.com>
> Send Time:2022 Sep. 10 (Sat.) 11:31
> To:dev <dev@flink.apache.org>
> Cc:Yun Gao <yungao...@aliyun.com>; hililiwei <hilili...@gmail.com>
> Subject:Re: Sink V2 interface replacement for GlobalCommitter
> Martijn, thanks a lot for chiming in!
> Here are my concerns with adding the GlobalCommitter to the PostCommitTopology:
> 1. When we use TwoPhaseCommittingSink, we would need to create a
> no-op/dummy committer, because the actual Iceberg/DeltaLake commits happen in
> the PostCommit stage. The PostCommit stage should be doing some work after
> the commit, not the commit itself.
> 2. GlobalCommitter is marked as @deprecated and will be removed at some
> point. What then?
> Thanks,
> Steven
> On Fri, Sep 9, 2022 at 1:20 PM Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
> Thanks Martijn,
>  I'm actually trying to run our V1 Delta connector on Flink 1.15 using
>  SinkV1Adapter with GlobalCommitterOperator.
>  That said, I might have found a potential issue with
>  GlobalCommitterOperator, checkpointing, and failover recovery [1].
>  For "normal" scenarios it does look good, though.
>  Regards,
>  Krzysztof Chmielewski
>  [1] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
>  On Fri, 9 Sep 2022 at 20:49, Martijn Visser <martijnvis...@apache.org> wrote:
>  > Hi all,
>  >
>  > A couple of bits from when work was being done on the new sink: V1 is
>  > completely simulated as V2 [1]. V2 is strictly more expressive.
>  >
>  > If there's a desire to stick to the `GlobalCommitter` interface, have a
>  > look at StandardSinkTopologies. Or you can just add your own, more
>  > fitting PostCommitTopology. The important part to remember is that this
>  > topology lags one checkpoint behind in terms of fault tolerance: it
>  > only receives data once the committer has committed
>  > on notifyCheckpointComplete. Thus, the global committer needs to be
>  > idempotent and able to restore the actual state on recovery. That
>  > limitation comes from Flink's checkpointing behaviour and applies to
>  > both V1 and V2. GlobalCommitterOperator abstracts these issues away,
>  > along with handling retries (i.e., commits that happen much later). So
>  > it's probably a good place to start, just with the standard topology.
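A minimal plain-Java sketch of what "idempotent and able to restore the actual state on recovery" can mean in practice. It assumes the external table can record the highest checkpoint ID it has committed, and that all committables of one checkpoint are committed atomically as one unit. `FakeTable`, `GlobalCommitter` and `maxCommittedCheckpointId` are hypothetical names for illustration only, not Flink, Iceberg or Delta APIs. Because the post-commit topology only sees committables after notifyCheckpointComplete, committables of an already committed checkpoint may reach the global committer again after a failover, so the committer asks the table (not Flink state) what was already committed.

```java
import java.util.ArrayList;
import java.util.List;

public class IdempotentGlobalCommitDemo {

    /** Hypothetical stand-in for a table with atomic commits; not a real Iceberg/Delta API. */
    static final class FakeTable {
        private final List<String> files = new ArrayList<>();
        // The table itself remembers the last checkpoint it accepted. This is the
        // "actual state" a restarted committer can read back, independent of
        // Flink operator state that may be one checkpoint behind.
        private long maxCommittedCheckpointId = -1L;

        void atomicAppend(long checkpointId, List<String> newFiles) {
            files.addAll(newFiles);
            maxCommittedCheckpointId = checkpointId;
        }

        long maxCommittedCheckpointId() {
            return maxCommittedCheckpointId;
        }

        List<String> files() {
            return files;
        }
    }

    /** Hypothetical global committer that tolerates replayed committables. */
    static final class GlobalCommitter {
        private final FakeTable table;

        GlobalCommitter(FakeTable table) {
            this.table = table;
        }

        /** Returns true if the files were committed, false if the checkpoint was already in the table. */
        boolean commit(long checkpointId, List<String> files) {
            if (checkpointId <= table.maxCommittedCheckpointId()) {
                return false; // replay after recovery: skip instead of appending twice
            }
            table.atomicAppend(checkpointId, files);
            return true;
        }
    }

    public static void main(String[] args) {
        FakeTable table = new FakeTable();
        GlobalCommitter committer = new GlobalCommitter(table);
        committer.commit(1L, List.of("a.parquet"));
        committer.commit(2L, List.of("b.parquet"));

        // Simulated failover: a new committer instance receives checkpoint 2 again.
        GlobalCommitter restored = new GlobalCommitter(table);
        boolean applied = restored.commit(2L, List.of("b.parquet"));

        System.out.println(applied + " " + table.files()); // prints "false [a.parquet, b.parquet]"
    }
}
```

The checkpoint-ID comparison relies on checkpoint IDs increasing monotonically; a real sink would also need to handle committables that arrive for a checkpoint that was never committed at all.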
>  >
>  > Best regards,
>  >
>  > Martijn
>  >
>  > [1] https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370
>  >
>  > On Thu, 8 Sep 2022 at 20:51, Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
>  >
>  > > Hi,
>  > > Krzysztof Chmielewski [1] from the Delta-Flink connector open source
>  > > community here [2].
>  > >
>  > > I totally agree with Steven on this. Sink V1's GlobalCommitter is
>  > > exactly what the Flink-Delta sink needs, since it is the place where
>  > > we do the actual commit to the Delta Log, which should be done from a
>  > > single place/instance.
>  > >
>  > > I'm currently evaluating V2 for our connector, and having what Steven
>  > > described as a "more natural, built-in concept/support of GlobalCommitter
>  > > in the sink v2 interface" would be greatly appreciated.
>  > >
>  > > Cheers,
>  > > Krzysztof Chmielewski
>  > >
>  > > [1] https://github.com/kristoffSC
>  > > [2] https://github.com/delta-io/connectors/tree/master/flink
>  > >
>  > > On Thu, 8 Sep 2022 at 19:51, Steven Wu <stevenz...@gmail.com> wrote:
>  > >
>  > > > Hi Yun,
>  > > >
>  > > > Thanks a lot for the reply!
>  > > >
>  > > > While we can add the global committer in the WithPostCommitTopology, the
>  > > > semantics are weird. The Commit stage doesn't actually commit anything to
>  > > > the Iceberg table, and the PostCommit stage is where the Iceberg commit
>  > > > happens.
>  > > >
>  > > > I just took a quick look at the DeltaLake Flink sink. It still uses the V1
>  > > > sink interface [1]. I think it might have the same issue when switching to
>  > > > the V2 sink interface.
>  > > >
>  > > > For data lake storages (like Iceberg, DeltaLake) or any storage with a
>  > > > global transactional commit, it would be more natural to have a built-in
>  > > > concept/support of GlobalCommitter in the sink v2 interface.
>  > > >
>  > > > Thanks,
>  > > > Steven
>  > > >
>  > > > [1] https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
>  > > >
>  > > >
>  > > > On Wed, Sep 7, 2022 at 2:15 AM Yun Gao <yungao...@aliyun.com.invalid> wrote:
>  > > > > Hi Steven, Liwei,
>  > > > > Sorry for missing this mail and responding so late.
>  > > > > I think the initial thought was indeed to use `WithPostCommitTopology`
>  > > > > as a replacement for the original GlobalCommitter, and currently the
>  > > > > adapter of Sink V1 on top of Sink V2 also maps the GlobalCommitter in
>  > > > > the Sink V1 interface onto an implementation of `WithPostCommitTopology` [1].
>  > > > > Since `WithPostCommitTopology` supports an arbitrary subgraph, couldn't
>  > > > > it support both a global committer and small file compaction? We might
>  > > > > have a `WithPostCommitTopology` implementation like:
>  > > > >     DataStream ds = add global committer;
>  > > > >     if (enable file compaction) {
>  > > > >         build the compaction subgraph from ds
>  > > > >     }
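A toy model of the pseudocode above in plain Java, with `List<String>` standing in for a DataStream and ordinary method calls standing in for operators. `globalCommit`, `planCompaction` and `addPostCommitTopology` here are hypothetical helpers, not Flink's API (in Flink the post-commit topology receives a `DataStream<CommittableMessage<CommT>>`). It only illustrates the composition Yun describes: the compaction step is built from the global committer's output, so it can still follow the global commit inside the same post-commit subgraph.

```java
import java.util.ArrayList;
import java.util.List;

public class PostCommitSubgraphSketch {

    /** Hypothetical global commit step (parallelism 1 in a real job): commits, then forwards what it committed. */
    static List<String> globalCommit(List<String> committables, List<String> tableLog) {
        tableLog.addAll(committables);
        return List.copyOf(committables);
    }

    /** Hypothetical compaction step: plans a rewrite of the files that were just committed. */
    static List<String> planCompaction(List<String> committedFiles) {
        List<String> plans = new ArrayList<>();
        if (committedFiles.size() > 1) {
            plans.add("rewrite " + String.join(",", committedFiles));
        }
        return plans;
    }

    /** Mirrors the pseudocode: global commit first, compaction built on its output only if enabled. */
    static List<String> addPostCommitTopology(
            List<String> committables, List<String> tableLog, boolean enableCompaction) {
        List<String> ds = globalCommit(committables, tableLog);
        if (enableCompaction) {
            return planCompaction(ds);
        }
        return List.of(); // no compaction subgraph
    }

    public static void main(String[] args) {
        List<String> tableLog = new ArrayList<>();
        List<String> plans =
                addPostCommitTopology(List.of("f1.parquet", "f2.parquet"), tableLog, true);
        System.out.println(tableLog + " " + plans);
        // prints "[f1.parquet, f2.parquet] [rewrite f1.parquet,f2.parquet]"
    }
}
```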
>  > > > > Best,
>  > > > > Yun
>  > > > > [1] https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
>  > > > > ------------------------------------------------------------------
>  > > > > From:Steven Wu <stevenz...@gmail.com>
>  > > > > Send Time:2022 Aug. 17 (Wed.) 07:30
>  > > > > To:dev <dev@flink.apache.org>; hililiwei <hilili...@gmail.com>
>  > > > > Subject:Re: Sink V2 interface replacement for GlobalCommitter
>  > > > > > Plus, it will disable the future capability of a small file compaction
>  > > > > > stage post commit.
>  > > > > I should clarify this comment: if we use `WithPostCommitTopology`
>  > > > > for the global committer, we would lose the capability of using the
>  > > > > post-commit stage for small file compaction.
>  > > > > On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <stevenz...@gmail.com> wrote:
>  > > > > >
>  > > > > > In the V1 sink interface, there is a GlobalCommitter for Iceberg. With
>  > > > > > the V2 sink interface, GlobalCommitter has been deprecated in favor of
>  > > > > > WithPostCommitTopology. I thought the post-commit stage was mainly for
>  > > > > > async maintenance (like compaction).
>  > > > > >
>  > > > > > Are we supposed to do something similar to the
>  > > > > > GlobalCommittingSinkAdapter? It seems like a temporary transition plan
>  > > > > > for bridging V1 sinks to V2 interfaces.
>  > > > > >
>  > > > > > private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
>  > > > > >         implements WithPostCommitTopology<InputT, CommT> {
>  > > > > >     @Override
>  > > > > >     public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
>  > > > > >         StandardSinkTopologies.addGlobalCommitter(
>  > > > > >                 committables,
>  > > > > >                 GlobalCommitterAdapter::new,
>  > > > > >                 () -> sink.getCommittableSerializer().get());
>  > > > > >     }
>  > > > > > }
>  > > > > >
>  > > > > >
>  > > > > > In the Iceberg PR [1] for adopting the new sink interface, Liwei used
>  > > > > > the "global" partitioner to force all committables to go to a single
>  > > > > > committer task 0. This effectively forces a global committer disguised
>  > > > > > among the parallel committers. It is a little weird and can also lead
>  > > > > > to questions about why the other committer tasks are not getting any
>  > > > > > messages. Plus, it will disable the future capability of a small file
>  > > > > > compaction stage post commit.
>  > > > > > Hence, I am asking what the right approach is to achieve global
>  > > > > > committer behavior.
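A small simulation of why the other committer tasks stay idle under the "global" partitioner. It assumes Flink's `GlobalPartitioner` routes every record to the first downstream channel (index 0); the helper methods and the numbers (8 committables, 4 committer subtasks) are illustrative assumptions, not Flink code.

```java
import java.util.Arrays;

public class GlobalPartitionerRoutingDemo {

    /** Assumed behaviour of Flink's GlobalPartitioner: always the first downstream channel. */
    static int globalChannel() {
        return 0;
    }

    /** Round-robin choice, for comparison with evenly loaded committers. */
    static int roundRobinChannel(int recordIndex, int parallelism) {
        return recordIndex % parallelism;
    }

    /** Counts how many committables each committer subtask would receive. */
    static int[] route(int numCommittables, int committerParallelism, boolean useGlobal) {
        int[] perSubtask = new int[committerParallelism];
        for (int i = 0; i < numCommittables; i++) {
            int channel = useGlobal ? globalChannel() : roundRobinChannel(i, committerParallelism);
            perSubtask[channel]++;
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(route(8, 4, true)));  // [8, 0, 0, 0]
        System.out.println(Arrays.toString(route(8, 4, false))); // [2, 2, 2, 2]
        System.out.println(Arrays.toString(route(8, 1, false))); // [8]
    }
}
```

The last line corresponds to Yun's suggestion of running the committer with parallelism 1: all committables still end up in one subtask, but there are no idle committer subtasks left to explain.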
>  > > > > >
>  > > > > > Thanks,
>  > > > > > Steven
>  > > > > >
>  > > > > > [1] https://github.com/apache/iceberg/pull/4904/files#r946975047
