Yun,

> `writer (parallelism = n) -> committer (parallelism = 1)`

Yes, a single-parallelism committer would work for Iceberg (and probably
other similar global transaction storage). This is how Iceberg sink is
implemented today.

> When we were refactoring the job finish process in FLIP-147 to ensure all
> the records can be committed at the end of a bounded streaming job, we had
> to drop the support for cascading commits, which makes the cascading
> commit of `committer -> global committer` not work in all cases.

This is not a concern for Iceberg, as Iceberg doesn't need cascading
"committers -> global committers".

Thanks,
Steven


On Wed, Sep 28, 2022 at 9:21 AM Yun Gao <yungao...@aliyun.com> wrote:

> Hi all,
>
> Very sorry for the long delay; it took a bit of time to do the
> investigation.
>
> @Krzysztof
> For the current unexpected behavior of the GlobalCommitter
> after failover, it is indeed caused by bugs in the implementation.
> I filed an issue [1] for these bugs, and I think we may fix them
> for 1.15.x, 1.16.x, and 1.17.x before the next minor releases.
>
> @Steven @Krzysztof
> For the long-term solution (>= 1.17) of supporting a configurable
> parallelism for the Committer operator, I initially meant that we could set
> the parallelism of the committer operator, thus it could support arbitrary
> parallelism:
>
> 1. For an ordinary sink that does not require a global committer, it is
>    still `writer (parallelism = n) -> committer (parallelism = n)`.
> 2. If the committer is not required in the v1 implementation, it could now
>    be reimplemented as `writer (parallelism = n) -> committer (parallelism = 1)`.
> 3. If the committer is required in the v1 implementation, there are now
>    several ways to re-implement it on v2, like `writer -> pre-commit
>    topology to do the rename -> committer (parallelism = 1)`.
>
> Do you think this would be reasonable?
>
> Also, @Krzysztof, I have the same question as Steven: is it possible to
> write the final files directly and skip the renaming step? Since before the
> meta is written to the Delta Log, I guess the files are not visible to the
> users, thus it should be safe to write the final files directly?
>
> Best,
> Yun Gao
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-29459
>
>
>
> ------------------------------------------------------------------
> From:Steven Wu <stevenz...@gmail.com>
> Send Time:2022 Sep. 14 (Wed.) 21:33
> To:Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com>
> Cc:dev <dev@flink.apache.org>; Yun Gao <yungao...@aliyun.com>; hililiwei <
> hilili...@gmail.com>
> Subject:Re: Sink V2 interface replacement for GlobalCommitter
>
> Krzysztof, no worries. We are discussing the same topic (how to support
> storage with globally transactional commits).
>
> > In the Delta Sink connector we actually use both a Committer [1] and a
> > GlobalCommitter [2]. The former, since we are using Flink's Parquet file
> > writers, does a very simple job of "renaming the hidden file to make
> > it visible and removing from the name some 'in-progress file' marker". The
> > GlobalCommitter commits the data to the Delta Log.
>
> Curious if the writers can write the visible files directly (vs hidden
> files first, then renamed by the committer). Since there is a global
> committer to commit the data files when the Flink checkpoint completes, a
> job failure or restart shouldn't cause data-file duplicates or loss. I
> probably missed some context here.
>
> On Wed, Sep 14, 2022 at 5:20 AM Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> wrote:
> Hi Yun,
> Thanks for your input.
>
> In the Delta Sink connector we actually use both a Committer [1] and a
> GlobalCommitter [2]. The former, since we are using Flink's Parquet file
> writers, does a very simple job of "renaming the hidden file to make
> it visible and removing from the name some 'in-progress file' marker". The
> GlobalCommitter commits the data to the Delta Log.
>
> With this design, having many instances of Committers actually has a
> benefit for us. Plus, we foresee some upcoming features in our connector
> that would benefit from separate Committers with a parallelism level higher
> than 1.
>
> The way I understood your suggestion, Yun (and maybe it was a wrong
> interpretation), is to use both the Committer and the GlobalCommitter but
> to enforce parallelism level 1 on the former. The GlobalCommitter created
> by Flink 1.15's SinkV1Adapter has parallelism 1, as expected and as it was
> in Flink < 1.15.
>
> Anyway, I've played a little bit with the Flink code and managed to
> achieve this [3]. After some additional changes, which I will describe
> below, our test described in [4] passed without any data loss and no
> exceptions thrown by Flink.
>
> However, changing the parallelism of the Committer operator to a hardcoded
> value of 1 was not enough, and I had to do two more things:
> 1. Add a rebalance step (RebalancePartitioner) to the graph between the
> writer and the committer, since they now have different parallelism levels
> and the default FORWARD partitioner caused an exception to be thrown - BTW
> this is clear and understood.
> 2. Modify Flink's CommittableCollectorSerializer [5], and this, I believe,
> is an important thing.
>
> The modification I had to make was caused by a "Duplicate Key" exception
> from the deserialize(int version, byte[] serialized) method at line 143 of
> [5], where we process a stream of SubtaskCommittableManager objects and
> collect it into a Map. The map key is the subtaskId
> from the SubtaskCommittableManager object.
>
> After a Task Manager recovery, it may happen that the List of
> SubtaskCommittableManager objects processed in that deserialize method
> contains two SubtaskCommittableManager entries for the same subtask ID.
> What I did for such a case is call the SubtaskCommittableManager.merge(...)
> method.
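For illustration, the duplicate-key failure and the merge-based fix can be sketched in plain Java. The types below are hypothetical, simplified stand-ins for Flink's SubtaskCommittableManager, not the actual classes:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MergeOnDuplicateKey {
    // Hypothetical stand-in for SubtaskCommittableManager: tracks how many
    // committables a subtask produced.
    record SubtaskState(int subtaskId, int numCommittables) {
        SubtaskState merge(SubtaskState other) {
            return new SubtaskState(subtaskId, numCommittables + other.numCommittables);
        }
    }

    public static void main(String[] args) {
        // After recovery, the deserialized list can contain two entries
        // for the same subtask ID.
        List<SubtaskState> deserialized = List.of(
                new SubtaskState(0, 3), new SubtaskState(1, 2), new SubtaskState(0, 4));

        // Collectors.toMap without a merge function throws
        // IllegalStateException ("Duplicate key ...") on the second entry
        // for subtask 0; passing SubtaskState::merge combines the
        // duplicates instead.
        Map<Integer, SubtaskState> bySubtask = deserialized.stream()
                .collect(Collectors.toMap(SubtaskState::subtaskId,
                        s -> s, SubtaskState::merge));

        System.out.println(bySubtask.get(0).numCommittables()); // prints 7
    }
}
```

This mirrors the described change: the map-building step gets a merge function instead of failing on a duplicate subtask ID.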
>
> With those modifications, our Delta test [7] started to pass on Flink 1.15.
>
> I do not know whether setting the parallelism level of the Committer to 1
> is the right thing to do. Like I mentioned, the Committer is doing some
> work in our Sink implementation, and we might have more use for it in
> future features we would like to add that would benefit from keeping the
> parallelism level equal to the writer count.
>
> I still think there is some issue with the V2 architecture for topologies
> with a GlobalCommitter in failover scenarios [4]. Even the duplicated key
> in [5] described above may be another case - maybe we should never have
> two entries for the same subtaskId. That I don't know.
>
> P.S.
> Steven, apologies for hijacking the thread a little bit.
>
> Thanks,
> Krzysztof Chmielewski
>
> [1]
> https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaCommitter.java
> [2]
> https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
> [3]
> https://drive.google.com/file/d/1kU0R9nLZneJBDAkgNiaRc90dLGycyTec/view?usp=sharing
> [4] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
> [5]
> https://github.com/apache/flink/blob/release-1.15/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java
> [7]
> https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java
>
> On Wed, Sep 14, 2022 at 5:26 AM Steven Wu <stevenz...@gmail.com> wrote:
> > setting the committer parallelism to 1.
>
> Yun, setting the parallelism to 1 is essentially a global committer. That
> would work. Not sure about the implications for other parts of the v2 sink
> interface.
>
> On Tue, Sep 13, 2022 at 2:17 PM Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> wrote:
> Hi Martijn,
> Could you clarify a little bit what you mean by:
>
> "The important part to remember is that this
> topology is lagging one checkpoint behind in terms of fault-tolerance: it
> only receives data once the committer committed"
>
> What are the implications?
>
> Thanks,
> Krzysztof Chmielewski
>
> On Tue, Sep 13, 2022 at 9:57 AM Yun Gao <yungao...@aliyun.com.invalid>
> wrote:
> Hi,
> Very sorry for the late reply; I was on holiday.
> And also many thanks for the discussion; it reminds me of
> one more piece of background on the change of the GlobalCommitter:
> When we were refactoring the job finish process in FLIP-147 to
> ensure all the records can be committed at the end of a bounded
> streaming job, we had to drop the support for cascading commits,
> which makes the cascading commit of `committer -> global committer`
> not work in all cases.
> For the current issues, one possible alternative option from my side is
> that we may support setting the committer parallelism to 1. Could this
> option solve the issue in the current scenarios? I'll also double-check
> whether it could be implemented, and look into the failed tests Krzysztof
> met.
> Best,
> Yun
> ------------------------------------------------------------------
> From:Steven Wu <stevenz...@gmail.com>
> Send Time:2022 Sep. 10 (Sat.) 11:31
> To:dev <dev@flink.apache.org>
> Cc:Yun Gao <yungao...@aliyun.com>; hililiwei <hilili...@gmail.com>
> Subject:Re: Sink V2 interface replacement for GlobalCommitter
> Martijn, thanks a lot for chiming in!
> Here are my concerns with adding the GlobalCommitter in the
> PostCommitTopology:
> 1. When we use TwoPhaseCommittingSink, we would need to create a
> noop/dummy committer. The actual Iceberg/DeltaLake commits happen in the
> PostCommit stage, but the PostCommit stage should be doing some work after
> the commit (not for the commit).
> 2. GlobalCommitter is marked as @deprecated. It will be removed at a
> certain point. What then?
> Thanks,
> Steven
> On Fri, Sep 9, 2022 at 1:20 PM Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> wrote:
> Thanks Martijn,
>  I'm actually trying to run our V1 Delta connector on Flink 1.15 using
>  SinkV1Adapter with GlobalCommitterOperator.
>  Having said that, I might have found a potential issue with
>  GlobalCommitterOperator, checkpointing and failover recovery [1].
>  For "normal" scenarios it does look good though.
>  Regards,
>  Krzysztof Chmielewski
>  [1] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
>  On Fri, Sep 9, 2022 at 8:49 PM Martijn Visser <martijnvis...@apache.org>
>  wrote:
>  > Hi all,
>  >
>  > A couple of bits from when work was being done on the new sink: V1 is
>  > completely simulated as V2 [1]. V2 is strictly more expressive.
>  >
>  > If there's desire to stick to the `GlobalCommitter` interface, have a
>  > look at the StandardSinkTopologies. Or you can just add your own more
>  > fitting PostCommitTopology. The important part to remember is that this
>  > topology is lagging one checkpoint behind in terms of fault-tolerance:
>  > it only receives data once the committer committed
>  > on notifyCheckpointComplete. Thus, the global committer needs to be
>  > idempotent and able to restore the actual state on recovery. That
>  > limitation comes from Flink's checkpointing behaviour and applies to
>  > both V1 and V2. The GlobalCommitterOperator abstracts these issues away,
>  > along with handling retries (so commits that happen much later). So it's
>  > probably a good place to start just with the standard topology.
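The idempotence requirement described above can be sketched as a toy model in plain Java. This is only an illustration of the contract (the class and method names are hypothetical, not Flink's actual interfaces): the committer remembers the last checkpoint it committed and treats re-delivery after recovery as a no-op.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of an idempotent global committer: because the post-commit
// topology lags one checkpoint behind, the same committables can be
// re-delivered after a failover and must be safe to re-commit.
public class IdempotentGlobalCommitter {
    private long lastCommittedCheckpointId = -1; // would be restored from state on recovery
    private final List<String> committed = new ArrayList<>();

    // Returns true if the commit was applied, false if it was a duplicate.
    public boolean commit(long checkpointId, List<String> committables) {
        if (checkpointId <= lastCommittedCheckpointId) {
            return false; // already committed; replay after failover is a no-op
        }
        committed.addAll(committables); // the actual transactional commit would go here
        lastCommittedCheckpointId = checkpointId;
        return true;
    }

    public static void main(String[] args) {
        IdempotentGlobalCommitter c = new IdempotentGlobalCommitter();
        c.commit(1, List.of("file-a"));
        // After a failover, checkpoint 1's committables are delivered again.
        boolean applied = c.commit(1, List.of("file-a"));
        System.out.println(applied); // prints false: duplicate commit ignored
    }
}
```

Real implementations (e.g. Iceberg's committer) achieve the same effect by recording the last committed checkpoint id in the table's own metadata, so the check survives process restarts.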
>  >
>  > Best regards,
>  >
>  > Martijn
>  >
>  > [1]
>  > https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370
>  >
>  > On Thu, Sep 8, 2022 at 8:51 PM Krzysztof Chmielewski <
>  > krzysiek.chmielew...@gmail.com> wrote:
>  >
>  > > Hi,
>  > > Krzysztof Chmielewski [1] from Delta-Flink connector open source
>  > community
>  > > here [2].
>  > >
>  > > I totally agree with Steven on this. Sink V1's GlobalCommitter is
>  > > exactly what the Flink-Delta Sink needs, since it is the place where
>  > > we do the actual commit to the Delta Log, which should be done from
>  > > one place/instance.
>  > >
>  > > Currently I'm evaluating V2 for our connector, and having, as Steven
>  > > described it, a "more natural, built-in concept/support of
>  > > GlobalCommitter in the sink v2 interface" would be greatly appreciated.
>  > >
>  > > Cheers,
>  > > Krzysztof Chmielewski
>  > >
>  > > [1] https://github.com/kristoffSC
>  > > [2] https://github.com/delta-io/connectors/tree/master/flink
>  > >
>  > > On Thu, Sep 8, 2022 at 7:51 PM Steven Wu <stevenz...@gmail.com> wrote:
>  > >
>  > > > Hi Yun,
>  > > >
>  > > > Thanks a lot for the reply!
>  > > >
>  > > > While we can add the global committer in the WithPostCommitTopology,
>  > > > the semantics are weird. The Commit stage actually didn't commit
>  > > > anything to the Iceberg table, and the PostCommit stage is where the
>  > > > Iceberg commit happens.
>  > > >
>  > > > I just took a quick look at the DeltaLake Flink sink. It still uses
>  > > > the V1 sink interface [1]. I think it might have the same issue when
>  > > > switching to the V2 sink interface.
>  > > >
>  > > > For data lake storage (like Iceberg or DeltaLake), or any storage
>  > > > with global transactional commits, it would be more natural to have
>  > > > a built-in concept/support of a GlobalCommitter in the sink v2
>  > > > interface.
>  > > >
>  > > > Thanks,
>  > > > Steven
>  > > >
>  > > > [1]
>  > > > https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
>  > > >
>  > > >
>  > > > On Wed, Sep 7, 2022 at 2:15 AM Yun Gao <yungao...@aliyun.com.invalid>
>  > > > wrote:
>  > > >
>  > > > > Hi Steven, Liwei,
>  > > > > Very sorry for missing this mail and responding very late.
>  > > > > I think the initial thought is indeed to use
> `WithPostCommitTopology`
>  > > as
>  > > > > a replacement of the original GlobalCommitter, and currently the
>  > > adapter
>  > > > of
>  > > > > Sink v1 on top of Sink v2 also maps the GlobalCommitter in Sink V1
>  > > > > interface
>  > > > > onto an implementation of `WithPostCommitTopology`.
>  > > > > Since `WithPostCommitTopology` supports an arbitrary subgraph, it
>  > > > > seems to me it could support both a global committer and small file
>  > > > > compaction? We might have a `WithPostCommitTopology` implementation
>  > > > > like:
>  > > > > DataStream ds = add global committer;
>  > > > > if (enable file compaction) {
>  > > > >     build the compaction subgraph from ds
>  > > > > }
>  > > > > Best,
>  > > > > Yun
>  > > > > [1]
>  > > > > https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
>  > > > > ------------------------------------------------------------------
>  > > > > From: Steven Wu <stevenz...@gmail.com>
>  > > > > Send Time: 2022 Aug. 17 (Wed.) 07:30
>  > > > > To: dev <dev@flink.apache.org>; hililiwei <hilili...@gmail.com>
>  > > > > Subject:Re: Sink V2 interface replacement for GlobalCommitter
>  > > > > > Plus, it will disable the future capability of small file
>  > > > > > compaction stage post commit.
>  > > > > I should clarify this comment. If we are using the
>  > > > > `WithPostCommitTopology` for the global committer, we would lose
>  > > > > the capability of using the post-commit stage for small-file
>  > > > > compaction.
>  > > > > On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <stevenz...@gmail.com>
>  > > > > wrote:
>  > > > > >
>  > > > > > In the V1 sink interface, there is a GlobalCommitter for Iceberg.
>  > > > > > With the V2 sink interface, GlobalCommitter has been deprecated by
>  > > > > > WithPostCommitTopology. I thought the post-commit stage is mainly
>  > > > > > for async maintenance (like compaction).
>  > > > > >
>  > > > > > Are we supposed to do something similar to the
>  > > > > > GlobalCommittingSinkAdapter? It seems like a temporary transition
>  > > > > > plan for bridging v1 sinks to v2 interfaces.
>  > > > > >
>  > > > > private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
>  > > > >         implements WithPostCommitTopology<InputT, CommT> {
>  > > > >     @Override
>  > > > >     public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
>  > > > >         StandardSinkTopologies.addGlobalCommitter(
>  > > > >                 committables,
>  > > > >                 GlobalCommitterAdapter::new,
>  > > > >                 () -> sink.getCommittableSerializer().get());
>  > > > >     }
>  > > > > }
>  > > > > >
>  > > > > >
>  > > > > > In the Iceberg PR [1] for adopting the new sink interface, Liwei
>  > > > > > used the "global" partitioner to force all committables to go to
>  > > > > > a single committer task (subtask 0). It effectively forces a
>  > > > > > global committer disguised in the parallel committers. It is a
>  > > > > > little weird and can also lead to questions about why the other
>  > > > > > committer tasks are not getting any messages. Plus, it will
>  > > > > > disable the future capability of a small-file-compaction stage
>  > > > > > post commit. Hence, I am asking what the right approach is to
>  > > > > > achieve global committer behavior.
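For context, a "global" partitioner routes every record to downstream subtask 0 regardless of the downstream parallelism. A minimal stand-in for that channel selection (a simplified sketch, not Flink's actual GlobalPartitioner class) looks like this:

```java
// Simplified model of a "global" partitioner: regardless of the record
// or the downstream parallelism, every committable is sent to subtask 0,
// so only one committer task ever performs the table commit while the
// other committer tasks sit idle.
public class GlobalChannelSelector {
    public int selectChannel(Object record, int downstreamParallelism) {
        return 0; // all traffic goes to subtask 0
    }

    public static void main(String[] args) {
        GlobalChannelSelector selector = new GlobalChannelSelector();
        int[] counts = new int[4]; // imagine committer parallelism 4
        for (int i = 0; i < 100; i++) {
            counts[selector.selectChannel("committable-" + i, 4)]++;
        }
        System.out.println(counts[0] + " " + counts[1]); // prints "100 0"
    }
}
```

This makes the objection concrete: with such routing, three of the four committer subtasks never receive a message, which is the "global committer disguised in the parallel committers" described above.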
>  > > > > >
>  > > > > > Thanks,
>  > > > > > Steven
>  > > > > >
>  > > > > > [1] https://github.com/apache/iceberg/pull/4904/files#r946975047
>  > > > > >
>  > > > >
>  > > >
>  > >
>  >
>
>
>
