Hi Liwei,

I think it's indeed better to have a separate discussion thread on that.

Best regards,

Martijn

On Mon, Oct 10, 2022 at 7:22 AM liwei li <hilili...@gmail.com> wrote:

> Thanks for the discussion.
>
> I favor schemes that allow for custom committer parallelism. This will help
> us make better use of the new unified sink.
>
> As an additional point, in the current scheme it is not easy to get task
> information, such as the job ID or operator ID, in the writer and committer
> methods, because Flink does not expose it. This information is useful for
> saving snapshot information, naming files, and so on. Should we consider
> improving this? Of course, that is outside the scope of this thread, but
> I'm bringing it up for reference.
>
> Thanks,
> Liwei
>
> On Thu, Sep 29, 2022 at 02:44 Steven Wu <stevenz...@gmail.com> wrote:
>
> > Yun,
> >
> > > `writer (parallelism = n) -> committer (parallelism = 1)`
> >
> > Yes, a single-parallelism committer would work for Iceberg (and probably
> > other similar global transaction storage). This is how Iceberg sink is
> > implemented today.
> >
> > > When we were refactoring the job finish process in FLIP-147 to ensure
> > > all the records could be committed at the end of a bounded streaming job,
> > > we had to drop support for cascading commits, which makes the cascading
> > > commit of `committer -> global committer` not work in all cases.
> >
> > This is not a concern for Iceberg, as Iceberg doesn't need cascading
> > "committers -> global committers".
> >
> > Thanks,
> > Steven
> >
> >
> > On Wed, Sep 28, 2022 at 9:21 AM Yun Gao <yungao...@aliyun.com> wrote:
> >
> >> Hi all,
> >>
> >> Very sorry for the long delay; the investigation took a bit of time.
> >>
> >> @Krzysztof
> >> The current unexpected behavior of GlobalCommitter after failover is
> >> indeed caused by bugs in the implementation. I filed an issue [1] for
> >> these bugs, and I think we can fix them for 1.15.x, 1.16.x and 1.17.x
> >> before the next minor releases.
> >>
> >> @Steven @Krzysztof
> >> For the long-term solution (>= 1.17), to support changing the parallelism
> >> of the Committer operator, I initially meant that we could make the
> >> parallelism of the committer operator configurable, so that it supports
> >> arbitrary parallelism:
> >>
> >> 1. For an ordinary sink that does not require a global committer, it is
> >>    still `writer (parallelism = n) -> committer (parallelism = n)`.
> >> 2. If the committer is not required in the v1 implementation, the sink
> >>    can now be reimplemented as
> >>    `writer (parallelism = n) -> committer (parallelism = 1)`.
> >> 3. If the committer is required in the v1 implementation, there are
> >>    several ways to reimplement it on v2, such as
> >>    `writer -> pre-committer topology to do the rename ->
> >>    committer (parallelism = 1)`.
> >>
> >> Do you think this would be reasonable?
> >>
> >> Also @Krzysztof, I have the same question as Steven: is it possible to
> >> write directly to the final files and skip the renaming step? Since the
> >> files are, I guess, not visible to users before the metadata is written
> >> to the Delta Log, it should be safe to write directly to the final files?
> >>
> >> Best,
> >> Yun Gao
> >>
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-29459
> >>
> >>
> >>
> >> ------------------------------------------------------------------
> >> From:Steven Wu <stevenz...@gmail.com>
> >> Send Time:2022 Sep. 14 (Wed.) 21:33
> >> To:Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com>
> >> Cc:dev <dev@flink.apache.org>; Yun Gao <yungao...@aliyun.com>;
> >> hililiwei <hilili...@gmail.com>
> >> Subject:Re: Sink V2 interface replacement for GlobalCommitter
> >>
> >> Krzysztof, no worries. We are discussing the same topic (how to support
> >> storage with globally transactional commits).
> >>
> >> > In Delta Sink connector we actually use both Committer [1] and
> >> > GlobalCommitter [2]. The former, since we are using Flink's Parquet file
> >> > writers, is doing a very simple job of "renaming the hidden file to make
> >> > it visible and removing from the name some 'in-progress file' marker".
> >> > The GlobalCommitter is committing data to the Delta Log.
> >>
> >> Curious if the writers can write the visible files directly (vs hidden
> >> files first then renamed by committer). Since there is a global committer
> >> to commit the data files when Flink checkpoint completes, job failure or
> >> restart shouldn't cause data file dups or loss. I probably missed some
> >> context here.
> >>
> >> On Wed, Sep 14, 2022 at 5:20 AM Krzysztof Chmielewski <
> >> krzysiek.chmielew...@gmail.com> wrote:
> >> Hi Yun,
> >> Thanks for your input.
> >>
> >> In Delta Sink connector we actually use both Committer [1] and
> >> GlobalCommitter [2]. The former, since we are using Flink's Parquet file
> >> writers, is doing a very simple job of "renaming the hidden file to make
> >> it visible and removing from the name some 'in-progress file' marker".
> >> The GlobalCommitter is committing data to the Delta Log.
> >>
> >> With this design, having many instances of Committers actually has a
> >> benefit for us. Plus, we foresee future features in our connector that
> >> would benefit from separate Committers with a parallelism level higher
> >> than 1.
> >>
> >> How I understood your suggestion, Yun (and maybe it was a wrong
> >> interpretation), is to use both Committer and GlobalCommitter but to
> >> enforce parallelism level 1 on the former. The GlobalCommitter created by
> >> Flink 1.15's SinkV1Adapter has parallelism 1, as expected and as it was
> >> in Flink < 1.15.
> >>
> >> Anyway, I've played a little bit with the Flink code and I managed to
> >> achieve this [3]. After some additional changes, which I will describe
> >> below, our test described in [4] passed without any data loss and with no
> >> exceptions thrown by Flink.
> >>
> >> However, changing the parallelism of the Committer operator to a hardcoded
> >> value of 1 was not enough, and I had to do two more things:
> >> 1. Add a rebalance step (RebalancePartitioner) to the graph between the
> >>    writer and the committer, since they now have different parallelism
> >>    levels and the default partitioner was FORWARD, which caused an
> >>    exception to be thrown. BTW, this is clear and understood.
> >> 2. Modify Flink's CommittableCollectorSerializer [5], and this is, I
> >>    believe, an important thing.
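
A simplified model of the partitioner issue in point 1, written in plain Java rather than with Flink's classes. It assumes only the general idea that forward-style routing pairs upstream subtask i with downstream subtask i (so both sides need the same parallelism), while round-robin routing works for any downstream count. `forwardChannel` and `RoundRobin` are hypothetical names used for illustration only.

```java
public class PartitionerSketch {

    /** Forward-style routing: subtask i feeds subtask i, so both sides need equal parallelism. */
    static int forwardChannel(int upstreamSubtask, int upstreamParallelism, int downstreamParallelism) {
        if (upstreamParallelism != downstreamParallelism) {
            throw new IllegalStateException(
                    "forward routing cannot change parallelism: "
                            + upstreamParallelism + " -> " + downstreamParallelism);
        }
        return upstreamSubtask;
    }

    /** Round-robin routing: cycles through the downstream channels, whatever their count. */
    static final class RoundRobin {
        private final int downstreamParallelism;
        private int next = 0;

        RoundRobin(int downstreamParallelism) {
            this.downstreamParallelism = downstreamParallelism;
        }

        int nextChannel() {
            int channel = next;
            next = (next + 1) % downstreamParallelism;
            return channel;
        }
    }

    public static void main(String[] args) {
        // Four writers, one committer: round-robin always lands on channel 0.
        RoundRobin toSingleCommitter = new RoundRobin(1);
        for (int writer = 0; writer < 4; writer++) {
            System.out.println("writer " + writer + " -> committer " + toSingleCommitter.nextChannel());
        }
        // Forward routing from 4 writers to 1 committer is rejected.
        try {
            forwardChannel(2, 4, 1);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a single downstream committer every record ends up on channel 0, which is why a committer with parallelism 1 behaves like a global committer.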
> >>
> >> The modification I had to make was caused by a "Duplicate Key" exception
> >> from the deserialize(int version, byte[] serialized) method, at line 143
> >> of [5], where we process a stream of SubtaskCommittableManager objects
> >> and collect it into a Map. The map key is the subtaskId from the
> >> SubtaskCommittableManager object.
> >>
> >> After Task Manager recovery, it may happen that the List of
> >> SubtaskCommittableManager processed in that deserialize method contains
> >> two SubtaskCommittableManager objects for the same subtask ID. What I did
> >> is that for such a case I call the SubtaskCommittableManager.merge(...)
> >> method.
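
The failure mode described above can be reproduced in isolation. A "Duplicate key" message is what `Collectors.toMap` produces when it is called without a merge function and sees the same key twice; the sketch below assumes that this is the kind of collection involved and shows how a merge function avoids the exception. `SubtaskState` is a hypothetical stand-in for SubtaskCommittableManager, and its `merge` only concatenates lists; this is not Flink's serializer code or the actual patch from [3].

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DuplicateSubtaskMergeSketch {

    /** Hypothetical stand-in for SubtaskCommittableManager; not Flink's class. */
    static final class SubtaskState {
        final int subtaskId;
        final List<String> committables;

        SubtaskState(int subtaskId, List<String> committables) {
            this.subtaskId = subtaskId;
            this.committables = new ArrayList<>(committables);
        }

        /** Combines the pending committables of two entries for the same subtask. */
        SubtaskState merge(SubtaskState other) {
            List<String> merged = new ArrayList<>(committables);
            merged.addAll(other.committables);
            return new SubtaskState(subtaskId, merged);
        }
    }

    /** Two-argument toMap: throws IllegalStateException on a repeated key. */
    static Map<Integer, SubtaskState> collectStrict(List<SubtaskState> states) {
        return states.stream().collect(Collectors.toMap(s -> s.subtaskId, s -> s));
    }

    /** Three-argument toMap: merges entries that share a subtask id. */
    static Map<Integer, SubtaskState> collectMerging(List<SubtaskState> states) {
        return states.stream()
                .collect(Collectors.toMap(s -> s.subtaskId, s -> s, SubtaskState::merge));
    }

    public static void main(String[] args) {
        // Restored state that contains subtask 0 twice, as can happen after recovery.
        List<SubtaskState> restored = List.of(
                new SubtaskState(0, List.of("file-a")),
                new SubtaskState(0, List.of("file-b")),
                new SubtaskState(1, List.of("file-c")));
        try {
            collectStrict(restored);
        } catch (IllegalStateException e) {
            System.out.println("strict collection failed: " + e.getMessage());
        }
        Map<Integer, SubtaskState> merged = collectMerging(restored);
        System.out.println("subtask 0 holds " + merged.get(0).committables);
    }
}
```

Whether merging is the right semantics, or whether two entries for one subtask should never exist in the first place, is the open question raised in this message; the sketch only shows the mechanics.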
> >>
> >> With those modifications our Delta test [7] started to pass on Flink 1.15.
> >>
> >> I do not know whether setting the parallelism level of the Committer to 1
> >> is the right thing to do. As I mentioned, the Committer is doing some
> >> work in our Sink implementation, and we might have more uses for it in
> >> future features that would benefit from keeping the parallelism level
> >> equal to the writer count.
> >>
> >> I still think there is some issue with the V2 architecture for topologies
> >> with GlobalCommitter and failover scenarios [4], and the duplicated key
> >> in [5] described above may be another case; maybe we should never have
> >> two entries for the same subtaskId. That I don't know.
> >>
> >> P.S.
> >> Steven, apologies for hijacking the thread a little bit.
> >>
> >> Thanks,
> >> Krzysztof Chmielewski
> >>
> >> [1]
> >> https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaCommitter.java
> >> [2]
> >> https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
> >> [3]
> >> https://drive.google.com/file/d/1kU0R9nLZneJBDAkgNiaRc90dLGycyTec/view?usp=sharing
> >> [4] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
> >> [5]
> >> https://github.com/apache/flink/blob/release-1.15/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java
> >> [7]
> >> https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java
> >>
> >> On Wed, Sep 14, 2022 at 05:26 Steven Wu <stevenz...@gmail.com> wrote:
> >> > setting the committer parallelism to 1.
> >>
> >> Yun, setting the parallelism to 1 is essentially a global committer. That
> >> would work. Not sure about the implications for other parts of the v2
> >> sink interface.
> >>
> >> On Tue, Sep 13, 2022 at 2:17 PM Krzysztof Chmielewski <
> >> krzysiek.chmielew...@gmail.com> wrote:
> >> Hi Martijn,
> >> Could you clarify a little bit what you mean by:
> >>
> >> "The important part to remember is that this
> >> topology is lagging one checkpoint behind in terms of fault-tolerance:
> >> it only receives data once the committer committed"
> >>
> >> What are the implications?
> >>
> >> Thanks,
> >> Krzysztof Chmielewski
> >>
> >> On Tue, Sep 13, 2022 at 09:57 Yun Gao <yungao...@aliyun.com.invalid>
> >> wrote:
> >> Hi,
> >> Very sorry for the late reply; I was on holiday.
> >> Many thanks for the discussion. It also reminds me of one more piece of
> >> background on the change to the GlobalCommitter:
> >> When we were refactoring the job finish process in FLIP-147 to ensure
> >> all the records could be committed at the end of a bounded streaming
> >> job, we had to drop support for cascading commits, which makes the
> >> cascading commit of `committer -> global committer` not work in all
> >> cases.
> >> For the current issues, one possible alternative from my side is that we
> >> could support setting the committer parallelism to 1. Could this option
> >> solve the issue in the current scenarios? I'll also double-check whether
> >> it can be implemented, and look at the failed tests Krzysztof met.
> >> Best,
> >> Yun
> >> ------------------------------------------------------------------
> >> From:Steven Wu <stevenz...@gmail.com>
> >> Send Time:2022 Sep. 10 (Sat.) 11:31
> >> To:dev <dev@flink.apache.org>
> >> Cc:Yun Gao <yungao...@aliyun.com>; hililiwei <hilili...@gmail.com>
> >> Subject:Re: Sink V2 interface replacement for GlobalCommitter
> >> Martijn, thanks a lot for chiming in!
> >> Here are my concerns with adding GlobalCommitter in the
> >> PostCommitTopology:
> >> 1. When we use TwoPhaseCommittingSink, we would need to create a
> >> noop/dummy committer. Actual Iceberg/DeltaLake commits happen in the
> >> PostCommit stage. The PostCommit stage should be doing some work after
> >> the commit (not for the commit).
> >> 2. GlobalCommitter is marked as @deprecated. It will be removed at a
> >> certain point. What then?
> >> Thanks,
> >> Steven
> >> On Fri, Sep 9, 2022 at 1:20 PM Krzysztof Chmielewski <
> >> krzysiek.chmielew...@gmail.com> wrote:
> >> Thanks Martijn,
> >>  I'm actually trying to run our V1 Delta connector on Flink 1.15 using
> >>  SinkV1Adapter with GlobalCommitterOperator.
> >>  Having said that, I might have found a potential issue with
> >>  GlobalCommitterOperator, checkpointing and failover recovery [1].
> >>  For "normal" scenarios it does look good though.
> >>  Regards,
> >>  Krzysztof Chmielewski
> >>  [1] https://lists.apache.org/thread/otscy199g1l9t3llvo8s2slntyn2r1jc
> >>  On Fri, Sep 9, 2022 at 20:49 Martijn Visser <martijnvis...@apache.org>
> >>  wrote:
> >>  > Hi all,
> >>  >
> >>  > A couple of bits from when work was being done on the new sink: V1 is
> >>  > completely simulated as V2 [1]. V2 is strictly more expressive.
> >>  >
> >>  > If there's desire to stick to the `GlobalCommitter` interface, have a
> >>  > look at the StandardSinkTopologies. Or you can just add your own more
> >>  > fitting PostCommitTopology. The important part to remember is that
> >>  > this topology is lagging one checkpoint behind in terms of
> >>  > fault-tolerance: it only receives data once the committer committed
> >>  > on notifyCheckpointComplete. Thus, the global committer needs to be
> >>  > idempotent and able to restore the actual state on recovery. That
> >>  > limitation is coming in from Flink's checkpointing behaviour and
> >>  > applies to both V1 and V2. GlobalCommitterOperator is abstracting
> >>  > these issues along with handling retries (so commits that happen much
> >>  > later). So it's probably a good place to start just with the standard
> >>  > topology.
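
The idempotency requirement described above can be illustrated with a small, self-contained Java sketch. It assumes the external table can record the id of the last checkpoint it committed, so that a commit replayed after recovery is recognized and skipped. `TableLog` and `commit` are hypothetical names, not Flink or connector APIs, and the in-memory list stands in for a real transactional log.

```java
import java.util.ArrayList;
import java.util.List;

public class IdempotentGlobalCommitSketch {

    /** Hypothetical external table that remembers the last checkpoint it committed. */
    static final class TableLog {
        long lastCommittedCheckpointId = -1L;
        final List<String> committedFiles = new ArrayList<>();
    }

    /**
     * Commits the files of one checkpoint unless the table already contains them.
     * Returns true if a new commit was written, false if the call was a replay.
     */
    static boolean commit(TableLog table, long checkpointId, List<String> files) {
        if (checkpointId <= table.lastCommittedCheckpointId) {
            return false; // replay after recovery: already committed, so skip
        }
        table.committedFiles.addAll(files);
        table.lastCommittedCheckpointId = checkpointId;
        return true;
    }

    public static void main(String[] args) {
        TableLog table = new TableLog();
        // First delivery of checkpoint 1 is committed.
        System.out.println("first attempt committed: " + commit(table, 1L, List.of("f1", "f2")));
        // The same checkpoint delivered again after a failover is ignored.
        System.out.println("replay committed: " + commit(table, 1L, List.of("f1", "f2")));
        System.out.println("files in table: " + table.committedFiles);
    }
}
```

The point of the design is that the decision to skip is based on state stored with the external system, not on operator memory, which may be lost or rolled back on recovery.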
> >>  >
> >>  > Best regards,
> >>  >
> >>  > Martijn
> >>  >
> >>  > [1]
> >>  > https://github.com/apache/flink/blob/955e5ff34082ff8a4a46bb74889612235458eb76/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L359-L370
> >>  >
> >>  > On Thu, Sep 8, 2022 at 20:51 Krzysztof Chmielewski <
> >>  > krzysiek.chmielew...@gmail.com> wrote:
> >>  >
> >>  > > Hi,
> >>  > > Krzysztof Chmielewski [1] from the Delta-Flink connector open source
> >>  > > community here [2].
> >>  > >
> >>  > > I totally agree with Steven on this. Sink V1's GlobalCommitter is
> >>  > > exactly what the Flink-Delta Sink needs, since it is the place where
> >>  > > we do the actual commit to the Delta Log, which should be done from
> >>  > > one place/instance.
> >>  > >
> >>  > > Currently I'm evaluating V2 for our connector, and having, as Steven
> >>  > > described it, a "more natural, built-in concept/support of
> >>  > > GlobalCommitter in the sink v2 interface" would be greatly appreciated.
> >>  > >
> >>  > > Cheers,
> >>  > > Krzysztof Chmielewski
> >>  > >
> >>  > > [1] https://github.com/kristoffSC
> >>  > > [2] https://github.com/delta-io/connectors/tree/master/flink
> >>  > >
> >>  > > On Thu, Sep 8, 2022 at 19:51 Steven Wu <stevenz...@gmail.com> wrote:
> >>  > >
> >>  > > > Hi Yun,
> >>  > > >
> >>  > > > Thanks a lot for the reply!
> >>  > > >
> >>  > > > While we can add the global committer in the WithPostCommitTopology,
> >>  > > > the semantics are weird. The Commit stage actually didn't commit
> >>  > > > anything to the Iceberg table, and the PostCommit stage is where the
> >>  > > > Iceberg commit happens.
> >>  > > >
> >>  > > > I just took a quick look at DeltaLake Flink sink. It still uses the
> >>  > > > V1 sink interface [1]. I think it might have the same issue when
> >>  > > > switching to the V2 sink interface.
> >>  > > >
> >>  > > > For data lake storages (like Iceberg, DeltaLake) or any storage with
> >>  > > > global transactional commit, it would be more natural to have a
> >>  > > > built-in concept/support of GlobalCommitter in the sink v2 interface.
> >>  > > >
> >>  > > > Thanks,
> >>  > > > Steven
> >>  > > >
> >>  > > > [1]
> >>  > > > https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/committer/DeltaGlobalCommitter.java
> >>  > > >
> >>  > > >
> >>  > > > On Wed, Sep 7, 2022 at 2:15 AM Yun Gao <yungao...@aliyun.com.invalid>
> >>  > > > wrote:
> >>  > > >
> >>  > > > > Hi Steven, Liwei,
> >>  > > > > Very sorry for missing this mail and responding so late.
> >>  > > > > I think the initial thought is indeed to use
> >>  > > > > `WithPostCommitTopology` as a replacement for the original
> >>  > > > > GlobalCommitter, and currently the adapter of Sink v1 on top of
> >>  > > > > Sink v2 also maps the GlobalCommitter in the Sink V1 interface
> >>  > > > > onto an implementation of `WithPostCommitTopology`.
> >>  > > > > Since `WithPostCommitTopology` supports an arbitrary subgraph, it
> >>  > > > > seems to me it could support both a global committer and small
> >>  > > > > file compaction? We might have a `WithPostCommitTopology`
> >>  > > > > implementation like
> >>  > > > > DataStream ds = add global committer;
> >>  > > > > if (enable file compaction) {
> >>  > > > >     build the compaction subgraph from ds
> >>  > > > > }
> >>  > > > > Best,
> >>  > > > > Yun
> >>  > > > > [1]
> >>  > > > > https://github.com/apache/flink/blob/a8ca381c57788cd1a1527e4ebdc19bdbcd132fc4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java#L365
> >>  > > > >
> >>  > > > > ------------------------------------------------------------------
> >>  > > > > From:Steven Wu <stevenz...@gmail.com>
> >>  > > > > Send Time:2022 Aug. 17 (Wed.) 07:30
> >>  > > > > To:dev <dev@flink.apache.org>; hililiwei <hilili...@gmail.com>
> >>  > > > > Subject:Re: Sink V2 interface replacement for GlobalCommitter
> >>  > > > > > Plus, it will disable the future capability of small file
> >>  > > > > > compaction stage post commit.
> >>  > > > > I should clarify this comment. If we are using the
> >>  > > > > `WithPostCommitTopology` for global committer, we would lose the
> >>  > > > > capability of using the post commit stage for small files
> >>  > > > > compaction.
> >>  > > > > On Tue, Aug 16, 2022 at 9:53 AM Steven Wu <stevenz...@gmail.com>
> >>  > > > > wrote:
> >>  > > > > >
> >>  > > > > > In the V1 sink interface, there is a GlobalCommitter for Iceberg.
> >>  > > > > > With the V2 sink interface, GlobalCommitter has been deprecated by
> >>  > > > > > WithPostCommitTopology. I thought the post commit stage is mainly
> >>  > > > > > for async maintenance (like compaction).
> >>  > > > > >
> >>  > > > > > Are we supposed to do something similar to the
> >>  > > > > > GlobalCommittingSinkAdapter? It seems like a temporary transition
> >>  > > > > > plan for bridging v1 sinks to v2 interfaces.
> >>  > > > > >
> >>  > > > > > private class GlobalCommittingSinkAdapter extends TwoPhaseCommittingSinkAdapter
> >>  > > > > >         implements WithPostCommitTopology<InputT, CommT> {
> >>  > > > > >     @Override
> >>  > > > > >     public void addPostCommitTopology(
> >>  > > > > >             DataStream<CommittableMessage<CommT>> committables) {
> >>  > > > > >         StandardSinkTopologies.addGlobalCommitter(
> >>  > > > > >                 committables,
> >>  > > > > >                 GlobalCommitterAdapter::new,
> >>  > > > > >                 () -> sink.getCommittableSerializer().get());
> >>  > > > > >     }
> >>  > > > > > }
> >>  > > > > >
> >>  > > > > >
> >>  > > > > > In the Iceberg PR [1] for adopting the new sink interface, Liwei
> >>  > > > > > used the "global" partitioner to force all committables to go to a
> >>  > > > > > single committer task 0. It effectively forces a global committer
> >>  > > > > > disguised in the parallel committers. It is a little weird and can
> >>  > > > > > also lead to questions about why other committer tasks are not
> >>  > > > > > getting any messages. Plus, it will disable the future capability
> >>  > > > > > of a small file compaction stage post commit. Hence, I am asking
> >>  > > > > > what the right approach is to achieve global committer behavior.
> >>  > > > > >
> >>  > > > > > Thanks,
> >>  > > > > > Steven
> >>  > > > > >
> >>  > > > > > [1] https://github.com/apache/iceberg/pull/4904/files#r946975047
> >>  > > > > >
> >>  > > > >
> >>  > > >
> >>  > >
> >>  >
> >>
> >>
> >> --
> liwei li
>
