> setting the committer parallelism to 1.

Yun, setting the committer parallelism to 1 essentially gives us a global committer, so that would work. I'm not sure about the implications for other parts of the V2 sink interface.
On Tue, Sep 13, 2022 at 2:17 PM Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:

> Hi Martijn,
>
> Could you clarify a little what you mean by:
>
> "The important part to remember is that this topology is lagging one checkpoint behind in terms of fault-tolerance: it only receives data once the committer committed"
>
> What are the implications?
>
> Thanks,
> Krzysztof Chmielewski
>
> On Tue, Sep 13, 2022 at 09:57 Yun Gao <yungao...@aliyun.com.invalid> wrote:
>
>> Hi,
>>
>> Very sorry for the late reply; I was on holiday. And thanks a lot for the discussion. It also reminds me of one more piece of background on the change to the GlobalCommitter: when we refactored the job-finishing process in FLIP-147 to ensure that all records can be committed at the end of a bounded streaming job, we had to drop support for cascading commits, which means the cascaded `committer -> global committer` commit does not work in all cases.
>>
>> For the current issues, one possible alternative from my side is that we may support setting the committer parallelism to 1. Could this option solve the issue in the current scenarios? I'll also double-check whether it could be implemented, and look into the failing tests Krzysztof ran into.
>>
>> Best,
>> Yun
>>
>> ------------------------------------------------------------------
>> From: Steven Wu <stevenz...@gmail.com>
>> Send Time: 2022 Sep. 10 (Sat.) 11:31
>> To: dev <dev@flink.apache.org>
>> Cc: Yun Gao <yungao...@aliyun.com>; hililiwei <hilili...@gmail.com>
>> Subject: Re: Sink V2 interface replacement for GlobalCommitter
>>
>> Martijn, thanks a lot for chiming in!
>>
>> Here are my concerns with adding the GlobalCommitter in the PostCommitTopology:
>>
>> 1. When we use TwoPhaseCommittingSink, we would need to create a no-op/dummy committer. The actual Iceberg/DeltaLake commits would happen in the PostCommit stage, but the PostCommit stage should be doing work after the commit (not for the commit).
>> 2. GlobalCommitter is marked as @Deprecated. It will be removed at some point. What then?
>>
>> Thanks,
>> Steven
>>
>> On Fri, Sep 9, 2022 at 1:20 PM Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
>>
>> Thanks Martijn,
>>
>> I'm actually trying to run our V1 Delta connector on Flink 1.15 using SinkV1Adapter with GlobalCommitterOperator. That said, I might have found a potential issue with GlobalCommitterOperator, checkpointing and failover recovery [1]. For "normal" scenarios it does look good, though.
>>
>> Regards,
>> Krzysztof Chmielewski
>>
>> [1] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
>>
>> On Fri, Sep 9, 2022 at 20:49 Martijn Visser <martijnvis...@apache.org> wrote:
>>
>>> Hi all,
>>>
>>> A couple of bits from when work was being done on the new sink: V1 is completely simulated as V2 [1]. V2 is strictly more expressive.
>>>
>>> If there's a desire to stick to the `GlobalCommitter` interface, have a look at StandardSinkTopologies, or just add your own, more fitting PostCommitTopology. The important part to remember is that this topology is lagging one checkpoint behind in terms of fault tolerance: it only receives data once the committer committed on notifyCheckpointComplete. Thus, the global committer needs to be idempotent and able to restore the actual state on recovery. That limitation comes from Flink's checkpointing behaviour and applies to both V1 and V2. GlobalCommitterOperator abstracts these issues away and also handles retries (i.e. commits that happen much later). So it's probably a good place to start, just with the standard topology.
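One concrete implication of the "one checkpoint behind" behaviour that Krzysztof asks about: after a failover, the committables of a checkpoint that was already committed may be delivered to the global committer again, so it must look at the target's actual state and skip them. The plain-Java model below sketches that idea under stated assumptions; it is not Flink code, and `CheckpointedTable`, `IdempotentGlobalCommitter`, and the choice to store the highest committed checkpoint id inside the same atomic commit as the data are hypothetical names and design choices made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for an external transactional table (e.g. a data lake table).
// A commit atomically appends files AND records the checkpoint id, so the
// "actual state" can be read back from the table after a failover.
class CheckpointedTable {
    private final List<String> files = new ArrayList<>();
    private long maxCommittedCheckpointId = -1L;

    synchronized void atomicCommit(long checkpointId, List<String> newFiles) {
        files.addAll(newFiles);
        maxCommittedCheckpointId = checkpointId;
    }

    synchronized long maxCommittedCheckpointId() {
        return maxCommittedCheckpointId;
    }

    synchronized List<String> files() {
        return Collections.unmodifiableList(new ArrayList<>(files));
    }
}

// Hypothetical global committer that receives the aggregated committables of one
// checkpoint. Because it may see the same checkpoint again after recovery, it
// restores its view from the table instead of trusting its own local state.
class IdempotentGlobalCommitter {
    private final CheckpointedTable table;

    IdempotentGlobalCommitter(CheckpointedTable table) {
        this.table = table;
    }

    /** Returns true if the commit was applied, false if it was a replay that got skipped. */
    boolean commit(long checkpointId, List<String> files) {
        if (checkpointId <= table.maxCommittedCheckpointId()) {
            return false; // already committed before the failover
        }
        table.atomicCommit(checkpointId, files);
        return true;
    }
}
```

In this model, a fresh committer created after a simulated failover skips checkpoint 1 when it is replayed and still appends checkpoint 2. Keeping the checkpoint id in the same atomic commit as the data is what makes the check reliable; a committer that relied only on its own restored state could not tell whether the replayed commit had already reached the table.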
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> [1] https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370
>>>
>>> On Thu, Sep 8, 2022 at 20:51 Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Krzysztof Chmielewski [1] from the Delta-Flink connector open source community here [2].
>>>>
>>>> I totally agree with Steven on this. Sink V1's GlobalCommitter is exactly what the Flink-Delta sink needs, since it is the place where we do the actual commit to the Delta Log, which should be done from a single place/instance.
>>>>
>>>> I'm currently evaluating V2 for our connector, and having, as Steven described it, a "more natural, built-in concept/support of GlobalCommitter in the sink v2 interface" would be greatly appreciated.
>>>>
>>>> Cheers,
>>>> Krzysztof Chmielewski
>>>>
>>>> [1] https://github.com/kristoffSC
>>>> [2] https://github.com/delta-io/connectors/tree/master/flink
>>>>
>>>> On Thu, Sep 8, 2022 at 19:51 Steven Wu <stevenz...@gmail.com> wrote:
>>>>
>>>>> Hi Yun,
>>>>>
>>>>> Thanks a lot for the reply!
>>>>>
>>>>> While we can add the global committer in the WithPostCommitTopology, the semantics are weird. The Commit stage doesn't actually commit anything to the Iceberg table, and the PostCommit stage is where the Iceberg commit happens.
>>>>>
>>>>> I just took a quick look at the DeltaLake Flink sink. It still uses the V1 sink interface [1]. I think it might have the same issue when switching to the V2 sink interface.
>>>>>
>>>>> For data lake storages (like Iceberg and DeltaLake), or any storage with a global transactional commit, it would be more natural to have a built-in concept/support of GlobalCommitter in the sink V2 interface.
>>>>>
>>>>> Thanks,
>>>>> Steven
>>>>>
>>>>> [1] https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
>>>>>
>>>>> On Wed, Sep 7, 2022 at 2:15 AM Yun Gao <yungao...@aliyun.com.invalid> wrote:
>>>>>
>>>>>> Hi Steven, Liwei,
>>>>>>
>>>>>> Very sorry for missing this mail and responding so late.
>>>>>>
>>>>>> I think the initial thought was indeed to use `WithPostCommitTopology` as a replacement for the original GlobalCommitter, and currently the adapter for Sink V1 on top of Sink V2 also maps the GlobalCommitter of the Sink V1 interface onto an implementation of `WithPostCommitTopology` [1].
>>>>>>
>>>>>> Since `WithPostCommitTopology` supports an arbitrary subgraph, it seems to me it could support both a global committer and small file compaction?
>>>>>> We might have a `WithPostCommitTopology` implementation like:
>>>>>>
>>>>>>     DataStream ds = add global committer;
>>>>>>     if (enable file compaction) {
>>>>>>         build the compaction subgraph from ds
>>>>>>     }
>>>>>>
>>>>>> Best,
>>>>>> Yun
>>>>>>
>>>>>> [1] https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
>>>>>>
>>>>>> ------------------------------------------------------------------
>>>>>> From: Steven Wu <stevenz...@gmail.com>
>>>>>> Send Time: 2022 Aug. 17 (Wed.) 07:30
>>>>>> To: dev <dev@flink.apache.org>; hililiwei <hilili...@gmail.com>
>>>>>> Subject: Re: Sink V2 interface replacement for GlobalCommitter
>>>>>>
>>>>>>> Plus, it will disable the future capability of small file compaction stage post commit.
>>>>>>
>>>>>> I should clarify this comment: if we use `WithPostCommitTopology` for the global committer, we would lose the ability to use the post-commit stage for small file compaction.
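Yun's pseudocode proposes that a single post-commit subgraph could host both the global commit and, downstream of it, an optional compaction stage. The plain-Java model below sketches only that ordering (compaction runs after the global commit and works on its output). It is not the `WithPostCommitTopology` API; `DataFile`, `PostCommitPipeline`, and the "merge every file below a size threshold" rule are hypothetical choices made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical descriptor of a data file produced by the sink writers.
final class DataFile {
    final String name;
    final long sizeBytes;

    DataFile(String name, long sizeBytes) {
        this.name = name;
        this.sizeBytes = sizeBytes;
    }
}

// Hypothetical model of a post-commit subgraph: stage 1 is the single global
// commit; stage 2 (optional) compacts small files that are already committed.
final class PostCommitPipeline {
    private final List<DataFile> committed = new ArrayList<>(); // table state after commits
    private final boolean compactionEnabled;
    private final long smallFileThresholdBytes;

    PostCommitPipeline(boolean compactionEnabled, long smallFileThresholdBytes) {
        this.compactionEnabled = compactionEnabled;
        this.smallFileThresholdBytes = smallFileThresholdBytes;
    }

    // Called once per checkpoint with the committables of all writer subtasks.
    void onCheckpointCommittables(List<DataFile> committables) {
        globalCommit(committables);
        if (compactionEnabled) {
            compactSmallFiles(); // "built from" the output of the global commit
        }
    }

    private void globalCommit(List<DataFile> committables) {
        committed.addAll(committables);
    }

    private void compactSmallFiles() {
        List<DataFile> small = new ArrayList<>();
        List<DataFile> kept = new ArrayList<>();
        for (DataFile f : committed) {
            if (f.sizeBytes < smallFileThresholdBytes) {
                small.add(f);
            } else {
                kept.add(f);
            }
        }
        if (small.size() < 2) {
            return; // nothing worth merging
        }
        long total = 0;
        for (DataFile f : small) {
            total += f.sizeBytes;
        }
        // In a real table this rewrite would itself be another transactional commit.
        kept.add(new DataFile("compacted-" + small.size(), total));
        committed.clear();
        committed.addAll(kept);
    }

    List<DataFile> committedFiles() {
        return new ArrayList<>(committed);
    }
}
```

With compaction disabled, the committed files are exactly the committables; with it enabled, the two small files are replaced by one merged file after the commit. The model says nothing about which side of the semantic question Steven raises is right; it only shows the ordering Yun describes.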
>>>>>>
>>>>>> On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <stevenz...@gmail.com> wrote:
>>>>>>>
>>>>>>> In the V1 sink interface, there is a GlobalCommitter for Iceberg. With the V2 sink interface, GlobalCommitter has been deprecated in favor of WithPostCommitTopology. I thought the post-commit stage was mainly for async maintenance (like compaction).
>>>>>>>
>>>>>>> Are we supposed to do something similar to the GlobalCommittingSinkAdapter? It seems like a temporary transition plan for bridging V1 sinks to V2 interfaces.
>>>>>>>
>>>>>>>     private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
>>>>>>>             implements WithPostCommitTopology<InputT, CommT> {
>>>>>>>
>>>>>>>         @Override
>>>>>>>         public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
>>>>>>>             StandardSinkTopologies.addGlobalCommitter(
>>>>>>>                     committables,
>>>>>>>                     GlobalCommitterAdapter::new,
>>>>>>>                     () -> sink.getCommittableSerializer().get());
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>> In the Iceberg PR [1] for adopting the new sink interface, Liwei used the "global" partitioner to force all committables to go to a single committer task (task 0). This effectively forces a global committer disguised among the parallel committers. It is a little weird and can also lead to questions about why the other committer tasks are not getting any messages. Plus, it will disable the future capability of a small file compaction stage post commit. Hence, I am asking what the right approach is to achieve global committer behavior.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Steven
>>>>>>>
>>>>>>> [1] https://github.com/apache/iceberg/pull/4904/files#r946975047
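Two ways of getting global-committer behaviour come up in this thread: routing every committable to committer subtask 0 with the "global" partitioner (the approach in the Iceberg PR, which Steven finds confusing because the other subtasks stay idle), and running the committer with parallelism 1 (Yun's proposal). Flink's `GlobalPartitioner`, used by `DataStream#global()`, sends every record to the first downstream subtask. The plain-Java model below compares how many committables each committer subtask receives under each layout; `CommitterRoutingModel`, `route`, and the `Routing` enum are hypothetical names, not Flink API, and round-robin stands in for ordinary parallel distribution.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical routing strategies for committables heading to committer subtasks.
enum Routing { ROUND_ROBIN, GLOBAL }

final class CommitterRoutingModel {
    // Returns how many committables each committer subtask receives.
    static int[] route(List<String> committables, int committerParallelism, Routing routing) {
        if (committerParallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        int[] received = new int[committerParallelism];
        int next = 0;
        for (String ignored : committables) {
            int target;
            if (routing == Routing.GLOBAL) {
                target = 0; // every committable goes to the first subtask
            } else {
                target = next;
                next = (next + 1) % committerParallelism;
            }
            received[target]++;
        }
        return received;
    }
}
```

With four committables and four committer subtasks, the global layout yields `{4, 0, 0, 0}` (one subtask sees everything, three stay idle), whereas parallelism 1 yields `{4}`: the same single global view without idle subtasks.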