Hi Yun,
Thanks for your input.

In the Delta Sink connector we actually use both a Committer [1] and a
GlobalCommitter [2]. Since we are using Flink's Parquet file writers, the
former does a very simple job: "renaming the hidden file to make it visible
and removing the 'in-progress file' marker from its name". The
GlobalCommitter commits data to the Delta Log.
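To make the Committer's job concrete, here is a minimal standalone sketch of "rename the hidden in-progress file to its visible name". It assumes a hypothetical naming scheme where a pending file is hidden by a leading dot and carries an `.inprogress.<suffix>` marker; the exact names Flink's file writers produce may differ, and `visibleName`/`commitPendingFile` are hypothetical helpers, not Delta or Flink code.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class CommitterRenameSketch {

    /**
     * Hypothetical naming scheme: ".part-0-0.inprogress.1234" -> "part-0-0".
     * Drops the leading dot (hidden file) and everything from ".inprogress" on.
     */
    static String visibleName(String inProgressName) {
        String name = inProgressName.startsWith(".") ? inProgressName.substring(1) : inProgressName;
        int marker = name.indexOf(".inprogress");
        return marker >= 0 ? name.substring(0, marker) : name;
    }

    /**
     * Makes a pending file visible by renaming it within the same directory.
     * If the source is gone and the target exists (a retried commit), it does nothing.
     */
    static Path commitPendingFile(Path inProgress) throws IOException {
        Path target = inProgress.resolveSibling(visibleName(inProgress.getFileName().toString()));
        if (Files.notExists(inProgress) && Files.exists(target)) {
            return target; // already committed earlier, treat the retry as a no-op
        }
        return Files.move(inProgress, target, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("committer-sketch");
        Path pending = Files.createFile(dir.resolve(".part-0-0.inprogress.1234"));
        System.out.println(commitPendingFile(pending).getFileName()); // part-0-0
        // Retrying the same commit does not fail.
        System.out.println(commitPendingFile(pending).getFileName()); // part-0-0
    }
}
```

Each rename is independent of the others, which is why this step parallelizes naturally across Committer subtasks.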

With this design, having many Committer instances actually benefits us. In
addition, we foresee upcoming features in our connector that would benefit
from separate Committers with a parallelism higher than 1.

Yun, as I understood your suggestion (and maybe my interpretation was
wrong), the idea is to keep both the Committer and the GlobalCommitter but
to enforce parallelism 1 on the former. The GlobalCommitter created by
Flink 1.15's SinkV1Adapter has parallelism 1, as expected and as it was in
Flink < 1.15.

Anyway, I played a little with the Flink code and managed to achieve
this [3]. After some additional changes, which I describe below, the test
described in [4] passed with no data loss and no exceptions thrown by
Flink.

However, changing the parallelism of the Committer operator to a hardcoded
value of 1 was not enough, and I had to do two more things:
1. add a rebalance step (RebalancePartitioner) to the graph between the
writer and the committer. They now have different parallelism levels, and
the default FORWARD partitioner caused an exception to be thrown. This
part is clear and understood.
2. modify Flink's CommittableCollectorSerializer [5]. I believe this is the
important part.
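Point 1 can be illustrated with a small standalone simulation. It does not use Flink's classes: `forwardChannel` and `RoundRobin` are hypothetical stand-ins that mimic the idea that a FORWARD edge maps subtask i to subtask i (so it cannot span different parallelisms), while a rebalance edge cycles round-robin over the downstream channels. Flink's real RebalancePartitioner picks a random starting channel; the sketch starts at 0 for determinism.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionerSketch {

    /** FORWARD-like edge: subtask i sends to subtask i, so parallelism must match. */
    static int forwardChannel(int upstreamSubtask, int upstreamParallelism, int downstreamParallelism) {
        if (upstreamParallelism != downstreamParallelism) {
            throw new UnsupportedOperationException(
                    "forward edge cannot change parallelism: "
                            + upstreamParallelism + " -> " + downstreamParallelism);
        }
        return upstreamSubtask;
    }

    /** Rebalance-like edge: round-robin over downstream channels (simplified start at 0). */
    static final class RoundRobin {
        private final int numberOfChannels;
        private int next = -1;

        RoundRobin(int numberOfChannels) {
            this.numberOfChannels = numberOfChannels;
        }

        int selectChannel() {
            next = (next + 1) % numberOfChannels;
            return next;
        }
    }

    /** Sends each writer's committables through its own rebalance partitioner; returns target channels. */
    static List<Integer> route(int writers, int committablesPerWriter, int committers) {
        List<Integer> channels = new ArrayList<>();
        for (int w = 0; w < writers; w++) {
            RoundRobin partitioner = new RoundRobin(committers); // one per upstream subtask
            for (int c = 0; c < committablesPerWriter; c++) {
                channels.add(partitioner.selectChannel());
            }
        }
        return channels;
    }

    public static void main(String[] args) {
        // 4 writers, 1 committer: every committable lands on committer subtask 0.
        System.out.println(route(4, 2, 1));
        try {
            forwardChannel(0, 4, 1);
        } catch (UnsupportedOperationException e) {
            System.out.println("FORWARD rejected: " + e.getMessage());
        }
    }
}
```

With a single committer, round-robin degenerates to "send everything to subtask 0", which is exactly what the parallelism-1 Committer needs.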

The modification was needed because of a "Duplicate Key" exception thrown
by the deserialize(int version, byte[] serialized) method at line 143
of [5], where a stream of SubtaskCommittableManager objects is collected
into a Map keyed by each SubtaskCommittableManager's subtaskId.

After a Task Manager recovery, the list of SubtaskCommittableManager
objects processed in that deserialize method may contain two
SubtaskCommittableManager entries for the same subtask ID. For that case,
I call the SubtaskCommittableManager.merge(...) method.
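The fix described above boils down to a standard `java.util.stream` pattern. The two-argument `Collectors.toMap(keyMapper, valueMapper)` throws an `IllegalStateException` ("Duplicate key ...") when two elements map to the same key, while the three-argument overload takes a merge function instead. `SubtaskState` below is a hypothetical, simplified stand-in for SubtaskCommittableManager, and its `merge` just concatenates committables; it is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class DuplicateKeySketch {

    /** Hypothetical, simplified stand-in for a per-subtask committable holder. */
    static final class SubtaskState {
        final int subtaskId;
        final List<String> committables;

        SubtaskState(int subtaskId, List<String> committables) {
            this.subtaskId = subtaskId;
            this.committables = committables;
        }

        /** Combines two states of the same subtask by concatenating their committables. */
        SubtaskState merge(SubtaskState other) {
            if (other.subtaskId != subtaskId) {
                throw new IllegalArgumentException("Cannot merge different subtasks");
            }
            List<String> all = new ArrayList<>(committables);
            all.addAll(other.committables);
            return new SubtaskState(subtaskId, all);
        }
    }

    /** Original shape: a repeated subtaskId makes toMap throw IllegalStateException. */
    static Map<Integer, SubtaskState> collectStrict(List<SubtaskState> states) {
        return states.stream()
                .collect(Collectors.toMap(s -> s.subtaskId, Function.identity()));
    }

    /** Patched shape: entries with the same subtaskId are merged instead of rejected. */
    static Map<Integer, SubtaskState> collectMerging(List<SubtaskState> states) {
        return states.stream()
                .collect(Collectors.toMap(s -> s.subtaskId, Function.identity(), SubtaskState::merge));
    }

    public static void main(String[] args) {
        // After recovery, subtask 0 appears twice in the restored list.
        List<SubtaskState> restored = List.of(
                new SubtaskState(0, List.of("a")),
                new SubtaskState(1, List.of("b")),
                new SubtaskState(0, List.of("c")));

        try {
            collectStrict(restored);
        } catch (IllegalStateException e) {
            System.out.println("strict: " + e.getMessage());
        }
        System.out.println("merged subtask 0: " + collectMerging(restored).get(0).committables);
    }
}
```

Merging makes deserialization tolerant of the duplicate, but it does not answer the underlying question of whether two entries for one subtask should exist at all.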

With those modifications, our Delta test [7] passes on Flink 1.15.

I do not know whether setting the Committer's parallelism to 1 is the
right thing to do. As I mentioned, the Committer does real work in our
Sink implementation, and upcoming features we would like to add might
benefit from keeping its parallelism equal to the writer count.

I still think there is an issue with the V2 architecture for topologies
with a GlobalCommitter in failover scenarios [4]. The duplicated key
in [5] described above may be another case of it; maybe we should never
have two entries for the same subtaskId. That I don't know.
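On failover more generally: as Martijn notes further down the thread, the global committer must be idempotent, because committables can be delivered again after recovery. One common approach is to store the last committed checkpoint ID atomically with each commit and skip anything at or below it. The sketch below is illustrative only: `TableLog` is a hypothetical in-memory stand-in for a transactional log such as the Delta Log, not the Delta or Flink API, and it assumes checkpoint IDs only grow across restarts.

```java
import java.util.ArrayList;
import java.util.List;

public class IdempotentGlobalCommitSketch {

    /** Hypothetical in-memory stand-in for a transactional table log. */
    static final class TableLog {
        private final List<String> entries = new ArrayList<>();
        private long lastCommittedCheckpointId = -1L;

        long lastCommittedCheckpointId() {
            return lastCommittedCheckpointId;
        }

        /** Records the files and the checkpoint ID together (trivially atomic in memory). */
        void append(long checkpointId, List<String> files) {
            entries.addAll(files);
            lastCommittedCheckpointId = checkpointId;
        }

        List<String> entries() {
            return entries;
        }
    }

    /** Commits a checkpoint's files unless the log already has them; returns true if written. */
    static boolean commit(TableLog log, long checkpointId, List<String> files) {
        if (checkpointId <= log.lastCommittedCheckpointId()) {
            return false; // replayed after recovery: already committed, skip
        }
        log.append(checkpointId, files);
        return true;
    }

    public static void main(String[] args) {
        TableLog log = new TableLog();
        commit(log, 1, List.of("part-0-0", "part-1-0"));
        commit(log, 2, List.of("part-0-1"));
        // After a failover, checkpoint 2's committables arrive again.
        boolean written = commit(log, 2, List.of("part-0-1"));
        System.out.println(written + " " + log.entries()); // false [part-0-0, part-1-0, part-0-1]
    }
}
```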

P.S.
Steven, apologies for hijacking the thread a little bit.

Thanks,
Krzysztof Chmielewski

[1] https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaCommitter.java
[2] https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
[3] https://drive.google.com/file/d/1kU0R9nLZneJBDAkgNiaRc90dLGycyTec/view?usp=sharing
[4] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
[5] https://github.com/apache/flink/blob/release-1.15/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java
[7] https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java

On Wed, Sep 14, 2022 at 05:26 Steven Wu <stevenz...@gmail.com> wrote:

> > setting the committer parallelism to 1.
>
> Yun, setting the parallelism to 1 is essentially a global committer. That
> would work. Not sure about the implications for other parts of the v2 sink
> interface.
>
> On Tue, Sep 13, 2022 at 2:17 PM Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> wrote:
>
>> Hi Martijn,
>> Could you clarify a little what you mean by:
>>
>> "The important part to remember is that this
>> topology is lagging one checkpoint behind in terms of fault-tolerance: it
>> only receives data once the committer committed"
>>
>> What are the implications?
>>
>> Thanks,
>> Krzysztof Chmielewski
>>
>> On Tue, Sep 13, 2022 at 09:57 Yun Gao <yungao...@aliyun.com.invalid>
>> wrote:
>>
>>> Hi,
>>> Very sorry for the late reply; I was on holiday.
>>> And many thanks for the discussion. It also reminds me of one more piece
>>> of background on the change of the GlobalCommitter:
>>> When we refactored the job finish process in FLIP-147 to ensure that
>>> all records could be committed at the end of a bounded streaming job,
>>> we had to drop support for cascade commits, which means the cascade
>>> commit `committer -> global committer` does not work in all cases.
>>> For the current issues, one possible alternative on my side is that we
>>> may support setting the committer parallelism to 1. Could this option
>>> solve the issue in the current scenarios? I'll also double-check whether
>>> it could be implemented, and look at the failed tests Krzysztof met.
>>> Best,
>>> Yun
>>> ------------------------------------------------------------------
>>> From:Steven Wu <stevenz...@gmail.com>
>>> Send Time:2022 Sep. 10 (Sat.) 11:31
>>> To:dev <dev@flink.apache.org>
>>> Cc:Yun Gao <yungao...@aliyun.com>; hililiwei <hilili...@gmail.com>
>>> Subject:Re: Sink V2 interface replacement for GlobalCommitter
>>> Martijn, thanks a lot for chiming in!
>>> Here are my concerns with adding the GlobalCommitter in the
>>> PostCommitTopology:
>>> 1. When we use TwoPhaseCommittingSink, we would need to create a
>>> no-op/dummy committer. The actual Iceberg/DeltaLake commits happen in the
>>> PostCommit stage, yet the PostCommit stage should be doing work after the
>>> commit (not for the commit).
>>> 2. GlobalCommitter is marked as @Deprecated. It will be removed at some
>>> point. What then?
>>> Thanks,
>>> Steven
>>> On Fri, Sep 9, 2022 at 1:20 PM Krzysztof Chmielewski <
>>> krzysiek.chmielew...@gmail.com> wrote:
>>> Thanks Martijn,
>>>  I'm actually trying to run our V1 Delta connector on Flink 1.15 using
>>>  SinkV1Adapter with GlobalCommitterOperator.
>>>  Having said that, I might have found a potential issue with
>>>  GlobalCommitterOperator, checkpointing and failover recovery [1].
>>>  For "normal" scenarios it does look good, though.
>>>  Regards,
>>>  Krzysztof Chmielewski
>>>  [1] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
>>>  On Fri, Sep 9, 2022 at 20:49 Martijn Visser <martijnvis...@apache.org>
>>>  wrote:
>>>  > Hi all,
>>>  >
>>>  > A couple of bits from when work was being done on the new sink: V1 is
>>>  > completely simulated as V2 [1]. V2 is strictly more expressive.
>>>  >
>>>  > If there's a desire to stick to the `GlobalCommitter` interface, have a
>>>  > look at the StandardSinkTopologies. Or you can just add your own, more
>>>  > fitting PostCommitTopology. The important part to remember is that this
>>>  > topology is lagging one checkpoint behind in terms of fault-tolerance: it
>>>  > only receives data once the committer committed
>>>  > on notifyCheckpointComplete. Thus, the global committer needs to be
>>>  > idempotent and able to restore the actual state on recovery. That
>>>  > limitation comes from Flink's checkpointing behaviour and applies to
>>>  > both V1 and V2. GlobalCommitterOperator abstracts these issues away,
>>>  > along with handling retries (i.e. commits that happen much later). So
>>>  > it's probably a good place to start, just with the standard topology.
>>>  >
>>>  > Best regards,
>>>  >
>>>  > Martijn
>>>  >
>>>  > [1]
>>>  > https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370
>>>  >
>>>  > On Thu, Sep 8, 2022 at 20:51 Krzysztof Chmielewski <
>>>  > krzysiek.chmielew...@gmail.com> wrote:
>>>  >
>>>  > > Hi,
>>>  > > Krzysztof Chmielewski [1] from the Delta-Flink connector open source
>>>  > > community here [2].
>>>  > >
>>>  > > I totally agree with Steven on this. Sink V1's GlobalCommitter is
>>>  > > exactly what the Flink-Delta Sink needs, since it is the place where
>>>  > > we do the actual commit to the Delta Log, which should be done from a
>>>  > > single place/instance.
>>>  > >
>>>  > > I'm currently evaluating V2 for our connector, and having what Steven
>>>  > > described as a "more natural, built-in concept/support of
>>>  > > GlobalCommitter in the sink v2 interface" would be greatly appreciated.
>>>  > >
>>>  > > Cheers,
>>>  > > Krzysztof Chmielewski
>>>  > >
>>>  > > [1] https://github.com/kristoffSC
>>>  > > [2] https://github.com/delta-io/connectors/tree/master/flink
>>>  > >
>>>  > > On Thu, Sep 8, 2022 at 19:51 Steven Wu <stevenz...@gmail.com> wrote:
>>>  > >
>>>  > > > Hi Yun,
>>>  > > >
>>>  > > > Thanks a lot for the reply!
>>>  > > >
>>>  > > > While we can add the global committer in the WithPostCommitTopology,
>>>  > > > the semantics are weird. The Commit stage actually doesn't commit
>>>  > > > anything to the Iceberg table, and the PostCommit stage is where the
>>>  > > > Iceberg commit happens.
>>>  > > >
>>>  > > > I just took a quick look at the DeltaLake Flink sink. It still uses
>>>  > > > the V1 sink interface [1]. I think it might have the same issue when
>>>  > > > switching to the V2 sink interface.
>>>  > > >
>>>  > > > For data lake storages (like Iceberg, DeltaLake) or any storage with
>>>  > > > global transactional commit, it would be more natural to have a
>>>  > > > built-in concept/support of GlobalCommitter in the sink v2 interface.
>>>  > > >
>>>  > > > Thanks,
>>>  > > > Steven
>>>  > > >
>>>  > > > [1]
>>>  > > > https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
>>>  > > >
>>>  > > >
>>>  > > > On Wed, Sep 7, 2022 at 2:15 AM Yun Gao <yungao...@aliyun.com.invalid>
>>>  > > > wrote:
>>>  > > >
>>>  > > > > Hi Steven, Liwei,
>>>  > > > > Very sorry for missing this mail and responding so late.
>>>  > > > > I think the initial thought was indeed to use `WithPostCommitTopology`
>>>  > > > > as a replacement for the original GlobalCommitter, and currently the
>>>  > > > > adapter of Sink v1 on top of Sink v2 also maps the GlobalCommitter
>>>  > > > > in the Sink V1 interface onto an implementation of
>>>  > > > > `WithPostCommitTopology` [1].
>>>  > > > > Since `WithPostCommitTopology` supports arbitrary subgraphs, it seems
>>>  > > > > to me it could support both a global committer and small file
>>>  > > > > compaction. We might have a `WithPostCommitTopology` implementation
>>>  > > > > like:
>>>  > > > > DataStream ds = add global committer;
>>>  > > > > if (enable file compaction) {
>>>  > > > >     build the compaction subgraph from ds
>>>  > > > > }
>>>  > > > > Best,
>>>  > > > > Yun
>>>  > > > > [1]
>>>  > > > > https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
>>>  > > > >
>>> ------------------------------------------------------------------
>>>  > > > > From:Steven Wu <stevenz...@gmail.com>
>>>  > > > > Send Time:2022 Aug. 17 (Wed.) 07:30
>>>  > > > > To:dev <dev@flink.apache.org>; hililiwei <hilili...@gmail.com>
>>>  > > > > Subject:Re: Sink V2 interface replacement for GlobalCommitter
>>>  > > > > > Plus, it will disable the future capability of small file
>>>  > > > > > compaction stage post commit.
>>>  > > > > I should clarify this comment. If we use `WithPostCommitTopology`
>>>  > > > > for the global committer, we would lose the ability to use the post
>>>  > > > > commit stage for small file compaction.
>>>  > > > > On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <stevenz...@gmail.com>
>>>  > > > > wrote:
>>>  > > > > >
>>>  > > > > > In the V1 sink interface, the Iceberg sink uses a GlobalCommitter.
>>>  > > > > > With the V2 sink interface, GlobalCommitter has been deprecated in
>>>  > > > > > favor of WithPostCommitTopology. I thought the post commit stage
>>>  > > > > > was mainly for async maintenance (like compaction).
>>>  > > > > >
>>>  > > > > > Are we supposed to do something similar to the
>>>  > > > > > GlobalCommittingSinkAdapter? It seems like a temporary transition
>>>  > > > > > plan for bridging v1 sinks to v2 interfaces.
>>>  > > > > >
>>>  > > > > > private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
>>>  > > > > >         implements WithPostCommitTopology<InputT, CommT> {
>>>  > > > > >     @Override
>>>  > > > > >     public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
>>>  > > > > >         StandardSinkTopologies.addGlobalCommitter(
>>>  > > > > >                 committables,
>>>  > > > > >                 GlobalCommitterAdapter::new,
>>>  > > > > >                 () -> sink.getCommittableSerializer().get());
>>>  > > > > >     }
>>>  > > > > > }
>>>  > > > > >
>>>  > > > > >
>>>  > > > > > In the Iceberg PR [1] for adopting the new sink interface, Liwei
>>>  > > > > > used the "global" partitioner to force all committables to go to a
>>>  > > > > > single committer task 0. This effectively forces a global committer
>>>  > > > > > disguised in the parallel committers. It is a little weird and can
>>>  > > > > > also lead to questions about why the other committer tasks are not
>>>  > > > > > getting any messages. Plus, it will disable the future capability of
>>>  > > > > > small file compaction stage post commit. Hence, I am asking what the
>>>  > > > > > right approach is to achieve global committer behavior.
>>>  > > > > >
>>>  > > > > > Thanks,
>>>  > > > > > Steven
>>>  > > > > >
>>>  > > > > > [1] https://github.com/apache/iceberg/pull/4904/files#r946975047
>>>  > > > > >
>>>  > > > >
>>>  > > >
>>>  > >
>>>  >
>>>
>>
