> setting the committer parallelism to 1.

Yun, setting the committer parallelism to 1 effectively makes it a global
committer. That would work, though I'm not sure about the implications for
other parts of the v2 sink interface.
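To illustrate why a committer parallelism of 1 behaves like a global committer, here is a minimal, Flink-free Java sketch. It assumes a simplified model in which committables are hash-routed to committer subtasks. This is not how Flink actually wires the writer-to-committer exchange, and the names `CommitterRouting` and `route` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, Flink-free model: committables are hash-routed to committer
// subtasks. CommitterRouting and route() are hypothetical names.
final class CommitterRouting {

    // Returns, for each committer subtask index, the committables it receives.
    static List<List<String>> route(List<String> committables, int committerParallelism) {
        List<List<String>> perSubtask = new ArrayList<>();
        for (int i = 0; i < committerParallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (String committable : committables) {
            // floorMod keeps the index non-negative for negative hash codes.
            int target = Math.floorMod(committable.hashCode(), committerParallelism);
            perSubtask.get(target).add(committable);
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        List<String> committables = List.of("file-a", "file-b", "file-c", "file-d");
        // Parallelism 1: subtask 0 receives everything, like a global committer.
        System.out.println(route(committables, 1)); // [[file-a, file-b, file-c, file-d]]
        // Parallelism 4: the same committables may be spread over several
        // subtasks, so no single instance is guaranteed to see the whole checkpoint.
        System.out.println(route(committables, 4));
    }
}
```

With a parallelism of 1, the single committer instance sees every committable of a checkpoint, which is the property a single Delta Log or Iceberg table commit relies on.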

On Tue, Sep 13, 2022 at 2:17 PM Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Hi Martijn,
> Could you clarify a bit what you mean by:
>
> "The important part to remember is that this
> topology is lagging one checkpoint behind in terms of fault-tolerance: it
> only receives data once the committer committed"
>
> What are the implications?
>
> Thanks,
> Krzysztof Chmielewski
>
> On Tue, Sep 13, 2022 at 09:57 Yun Gao <yungao...@aliyun.com.invalid>
> wrote:
>
>> Hi,
>> Very sorry for the late reply; I was on holiday.
>> And many thanks for the discussion. It also reminds me of
>> one more piece of background on the change to the GlobalCommitter:
>> when we refactored the job-finishing process in FLIP-147 to
>> ensure that all records could be committed at the end of a bounded
>> streaming job, we had to drop support for cascading commits,
>> which means the cascading commit `committer -> global committer` no
>> longer works in all cases.
>> For the current issues, one possible alternative from my side is
>> that we support setting the committer parallelism to 1. Would this option
>> solve the issue in the current scenarios? I'll also double-check whether
>> it could be implemented, and look into the failed tests Krzysztof ran into.
>> Best,
>> Yun
>> ------------------------------------------------------------------
>> From:Steven Wu <stevenz...@gmail.com>
>> Send Time:2022 Sep. 10 (Sat.) 11:31
>> To:dev <dev@flink.apache.org>
>> Cc:Yun Gao <yungao...@aliyun.com>; hililiwei <hilili...@gmail.com>
>> Subject:Re: Sink V2 interface replacement for GlobalCommitter
>> Martijn, thanks a lot for chiming in!
>> Here are my concerns with adding the GlobalCommitter in the
>> PostCommitTopology:
>> 1. When we use TwoPhaseCommittingSink, we would need to create a
>> no-op/dummy committer, while the actual Iceberg/DeltaLake commits happen
>> in the PostCommit stage. The PostCommit stage should be doing some work
>> after the commit, not the commit itself.
>> 2. GlobalCommitter is marked as @Deprecated. It will be removed at
>> some point. What then?
>> Thanks,
>> Steven
>> On Fri, Sep 9, 2022 at 1:20 PM Krzysztof Chmielewski <
>> krzysiek.chmielew...@gmail.com>
>> wrote:
>> Thanks Martijn,
>>  I'm actually trying to run our V1 Delta connector on Flink 1.15 using
>>  SinkV1Adapter with GlobalCommitterOperator.
>>  Having said that, I might have found a potential issue with
>>  GlobalCommitterOperator, checkpointing, and failover recovery [1].
>>  For "normal" scenarios it does look good, though.
>>  Regards,
>>  Krzysztof Chmielewski
>>  [1] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
>>  On Fri, Sep 9, 2022 at 20:49 Martijn Visser <martijnvis...@apache.org>
>>  wrote:
>>  > Hi all,
>>  >
>>  > A couple of bits from when work was being done on the new sink: V1 is
>>  > completely simulated as V2 [1]. V2 is strictly more expressive.
>>  >
>>  > If there's a desire to stick to the `GlobalCommitter` interface, have a
>>  > look at StandardSinkTopologies. Or you can just add your own, more
>>  > fitting PostCommitTopology. The important part to remember is that this
>>  > topology lags one checkpoint behind in terms of fault tolerance: it
>>  > only receives data once the committer has committed
>>  > on notifyCheckpointComplete. Thus, the global committer needs to be
>>  > idempotent and able to restore the actual state on recovery. That
>>  > limitation comes from Flink's checkpointing behaviour and applies to
>>  > both V1 and V2. GlobalCommitterOperator abstracts these issues away,
>>  > along with handling retries (i.e. commits that happen much later). So
>>  > it's probably a good idea to start with just the standard topology.
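A minimal sketch of the idempotency Martijn describes, assuming (hypothetically) that the target table records the highest checkpoint id it has committed. Because the post-commit stage lags one checkpoint behind, committables for an already-committed checkpoint can be delivered again after a failover, so the committer checks the table's actual state before committing. `IdempotentGlobalCommitter` and `FakeTable` are illustrative names, not Flink, Iceberg, or Delta APIs, and the sketch assumes checkpoints are committed in increasing id order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical global committer that is safe to replay after a failover.
final class IdempotentGlobalCommitter {

    // Hypothetical stand-in for an external table whose metadata records the
    // highest checkpoint id committed so far (an assumption of this sketch).
    static final class FakeTable {
        long maxCommittedCheckpointId = -1L;
        final List<String> files = new ArrayList<>();
    }

    private final FakeTable table;

    IdempotentGlobalCommitter(FakeTable table) {
        this.table = table;
    }

    // Commits the files of one checkpoint; returns false if it was already committed.
    boolean commit(long checkpointId, List<String> dataFiles) {
        // After a failover, committables of an already-committed checkpoint may
        // arrive again, so consult the table's actual state, not operator state.
        if (checkpointId <= table.maxCommittedCheckpointId) {
            return false; // already visible in the table: skip to avoid duplicates
        }
        table.files.addAll(dataFiles);
        table.maxCommittedCheckpointId = checkpointId;
        return true;
    }

    public static void main(String[] args) {
        FakeTable table = new FakeTable();
        IdempotentGlobalCommitter committer = new IdempotentGlobalCommitter(table);
        committer.commit(1L, List.of("f1", "f2"));
        committer.commit(1L, List.of("f1", "f2")); // simulated replay: ignored
        committer.commit(2L, List.of("f3"));
        System.out.println(table.files); // [f1, f2, f3]
    }
}
```

In a real table format, adding the files and recording the checkpoint id would have to happen in the same atomic commit; otherwise a crash between the two steps breaks idempotency.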
>>  >
>>  > Best regards,
>>  >
>>  > Martijn
>>  >
>>  > [1]
>>  > https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370
>>  >
>>  > On Thu, Sep 8, 2022 at 20:51 Krzysztof Chmielewski <
>>  > krzysiek.chmielew...@gmail.com> wrote:
>>  >
>>  > > Hi,
>>  > > Krzysztof Chmielewski [1] from the Delta-Flink connector open source
>>  > > community here [2].
>>  > >
>>  > > I totally agree with Steven on this. Sink V1's GlobalCommitter is
>>  > > exactly what the Flink-Delta Sink needs, since it is the place where
>>  > > we do the actual commit to the Delta Log, which should be done from a
>>  > > single place/instance.
>>  > >
>>  > > I'm currently evaluating V2 for our connector, and having what Steven
>>  > > described as a "more natural, built-in concept/support of
>>  > > GlobalCommitter in the sink v2 interface" would be greatly appreciated.
>>  > >
>>  > > Cheers,
>>  > > Krzysztof Chmielewski
>>  > >
>>  > > [1] https://github.com/kristoffSC
>>  > > [2] https://github.com/delta-io/connectors/tree/master/flink
>>  > >
>>  > > On Thu, Sep 8, 2022 at 19:51 Steven Wu <stevenz...@gmail.com> wrote:
>>  > >
>>  > > > Hi Yun,
>>  > > >
>>  > > > Thanks a lot for the reply!
>>  > > >
>>  > > > While we can add the global committer in the WithPostCommitTopology,
>>  > > > the semantics are weird. The Commit stage wouldn't actually commit
>>  > > > anything to the Iceberg table, and the PostCommit stage would be where
>>  > > > the Iceberg commit happens.
>>  > > >
>>  > > > I just took a quick look at the DeltaLake Flink sink. It still uses the
>>  > > > V1 sink interface [1]. I think it might have the same issue when
>>  > > > switching to the V2 sink interface.
>>  > > >
>>  > > > For data lake storage (like Iceberg and DeltaLake), or any storage with
>>  > > > a global transactional commit, it would be more natural to have a
>>  > > > built-in concept/support of GlobalCommitter in the sink v2 interface.
>>  > > >
>>  > > > Thanks,
>>  > > > Steven
>>  > > >
>>  > > > [1]
>>  > > > https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
>>  > > >
>>  > > >
>>  > > > On Wed, Sep 7, 2022 at 2:15 AM Yun Gao <yungao...@aliyun.com.invalid>
>>  > > > wrote:
>>  > > >
>>  > > > > Hi Steven, Liwei,
>>  > > > > Very sorry for missing this mail and responding so late.
>>  > > > > I think the initial idea was indeed to use `WithPostCommitTopology`
>>  > > > > as a replacement for the original GlobalCommitter, and currently the
>>  > > > > adapter of Sink v1 on top of Sink v2 also maps the GlobalCommitter of
>>  > > > > the Sink V1 interface onto an implementation of
>>  > > > > `WithPostCommitTopology` [1].
>>  > > > > Since `WithPostCommitTopology` supports an arbitrary subgraph, it
>>  > > > > seems to me it could support both the global committer and small file
>>  > > > > compaction. We might have a `WithPostCommitTopology` implementation
>>  > > > > like:
>>  > > > >     DataStream ds = add global committer;
>>  > > > >     if (enable file compaction) {
>>  > > > >         build the compaction subgraph from ds
>>  > > > >     }
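Yun's pseudocode can be modeled as a plain-Java sketch that composes a global commit step with an optional compaction step built on its output. It only models the data flow, not Flink's DataStream API; `PostCommitPipeline`, `globalCommit`, `compact`, and `build` are hypothetical names, and the compaction logic is deliberately naive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, Flink-free model of a post-commit topology: a global commit
// step, optionally followed by a small-file compaction step built on its output.
final class PostCommitPipeline {

    // Global commit: merges committables from all subtasks into one commit and
    // returns the files that are now visible in the table.
    static List<String> globalCommit(List<List<String>> committablesPerSubtask) {
        List<String> committed = new ArrayList<>();
        for (List<String> fromSubtask : committablesPerSubtask) {
            committed.addAll(fromSubtask);
        }
        return committed;
    }

    // Naive compaction: rewrites all committed files into a single file.
    static List<String> compact(List<String> committedFiles) {
        if (committedFiles.size() <= 1) {
            return committedFiles;
        }
        return List.of("compacted(" + String.join("+", committedFiles) + ")");
    }

    // Mirrors the pseudocode: ds = global committer; if compaction, build on ds.
    static Function<List<List<String>>, List<String>> build(boolean enableFileCompaction) {
        Function<List<List<String>>, List<String>> ds = PostCommitPipeline::globalCommit;
        if (!enableFileCompaction) {
            return ds;
        }
        return ds.andThen(PostCommitPipeline::compact);
    }

    public static void main(String[] args) {
        List<List<String>> input = List.of(List.of("a", "b"), List.of("c"));
        System.out.println(build(false).apply(input)); // [a, b, c]
        System.out.println(build(true).apply(input));  // [compacted(a+b+c)]
    }
}
```

The point of the composition is that compaction consumes the output of the global commit, so both can live in one post-commit subgraph rather than competing for it.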
>>  > > > > Best,
>>  > > > > Yun
>>  > > > > [1]
>>  > > > > https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
>>  > > > > ------------------------------------------------------------------
>>  > > > > From:Steven Wu <stevenz...@gmail.com>
>>  > > > > Send Time:2022 Aug. 17 (Wed.) 07:30
>>  > > > > To:dev <dev@flink.apache.org>; hililiwei <hilili...@gmail.com>
>>  > > > > Subject:Re: Sink V2 interface replacement for GlobalCommitter
>>  > > > > > Plus, it will disable the future capability of a small file
>>  > > > > > compaction stage post commit.
>>  > > > > I should clarify this comment: if we use `WithPostCommitTopology`
>>  > > > > for the global committer, we lose the ability to use the post-commit
>>  > > > > stage for small file compaction.
>>  > > > > On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <stevenz...@gmail.com>
>>  > > > > wrote:
>>  > > > > >
>>  > > > > > In the V1 sink interface, the Iceberg sink uses a GlobalCommitter.
>>  > > > > > In the V2 sink interface, GlobalCommitter has been deprecated in
>>  > > > > > favor of WithPostCommitTopology. I thought the post-commit stage was
>>  > > > > > mainly for async maintenance (like compaction).
>>  > > > > >
>>  > > > > > Are we supposed to do something similar to the
>>  > > > > > GlobalCommittingSinkAdapter? It seems like a temporary transition
>>  > > > > > plan for bridging v1 sinks to the v2 interfaces.
>>  > > > > >
>>  > > > > > private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
>>  > > > > >         implements WithPostCommitTopology<InputT, CommT> {
>>  > > > > >     @Override
>>  > > > > >     public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
>>  > > > > >         StandardSinkTopologies.addGlobalCommitter(
>>  > > > > >                 committables,
>>  > > > > >                 GlobalCommitterAdapter::new,
>>  > > > > >                 () -> sink.getCommittableSerializer().get());
>>  > > > > >     }
>>  > > > > > }
>>  > > > > >
>>  > > > > >
>>  > > > > > In the Iceberg PR [1] for adopting the new sink interface, Liwei used
>>  > > > > > the "global" partitioner to force all committables to go to a single
>>  > > > > > committer task (task 0). This effectively forces a global committer
>>  > > > > > disguised as parallel committers. It is a little weird and can also
>>  > > > > > lead to questions about why the other committer tasks are not getting
>>  > > > > > any messages. Plus, it will disable the future capability of a small
>>  > > > > > file compaction stage post commit. Hence, I am asking what the right
>>  > > > > > approach is to achieve global committer behavior.
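The effect of the "global" partitioner can be seen in a small Java model. Flink's `GlobalPartitioner` sends every record to downstream subtask 0; the sketch below compares that with round-robin routing across 4 committer subtasks. It is a simulation only, and `GlobalPartitionerDemo` and `countPerSubtask` are hypothetical names.

```java
import java.util.Arrays;

// Flink-free model of routing committables to committer subtasks.
// global = true always picks channel 0 (the behavior of a "global" partitioner);
// global = false distributes records round-robin. Names are hypothetical.
final class GlobalPartitionerDemo {

    // Returns how many records each committer subtask receives.
    static int[] countPerSubtask(int records, int parallelism, boolean global) {
        int[] counts = new int[parallelism];
        for (int i = 0; i < records; i++) {
            int channel = global ? 0 : i % parallelism;
            counts[channel]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(countPerSubtask(8, 4, true)));  // [8, 0, 0, 0]
        System.out.println(Arrays.toString(countPerSubtask(8, 4, false))); // [2, 2, 2, 2]
    }
}
```

The first line shows the "disguised" global committer: three of the four committer subtasks exist but never receive anything, which is exactly the situation Steven finds confusing.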
>>  > > > > >
>>  > > > > > Thanks,
>>  > > > > > Steven
>>  > > > > >
>>  > > > > > [1] https://github.com/apache/iceberg/pull/4904/files#r946975047
>>  > > > > >
>>  > > > >
>>  > > >
>>  > >
>>  >
>>
>
