Hi Gordon,

Thanks for the review! Here are my thoughts:

> In terms of the abstraction layering, I was wondering if you've also
> considered this approach which I've quickly sketched in my local fork:
> https://github.com/tzulitai/flink/commit/e84e3ac57ce023c35037a8470fefdfcad877bcae

I think we have a few design questions here:
- How do we handle the old interface, where no transformation is needed in
the pre-commit phase? As you have proposed, a default method implementation
is a nice solution here, as we do not really have to change everything in
the transformation process.
- How do we handle the WithPostCommitTopology interface? Currently, the
parent interface for a sink with a post-commit topology is strictly a single
interface (TwoPhaseCommittingSink), and we want to add the post-commit
topology to both types of sinks (new: with transformation; old: without
transformation). We could get away with creating
OldTwoPhaseCommittingSinkWithPostCommitTopology and
NewTwoPhaseCommittingSinkWithPostCommitTopology, but this is not a good
approach for future extensibility. I prefer a real mixin approach over
creating multiple combined interfaces for this.
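
To make the layering concrete, here is a rough, Flink-free sketch of the two
points above. All names (TwoPhaseSink, SimpleTwoPhaseSink, WithPostCommit,
run) are hypothetical simplifications, not the actual Flink interfaces, and a
"stream" is modelled as a plain List. The old-style sink gets its pre-commit
step from a default method, and the post-commit step is a mixin that any sink
can implement, so no combined interface is needed per combination:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, Flink-free model: a "stream" is a List, and none of these are real Flink interfaces.
public class MixinSketch {

    // Core 2PC contract: writer results are turned into committables before commit.
    interface TwoPhaseSink<InputT, WriterResultT, CommT> {
        List<WriterResultT> write(List<InputT> input);

        List<CommT> preCommit(List<WriterResultT> writerResults);

        void commit(List<CommT> committables);
    }

    // "Old" sinks: writer results already are committables, so pre-commit defaults to a passthrough.
    interface SimpleTwoPhaseSink<InputT, CommT> extends TwoPhaseSink<InputT, CommT, CommT> {
        @Override
        default List<CommT> preCommit(List<CommT> writerResults) {
            return writerResults;
        }
    }

    // Mixin: declares only the extra method and does not extend TwoPhaseSink.
    interface WithPostCommit<CommT> {
        void postCommit(List<CommT> committed);
    }

    // Records what each sink did, so the demo output can be inspected.
    static final List<String> LOG = new ArrayList<>();

    // Old-style sink that opts into a post-commit step by also implementing the mixin.
    static class OldSinkWithPostCommit
            implements SimpleTwoPhaseSink<String, String>, WithPostCommit<String> {
        public List<String> write(List<String> input) {
            return input;
        }

        public void commit(List<String> committables) {
            LOG.add("commit " + committables);
        }

        public void postCommit(List<String> committed) {
            LOG.add("post " + committed);
        }
    }

    // New-style sink: pre-commit changes the type (Integer -> String), no post-commit step.
    static class AggregatingSink implements TwoPhaseSink<String, Integer, String> {
        public List<Integer> write(List<String> input) {
            return input.stream().map(String::length).collect(Collectors.toList());
        }

        public List<String> preCommit(List<Integer> writerResults) {
            int total = writerResults.stream().mapToInt(Integer::intValue).sum();
            return List.of("total=" + total);
        }

        public void commit(List<String> committables) {
            LOG.add("commit " + committables);
        }
    }

    // Translator-like driver: one code path for every sink; the mixin is detected at runtime.
    static <I, W, C> void run(TwoPhaseSink<I, W, C> sink, List<I> input) {
        List<C> committables = sink.preCommit(sink.write(input));
        sink.commit(committables);
        if (sink instanceof WithPostCommit) {
            @SuppressWarnings("unchecked")
            WithPostCommit<C> post = (WithPostCommit<C>) sink;
            post.postCommit(committables);
        }
    }

    public static void main(String[] args) {
        run(new OldSinkWithPostCommit(), List.of("a", "bb"));
        run(new AggregatingSink(), List.of("a", "bb"));
        LOG.forEach(System.out::println);
    }
}
```

The price of keeping the mixin independent is that the translator has to
discover it with an instanceof check and an unchecked cast, so the compiler
cannot verify that the mixin's committable type matches the sink's.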

> Quick thought: regarding the awkwardness you mention in the end with
> sinks that have post commit topologies, but no pre-commit topologies -
> Alternative to the mixin approach in the FLIP, it might make sense to
> consider a builder approach for constructing 2PC sinks

TBH, after adding the possibility to transform committables in the
pre-commit phase, I started to think about other possible generalizations:
- Why not allow the pre-write phase to have a different return type? We can
already transform the data in a map phase preceding the Sink, but some Sinks
might want to encapsulate these transformations before the writes.
- Why not explicitly allow the committer to change its output type? We might
not want to emit the incoming Committable; instead, we might want to use the
commit hash (or any other data generated by the committer) in the
post-commit topology. So in some cases it might make sense for the committer
to emit elements of a different type than its input.
- Why not make everything a mixin interface and define a Sink this way (very
similar to your builder approach)?
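
For illustration, here is a rough, Flink-free sketch of where these
generalizations lead when every stage may change the element type. The
Pipeline class and its five type parameters are hypothetical, not an actual
Flink API; a "stream" is modelled as a plain List, and the committer's
"commit-<size>" string is a made-up stand-in for something like a commit hash:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical, Flink-free sketch: every stage may change the element type; a "stream" is a List.
public class GeneralizedSinkSketch {

    // Five independent type parameters once pre-write and commit may also change types.
    static final class Pipeline<InputT, WriteT, WriterResultT, CommT, CommittedT> {
        private final Function<List<InputT>, List<WriteT>> preWrite;
        private final Function<List<WriteT>, List<WriterResultT>> writer;
        private final Function<List<WriterResultT>, List<CommT>> preCommit;
        private final Function<List<CommT>, List<CommittedT>> committer;
        private final Consumer<List<CommittedT>> postCommit;

        Pipeline(
                Function<List<InputT>, List<WriteT>> preWrite,
                Function<List<WriteT>, List<WriterResultT>> writer,
                Function<List<WriterResultT>, List<CommT>> preCommit,
                Function<List<CommT>, List<CommittedT>> committer,
                Consumer<List<CommittedT>> postCommit) {
            this.preWrite = preWrite;
            this.writer = writer;
            this.preCommit = preCommit;
            this.committer = committer;
            this.postCommit = postCommit;
        }

        // Runs the stages in order: pre-write, write, pre-commit, commit, post-commit.
        void run(List<InputT> input) {
            List<CommittedT> committed =
                    committer.apply(preCommit.apply(writer.apply(preWrite.apply(input))));
            postCommit.accept(committed);
        }
    }

    // Example wiring with distinct types: String -> String -> Integer -> Long -> String.
    static List<String> demo(List<String> input) {
        List<String> published = new ArrayList<>();
        Pipeline<String, String, Integer, Long, String> pipeline = new Pipeline<>(
                // pre-write: normalize records
                in -> in.stream().map(String::toUpperCase).collect(Collectors.toList()),
                // writer result: length of each written record
                recs -> recs.stream().map(String::length).collect(Collectors.toList()),
                // pre-commit: aggregate all writer results into a single committable
                sizes -> List.of(sizes.stream().mapToLong(Integer::longValue).sum()),
                // committer: emits a different type than its input
                comms -> comms.stream().map(c -> "commit-" + c).collect(Collectors.toList()),
                // post-commit: consumes the committer output
                published::addAll);
        pipeline.run(input);
        return published;
    }

    public static void main(String[] args) {
        System.out.println(demo(List.of("ab", "cde"))); // prints [commit-5]
    }
}
```

This is roughly the shape a builder or all-mixin Sink would have to expose,
which is why it amounts to a much larger API change than FLIP-372.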

But I currently do not see explicit requirements for these features, and
they would result in another full rewrite of the Sink API, which has already
had a troubled history with several rewrites in recent releases. So I
decided against these big changes and kept the changes minimal.

So, while I would personally love to see the Builder solution, I am afraid
that the Flink community needs some stability around the Sink API for now,
so that the different Sinks can start using this new feature.

What do you think?

Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote on Wed, Oct 25, 2023, 2:01:

> Hi Peter,
>
> Thanks a lot for starting this FLIP!
>
> I agree that the current TwoPhaseCommittingSink interface is limiting in
> that it assumes 1) committers have the same parallelism as writers, and 2)
> writers immediately produce finalized committables. This FLIP captures the
> problem pretty well, and I do think there are use cases for a more general
> flexible interface outside of the Iceberg connector as well.
>
> In terms of the abstraction layering, I was wondering if you've also
> considered this approach which I've quickly sketched in my local fork:
>
> https://github.com/tzulitai/flink/commit/e84e3ac57ce023c35037a8470fefdfcad877bcae
>
> With this approach, the sink translator always expects that 2PC sink
> implementations extend `TwoPhaseCommittingSinkBase` and therefore
> assumes that a pre-commit topology always exists. For simple 2PC sinks that
> do not require transforming committables, we would ship (for convenience)
> an additional `SimpleTwoPhaseCommittingSinkBase` where the pre-commit
> topology is a no-op passthrough. With that we avoid some of the
> "boilerplate" where 2PC sinks with a pre-commit topology require
> implementing two interfaces, as proposed in the FLIP.
>
> Quick thought: regarding the awkwardness you mention in the end with sinks
> that have post commit topologies, but no pre-commit topologies -
> Alternative to the mixin approach in the FLIP, it might make sense to
> consider a builder approach for constructing 2PC sinks, which should also
> give users type-safety at compile time while not having the awkwardness
> with all the types involved. Something along the lines of:
>
> ```
> new TwoPhaseCommittingSinkBuilder(writerSupplier, committerSupplier)
>     .withPreCommitTopology(writerResultsStream -> ...)   // Function<DataStream<WriterResultT>, DataStream<CommT>>
>     .withPostCommitTopology(committablesStream -> ...)   // Consumer<DataStream<CommT>>
>     .withPreWriteTopology(inputStream -> ...)            // Function<DataStream<InputT>, DataStream<InputT>>
>     .build();
> ```
>
> We could probably do some validation in the build() method, e.g. if writer
> / committer have different types, then clearly a pre-commit topology should
> have been defined to transform intermediate writer results.
>
> Obviously, this would take generalization of the TwoPhaseCommittingSink
> interface to the extreme, where we just have one interface with all of the
> pre-commit / pre-write / post-commit methods built-in, and users would use
> the builder as the entrypoint to opt-in / opt-out as needed. The upside is
> that the SinkTransformationTranslator logic will become much less
> cluttered.
>
> I'll need to experiment with the builder approach a bit more to see if it makes
> sense at all, but wanted to throw out the idea earlier to see what you
> think.
>
> On Mon, Oct 9, 2023 at 6:59 AM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
>
> > Hi Team,
> >
> > Did some experimenting and found the originally proposed solution to be a
> > bit awkward for cases where WithPostCommitTopology is needed but the
> > WithPreCommitTopology transformation is not.
> > The flexibility of the new API would be better if we used a mixin-like
> > approach. The interfaces would only be used to define the specific required
> > methods, and they would not need to extend the original
> > TwoPhaseCommittingSink interface too.
> >
> > Since the WithPreCommitTopology and WithPostCommitTopology interfaces are
> > still Experimental, after talking to Gyula offline, I have updated the
> > FLIP to use this new approach.
> >
> > Any comments and thoughts are welcome.
> >
> > Thanks,
> > Peter
> >
> > Péter Váry <peter.vary.apa...@gmail.com> wrote on Thu, Oct 5, 2023, 16:04:
> >
> > > Hi Team,
> > >
> > > In my previous email [1] I described our challenges migrating the
> > > existing Iceberg SinkFunction-based implementation to the new
> > > SinkV2-based implementation.
> > >
> > > As a result of the discussion around that topic, I created FLIP-371 [2]
> > > to address the Committer-related changes, and now I have created a
> > > companion FLIP-372 [3] to address the WithPreCommitTopology-related
> > > issues.
> > >
> > > FLIP-372: Allow TwoPhaseCommittingSink WithPreCommitTopology to alter the
> > > type of the Committable
> > >
> > > The main goal of FLIP-372 is to extend the currently existing
> > > TwoPhaseCommittingSink API by adding the possibility to have a
> > > PreCommitTopology where the input and output types of the pre-commit
> > > transformation are different.
> > >
> > > Here is the FLIP: FLIP-372: Allow TwoPhaseCommittingSink
> > > WithPreCommitTopology to alter the type of the Committable
> > > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable>
> > >
> > >
> > > Please use this thread to discuss FLIP-related questions, proposals,
> > > and feedback.
> > >
> > > Thanks,
> > > Peter
> > >
> > > - [1] https://lists.apache.org/thread/h3kg7jcgjstpvwlhnofq093vk93ylgsn
> > > - [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-371%3A+Provide+initialization+context+for+Committer+creation+in+TwoPhaseCommittingSink
> > > - [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable
