Hi all,
Very sorry for the long delay; the investigation took a bit of time.
@Krzysztof 
The current unexpected behavior of GlobalCommitter
after failover is indeed caused by bugs in the implementation.
I filed an issue [1] for these bugs, and I think we may fix them
in 1.15.x, 1.16.x and 1.17.x before the next minor releases.
@Steven @Krzysztof
For the long-term solution (>= 1.17) of supporting a configurable parallelism
for the Committer operator, I initially meant that we could let users set the
parallelism of the committer operator, so that it could support arbitrary
parallelism:
1. For an ordinary sink that does not require a global committer, it is still
`writer (parallelism = n) ->
 committer (parallelism = n)`.
2. If the committer is not required in the v1 implementation, it could now be
 reimplemented as `writer (parallelism = n) -> committer (parallelism = 1)`.
3. If the committer is required in the v1 implementation, there are now several
 ways to re-implement it on v2, e.g. `writer -> pre-committer topology to do
 the rename -> committer (parallelism = 1)`.
Do you think this would be reasonable?
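If it helps make the idea concrete, here is a tiny self-contained sketch (plain Java, not the actual Flink API; all class and record names below are made up for illustration) of why topologies (2)/(3) work: with the committer at parallelism 1, every writer's committable funnels into one instance, which can then publish them atomically:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not the Flink API): n parallel writers each produce one
// committable; a single committer instance (parallelism = 1) receives all of
// them, so it can publish them in one atomic action (e.g. one Delta Log entry).
class SingleCommitterSketch {
    record Committable(int writerSubtask, String file) {}

    static List<String> run(int writerParallelism) {
        // writer stage: parallelism = n, one hidden file per subtask
        List<Committable> committables = new ArrayList<>();
        for (int subtask = 0; subtask < writerParallelism; subtask++) {
            committables.add(new Committable(subtask, "part-" + subtask + ".parquet"));
        }
        // committer stage: parallelism = 1, sees every committable
        List<String> committed = new ArrayList<>();
        for (Committable c : committables) {
            committed.add(c.file());
        }
        return committed; // all files land in the same commit
    }

    public static void main(String[] args) {
        System.out.println(run(4));
    }
}
```

In a real Sink V2 job the committables would of course travel through Flink's network shuffle, but the funneling effect is the same.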
Also @Krzysztof, I have the same question as Steven: is it possible to write
directly to the final files and skip the renaming step? Since the files are
not visible to users before the metadata is written to the Delta Log, I guess
it is safe to write to the final files directly?
Best,
Yun Gao
[1] https://issues.apache.org/jira/browse/FLINK-29459
------------------------------------------------------------------
From:Steven Wu <stevenz...@gmail.com>
Send Time:2022 Sep. 14 (Wed.) 21:33
To:Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com>
Cc:dev <dev@flink.apache.org>; Yun Gao <yungao...@aliyun.com>; hililiwei 
<hilili...@gmail.com>
Subject:Re: Sink V2 interface replacement for GlobalCommitter
Krzysztof, no worries. We are discussing the same topic (how to support storage 
with globally transactional commits).
> In Delta Sink connector we actually use both Committer [1] and 
> GlobalCommitter [2]. The former, since we are using Flink's Parquet file 
> writers is doing a very simple job of "of renaming the hidden file to make it 
> visible and removing from the name some 'in-progress file' marker". The 
> GlobalCommitter is committing data to the Delta Log.
Curious if the writers can write the visible files directly (vs. writing
hidden files first, renamed later by the committer). Since there is a global
committer to commit the data files when a Flink checkpoint completes, a job
failure or restart shouldn't cause data file duplicates or loss. I probably
missed some context here.
On Wed, Sep 14, 2022 at 5:20 AM Krzysztof Chmielewski 
<krzysiek.chmielew...@gmail.com> wrote:
Hi Yun,
Thanks for your input.
In the Delta Sink connector we actually use both the Committer [1] and the
GlobalCommitter [2]. The former, since we are using Flink's Parquet file
writers, does the very simple job of renaming the hidden file to make it
visible and removing the 'in-progress file' marker from its name. The
GlobalCommitter commits the data to the Delta Log.
With this design, having many instances of the Committer actually has a
benefit for us. Plus, we foresee upcoming features in our connector that would
benefit from separate Committers with a parallelism level higher than 1.
How I understood your suggestion, Yun (and maybe it was a wrong
interpretation), is to use both the Committer and the GlobalCommitter but to
enforce parallelism 1 on the former. The GlobalCommitter created by Flink
1.15's SinkV1Adapter has parallelism 1, as expected and as it was in Flink < 1.15.
Anyway, I've played a little bit with the Flink code and I managed to achieve
this [3]. After some additional changes, which I will describe below, our test
described in [4] passed without any data loss and with no exceptions thrown by Flink.
However, changing the parallelism of the Committer operator to a hardcoded
value of 1 was not enough, and I had to do two more things:
1. Add a rebalance step (RebalancePartitioner) to the graph between the writer
and the committer, since they now have different parallelism levels and the
default FORWARD partitioner caused an exception to be thrown - BTW this is
clear and understood.
2. Modify Flink's CommittableCollectorSerializer [5], and this I believe is
the important part.
The modification I had to make was caused by a "Duplicate key" exception from
the deserialize(int version, byte[] serialized) method at line 143 of [5],
where we process a stream of SubtaskCommittableManager objects and collect it
into a Map. The map key is the subtaskId from the SubtaskCommittableManager object.
After a Task Manager recovery, it may happen that the list of
SubtaskCommittableManager objects processed in that deserialize method
contains two SubtaskCommittableManager entries for the same subtask ID. What I
did for such a case is call the SubtaskCommittableManager.merge(...) method.
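For readers following along, the failure mode can be reproduced with a minimal stand-in (plain Java; the Manager record below is a made-up substitute for SubtaskCommittableManager, not the real Flink class): Collectors.toMap without a merge function throws on duplicate keys, while adding a merge function, analogous to calling SubtaskCommittableManager.merge(...), combines the two entries instead:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Stand-in for SubtaskCommittableManager: tracks how many committables a
// subtask still has to commit; merge() combines two entries for one subtask.
record Manager(int subtaskId, int pending) {
    Manager merge(Manager other) {
        return new Manager(subtaskId, pending + other.pending);
    }
}

class CommittableDeserializeSketch {
    // Without the third (merge) argument, Collectors.toMap throws
    // IllegalStateException("Duplicate key ...") when two managers share a
    // subtask id -- the failure seen after task-manager recovery.
    static Map<Integer, Manager> deserialize(List<Manager> managers) {
        return managers.stream()
                .collect(Collectors.toMap(Manager::subtaskId, m -> m, Manager::merge));
    }

    public static void main(String[] args) {
        // subtask 1 appears twice, as can happen after a recovery
        Map<Integer, Manager> state = deserialize(
                List.of(new Manager(1, 2), new Manager(2, 1), new Manager(1, 3)));
        System.out.println(state.get(1).pending()); // merged: 2 + 3 = 5
    }
}
```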
With those modifications our Delta test [7] started to pass on Flink 1.15.
I do not know whether setting the parallelism level of the Committer to 1 is
the right thing to do. Like I mentioned, the Committer is doing some work in
our Sink implementation, and we might have more use for it in the next
features we would like to add, which would benefit from keeping the
parallelism level equal to the writers' count.
I still think there is some issue with the V2 architecture for topologies with
a GlobalCommitter in failover scenarios [4]. Even the duplicated key in [5]
described above may be another such case - maybe we should never have two
entries for the same subtaskId. That I don't know.
P.S.
Steven, apologies for hijacking the thread a little bit.
Thanks,
Krzysztof Chmielewski
[1] https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaCommitter.java
[2] https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
[3] https://drive.google.com/file/d/1kU0R9nLZneJBDAkgNiaRc90dLGycyTec/view?usp=sharing
[4] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
[5] https://github.com/apache/flink/blob/release-1.15/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java
[7] https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java
On Wed, Sep 14, 2022 at 05:26 Steven Wu <stevenz...@gmail.com> wrote:
> setting the committer parallelism to 1.
Yun, setting the parallelism to 1 is essentially a global committer. That
would work; I'm not sure about the implications for other parts of the v2 sink
interface.
On Tue, Sep 13, 2022 at 2:17 PM Krzysztof Chmielewski 
<krzysiek.chmielew...@gmail.com> wrote:
Hi Martijn,
Could you clarify a little bit what you mean by:
"The important part to remember is that this
topology is lagging one checkpoint behind in terms of fault-tolerance: it
only receives data once the committer committed"
What are the implications? 
Thanks,
Krzysztof Chmielewski
On Tue, Sep 13, 2022 at 09:57 Yun Gao <yungao...@aliyun.com.invalid> wrote:
Hi,
 Very sorry for the late reply; I was on holiday.
 And also many thanks for the discussion; it reminds me of
 one more piece of background on the change of the GlobalCommitter:
 when we refactored the job finish process in FLIP-147 to
 ensure that all records could be committed at the end of a bounded
 streaming job, we had to drop the support for cascading commits,
 which makes the cascading commit of `committer -> global committer` not work
 in all cases.
 For the current issues, one possible alternative option from my side is that
 we may support setting the committer parallelism to 1. Could this option
 solve the issue in the current scenarios? I'll also double-check whether it
 could be implemented, and look into the failed tests Krzysztof met.
 Best,
 Yun
 ------------------------------------------------------------------
 From:Steven Wu <stevenz...@gmail.com>
 Send Time:2022 Sep. 10 (Sat.) 11:31
 To:dev <dev@flink.apache.org>
 Cc:Yun Gao <yungao...@aliyun.com>; hililiwei <hilili...@gmail.com>
 Subject:Re: Sink V2 interface replacement for GlobalCommitter
 Martijn, thanks a lot for chiming in!
 Here are my concerns with adding a GlobalCommitter in the PostCommitTopology:
 1. When we use TwoPhaseCommittingSink, we would need to create a noop/dummy
committer. The actual Iceberg/DeltaLake commits happen in the PostCommit
stage, but the PostCommit stage should be doing work after the commit (not the
commit itself).
 2. GlobalCommitter is marked as @deprecated. It will be removed at a certain
point. What then?
 Thanks,
 Steven
 On Fri, Sep 9, 2022 at 1:20 PM Krzysztof Chmielewski
<krzysiek.chmielew...@gmail.com> wrote:
 Thanks Martijn,
 I'm actually trying to run our V1 Delta connector on Flink 1.15 using
 SinkV1Adapter with GlobalCommitterOperator.
 Having said that, I might have found a potential issue with
 GlobalCommitterOperator, checkpointing and failover recovery [1].
 For "normal" scenarios it does look good though.
 Regards,
 Krzysztof Chmielewski
 [1] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
 On Fri, Sep 9, 2022 at 20:49 Martijn Visser <martijnvis...@apache.org>
 wrote:
 > Hi all,
 >
 > A couple of bits from when work was being done on the new sink: V1 is
 > completely simulated as V2 [1]. V2 is strictly more expressive.
 >
 > If there's desire to stick to the `GlobalCommitter` interface, have a
 > look at the StandardSinkTopologies. Or you can just add your own more
 > fitting PostCommitTopology. The important part to remember is that this
 > topology is lagging one checkpoint behind in terms of fault-tolerance: it
 > only receives data once the committer committed
 > on notifyCheckpointComplete. Thus, the global committer needs to be
 > idempotent and able to restore the actual state on recovery. That
 > limitation is coming in from Flink's checkpointing behaviour and applies to
 > both V1 and V2. GlobalCommitterOperator is abstracting these issues along
 > with handling retries (so commits that happen much later). So it's probably
 > a good place to start just with the standard topology.
 >
 > Best regards,
 >
 > Martijn
 >
 > [1]
 >
 > https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370
 >
 > On Thu, Sep 8, 2022 at 20:51 Krzysztof Chmielewski
 > <krzysiek.chmielew...@gmail.com> wrote:
 >
 > > Hi,
 > > Krzysztof Chmielewski [1] from the Delta-Flink connector open source
 > > community here [2].
 > >
 > > I totally agree with Steven on this. Sink V1's GlobalCommitter is
 > > exactly what the Flink-Delta Sink needs, since it is the place where we
 > > do the actual commit to the Delta Log, which should be done from one
 > > place/instance.
 > >
 > > Currently I'm evaluating V2 for our connector, and having, as Steven
 > > described it, a "more natural, built-in concept/support of GlobalCommitter
 > > in the sink v2 interface" would be greatly appreciated.
 > >
 > > Cheers,
 > > Krzysztof Chmielewski
 > >
 > > [1] https://github.com/kristoffSC
 > > [2] https://github.com/delta-io/connectors/tree/master/flink
 > >
 > > On Thu, Sep 8, 2022 at 19:51 Steven Wu <stevenz...@gmail.com> wrote:
 > >
 > > > Hi Yun,
 > > >
 > > > Thanks a lot for the reply!
 > > >
 > > > While we can add the global committer in the WithPostCommitTopology,
 > > > the semantics are weird. The Commit stage actually didn't commit
 > > > anything to the Iceberg table, and the PostCommit stage is where the
 > > > Iceberg commit happens.
 > > >
 > > > I just took a quick look at the DeltaLake Flink sink. It still uses
 > > > the V1 sink interface [1]. I think it might have the same issue when
 > > > switching to the V2 sink interface.
 > > >
 > > > For data lake storage (like Iceberg, DeltaLake) or any storage with a
 > > > global transactional commit, it would be more natural to have a
 > > > built-in concept/support of GlobalCommitter in the sink v2 interface.
 > > >
 > > > Thanks,
 > > > Steven
 > > >
 > > > [1]
 > > >
 > > >
 > >
 > https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
 > > >
 > > >
 > > > On Wed, Sep 7, 2022 at 2:15 AM Yun Gao <yungao...@aliyun.com.invalid>
 > > > wrote:
 > > >
 > > > > Hi Steven, Liwei,
 > > > > Very sorry for missing this mail and responding very late.
 > > > > I think the initial thought is indeed to use `WithPostCommitTopology`
 > > > > as a replacement of the original GlobalCommitter, and currently the
 > > > > adapter of Sink v1 on top of Sink v2 also maps the GlobalCommitter in
 > > > > the Sink V1 interface onto an implementation of
 > > > > `WithPostCommitTopology`.
 > > > > Since `WithPostCommitTopology` supports an arbitrary subgraph, it
 > > > > seems to me it could support both a global committer and small file
 > > > > compaction? We might have a `WithPostCommitTopology` implementation
 > > > > like
 > > > > DataStream ds = add global committer;
 > > > > if (enable file compaction) {
 > > > >     build the compaction subgraph from ds
 > > > > }
 > > > > Best,
 > > > > Yun
 > > > > [1]
 > > > >
 > > >
 > >
 > https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
 > > > > ------------------------------------------------------------------
 > > > > From:Steven Wu <stevenz...@gmail.com>
 > > > > Send Time:2022 Aug. 17 (Wed.) 07:30
 > > > > To:dev <dev@flink.apache.org>; hililiwei <hilili...@gmail.com>
 > > > > Subject:Re: Sink V2 interface replacement for GlobalCommitter
 > > > > > Plus, it will disable the future capability of a small file
 > > > > > compaction stage post commit.
 > > > > I should clarify this comment: if we are using the
 > > > > `WithPostCommitTopology` for the global committer, we would lose the
 > > > > capability of using the post-commit stage for small file compaction.
 > > > > On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <stevenz...@gmail.com> wrote:
 > > > > >
 > > > > > In the V1 sink interface, there is a GlobalCommitter for Iceberg.
 > > > > > With the V2 sink interface, GlobalCommitter has been deprecated by
 > > > > > WithPostCommitTopology. I thought the post-commit stage is mainly
 > > > > > for async maintenance (like compaction).
 > > > > >
 > > > > > Are we supposed to do something similar to the
 > > > > > GlobalCommittingSinkAdapter? It seems like a temporary transition
 > > > > > plan for bridging v1 sinks to v2 interfaces.
 > > > > >
 > > > > > private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
 > > > > >         implements WithPostCommitTopology<InputT, CommT> {
 > > > > >     @Override
 > > > > >     public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
 > > > > >         StandardSinkTopologies.addGlobalCommitter(
 > > > > >                 committables,
 > > > > >                 GlobalCommitterAdapter::new,
 > > > > >                 () -> sink.getCommittableSerializer().get());
 > > > > >     }
 > > > > > }
 > > > > >
 > > > > >
 > > > > > In the Iceberg PR [1] for adopting the new sink interface, Liwei
 > > > > > used the "global" partitioner to force all committables to go to a
 > > > > > single committer task 0. It effectively creates a global committer
 > > > > > disguised in the parallel committers. It is a little weird and can
 > > > > > also lead to questions about why the other committer tasks are not
 > > > > > getting any messages. Plus, it will disable the future capability
 > > > > > of a small file compaction stage post commit. Hence, I am asking
 > > > > > what is the right approach to achieve global committer behavior.
 > > > > >
 > > > > > Thanks,
 > > > > > Steven
 > > > > >
 > > > > > [1] https://github.com/apache/iceberg/pull/4904/files#r946975047
 > > > > >
 > > > >
 > > >
 > >
 >
