Hi Gordon,

Thanks for the review. Here are my thoughts:

> In terms of the abstraction layering, I was wondering if you've also
> considered this approach which I've quickly sketched in my local fork:
> https://github.com/tzulitai/flink/commit/e84e3ac57ce023c35037a8470fefdfcad877bcae

I think we have a few design issues here:

- How to handle the old interface, where the transformation is not needed in the pre-commit phase?
  - As you have proposed, a default method implementation is a nice solution here, as we do not really have to change everything in the transformation process.
- How to handle the WithPostCommitTopology interface?
  - Currently the parent interface for a sink with a post-commit topology is strictly a single interface (TwoPhaseCommittingSink), and we want to add this to both types of sinks (new, with transformation / old, without transformation). In this case we could get away with creating OldTwoPhaseCommittingSinkWithPostCommitTopology and NewTwoPhaseCommittingSinkWithPostCommitTopology, but this is not a good approach for future extensibility. I tend to prefer a real mixin approach over creating multiple interfaces for this.

> Quick thought: regarding the awkwardness you mention in the end with sinks
> that have post commit topologies, but no pre-commit topologies -
> Alternative to the mixin approach in the FLIP, it might make sense to
> consider a builder approach for constructing 2PC sinks

TBH, after providing the possibility to transform in the pre-commit phase, I have started to think about the possible different generalizations:

- Why not have the possibility to have a different return type of the pre-write phase?
  - We already have the possibility to transform the data in a preceding map phase before the Sink, but some Sinks might want to encapsulate these transformations before the writes.
- Why not have the explicit possibility to change the return type of the committer?
  - We might not want to emit the incoming Committable; we might want to use the commit hash, or any other data generated by the committer, in the post-commit topology. So in some cases it might make sense for the committer to emit elements with different types than the input.
- Why not have everything as a mixin interface and define a Sink this way (very similar to your builder approach)?

But I currently do not see explicit requirements for these features, and it would result in another full rewrite of the Sink API, which has had a really troubled history with several rewrites in recent releases, so I decided against these big changes and kept the changes minimal.

So, while I personally would love to see the Builder solution, I am afraid that the Flink community needs some stability around the Sink API for now, so that the different Sinks can start to use this new feature.

What do you think?

Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote (on Wed, Oct 25, 2023, 2:01):

> Hi Peter,
>
> Thanks a lot for starting this FLIP!
>
> I agree that the current TwoPhaseCommittingSink interface is limiting in
> that it assumes 1) committers have the same parallelism as writers, and 2)
> writers immediately produce finalized committables. This FLIP captures the
> problem pretty well, and I do think there are use cases for a more general
> flexible interface outside of the Iceberg connector as well.
>
> In terms of the abstraction layering, I was wondering if you've also
> considered this approach which I've quickly sketched in my local fork:
>
> https://github.com/tzulitai/flink/commit/e84e3ac57ce023c35037a8470fefdfcad877bcae
>
> With this approach, the sink translator always expects that 2PC sink
> implementations extend `TwoPhaseCommittingSinkBase` and therefore
> assumes that a pre-commit topology always exists.
> For simple 2PC sinks that do not require transforming committables, we
> would ship (for convenience) an additional
> `SimpleTwoPhaseCommittingSinkBase` where the pre-commit topology is a
> no-op passthrough. With that we avoid some of the boilerplate where 2PC
> sinks with a pre-commit topology require implementing two interfaces, as
> proposed in the FLIP.
>
> Quick thought: regarding the awkwardness you mention at the end with sinks
> that have post-commit topologies but no pre-commit topologies: as an
> alternative to the mixin approach in the FLIP, it might make sense to
> consider a builder approach for constructing 2PC sinks, which should also
> give users type safety at compile time without the awkwardness
> of all the types involved. Something along the lines of:
>
> ```
> new TwoPhaseCommittingSinkBuilder(writerSupplier, committerSupplier)
>     .withPreCommitTopology(writerResultsStream -> ...)  // Function<DataStream<WriterResultT>, DataStream<CommT>>
>     .withPostCommitTopology(committablesStream -> ...)  // Consumer<DataStream<CommT>>
>     .withPreWriteTopology(inputStream -> ...)           // Function<DataStream<InputT>, DataStream<InputT>>
>     .build();
> ```
>
> We could probably do some validation in the build() method, e.g. if the
> writer and committer have different types, then clearly a pre-commit
> topology should have been defined to transform intermediate writer results.
>
> Obviously, this would take generalization of the TwoPhaseCommittingSink
> interface to the extreme, where we just have one interface with all of the
> pre-commit / pre-write / post-commit methods built in, and users would use
> the builder as the entry point to opt in / opt out as needed. The upside is
> that the SinkTransformationTranslator logic will become much less
> cluttered.
>
> I'll need to experiment with the builder approach a bit more to see if it
> makes sense at all, but wanted to throw out the idea early to see what you
> think.
>
> On Mon, Oct 9, 2023 at 6:59 AM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
>
> > Hi Team,
> >
> > Did some experimenting and found the originally proposed solution to be
> > a bit awkward for cases where WithPostCommitTopology is needed but we do
> > not need the WithPreCommitTopology transformation.
> > The new API would be more flexible if we used a mixin-like approach.
> > The interfaces would only be used to define the specific required
> > methods, and they would not need to extend the original
> > TwoPhaseCommittingSink interface either.
> >
> > Since the WithPreCommitTopology and WithPostCommitTopology interfaces
> > are still Experimental, after talking to Gyula offline, I have
> > updated the FLIP to use this new approach.
> >
> > Any comments, thoughts are welcome.
> >
> > Thanks,
> > Peter
> >
> > Péter Váry <peter.vary.apa...@gmail.com> wrote (on Thu, Oct 5, 2023,
> > 16:04):
> >
> > > Hi Team,
> > >
> > > In my previous email [1] I described our challenges migrating the
> > > existing Iceberg SinkFunction based implementation to the new SinkV2
> > > based implementation.
> > >
> > > As a result of the discussion around that topic, I have created
> > > FLIP-371 [2] to address the Committer related changes, and now I have
> > > created a companion FLIP-372 [3] to address the WithPreCommitTopology
> > > related issues.
> > >
> > > FLIP-372: Allow TwoPhaseCommittingSink WithPreCommitTopology to alter
> > > the type of the Committable
> > >
> > > The main goal of FLIP-372 is to extend the existing
> > > TwoPhaseCommittingSink API by adding the possibility to have a
> > > PreCommitTopology where the input and output types of the pre-commit
> > > transformation are different.
> > >
> > > Here is the FLIP: FLIP-372: Allow TwoPhaseCommittingSink
> > > WithPreCommitTopology to alter the type of the Committable
> > > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable>
> > >
> > > Please use this thread to discuss the FLIP related questions,
> > > proposals, and feedback.
> > >
> > > Thanks,
> > > Peter
> > >
> > > - [1] https://lists.apache.org/thread/h3kg7jcgjstpvwlhnofq093vk93ylgsn
> > > - [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-371%3A+Provide+initialization+context+for+Committer+creation+in+TwoPhaseCommittingSink
> > > - [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable
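
P.S. To make the mixin idea discussed in this thread a bit more concrete, here is a minimal, self-contained sketch. It is not the FLIP-372 API and not real Flink code: plain `List`s stand in for `DataStream`s, and every name in it (`MixinSketch`, `TwoPhaseSink`, `translate`, the two example sinks) is hypothetical and only for illustration. It tries to show that the pre-commit and post-commit mixins do not extend the base sink interface, so a sink can opt in to either one independently, and that the pre-commit mixin may change the committable type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified stand-ins for the interfaces discussed in this thread.
// Lists replace DataStreams; none of these are the real Flink signatures.
public class MixinSketch {

    /** Base 2PC sink: only knows how to write an element and commit a committable. */
    interface TwoPhaseSink<InputT, WriterResultT, CommT> {
        WriterResultT write(InputT element);
        String commit(CommT committable);
    }

    /** Mixin: does not extend TwoPhaseSink, it only declares the extra method. */
    interface WithPreCommitTopology<WriterResultT, CommT> {
        List<CommT> addPreCommitTopology(List<WriterResultT> writerResults);
    }

    /** Mixin: independent of the pre-commit mixin. */
    interface WithPostCommitTopology<CommT> {
        void addPostCommitTopology(List<CommT> committables);
    }

    /** Toy "translator": wires the optional phases in only if the sink opted in. */
    @SuppressWarnings("unchecked")
    static <I, W, C> List<String> translate(TwoPhaseSink<I, W, C> sink, List<I> input) {
        List<W> writerResults = input.stream().map(sink::write).collect(Collectors.toList());
        List<C> committables;
        if (sink instanceof WithPreCommitTopology) {
            committables = ((WithPreCommitTopology<W, C>) sink).addPreCommitTopology(writerResults);
        } else {
            // Assumption: without the mixin, writer results already are the committables.
            committables = (List<C>) (List<?>) writerResults;
        }
        List<String> commitLog = committables.stream().map(sink::commit).collect(Collectors.toList());
        if (sink instanceof WithPostCommitTopology) {
            ((WithPostCommitTopology<C>) sink).addPostCommitTopology(committables);
        }
        return commitLog;
    }

    /** Post-commit topology only; writer result and committable share one type. */
    static class PostCommitOnlySink
            implements TwoPhaseSink<String, String, String>, WithPostCommitTopology<String> {
        final List<String> seen = new ArrayList<>();
        public String write(String element) { return "file-" + element; }
        public String commit(String committable) { return "committed " + committable; }
        public void addPostCommitTopology(List<String> committables) { seen.addAll(committables); }
    }

    /** Pre-commit topology only; it changes the type from String to List<String>. */
    static class AggregatingSink
            implements TwoPhaseSink<String, String, List<String>>,
                    WithPreCommitTopology<String, List<String>> {
        public String write(String element) { return "file-" + element; }
        public String commit(List<String> committable) {
            return "committed " + committable.size() + " files";
        }
        public List<List<String>> addPreCommitTopology(List<String> writerResults) {
            // Collapse all writer results into a single committable.
            List<String> all = new ArrayList<>(writerResults);
            return Collections.singletonList(all);
        }
    }

    public static void main(String[] args) {
        System.out.println(translate(new PostCommitOnlySink(), Arrays.asList("a", "b")));
        System.out.println(translate(new AggregatingSink(), Arrays.asList("a", "b", "c")));
    }
}
```

The `instanceof` checks in `translate` are the price of this style: the translator has to branch per mixin, which is the clutter the builder approach would try to avoid.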