Hi Peter! Thank you for the analysis of the options.
I don't really have a strong opinion, but in general I am in favor of the mixin-style interfaces. We follow the same approach for table sources / sinks as well. Some other benefits I see in addition to what you mentioned:
- Easier to introduce new experimental / public-evolving interfaces in the future
- Easier to declare parts of the API stable going forward, as it's not all or nothing

The inability to do proper compile-time validation is definitely a downside, but I believe this should mostly make initial development a little harder.

Cheers,
Gyula

On Thu, Nov 23, 2023 at 1:25 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
> We had a longer discussion with Gordon yesterday.
> The main conclusion was that moving to a completely new interface is not justified, and that we should try to improve the current one.
>
> Another ask from Gordon was to check when the user will be notified if the parameter types are incorrect when using the mixin approach.
> Imagine the type definition below:
>
> private static class TestTwoPhaseCommittingSinkWithPreCommitTopologyWrongMixin
>         implements TwoPhaseCommittingSinkWithPreCommitTopology<Integer, Long, String>,
>                 WithPreCommitTopology<Boolean, Void> {
>
> The parameterizations of the above interfaces contradict each other:
>
> - TwoPhaseCommittingSinkWithPreCommitTopology
>   - Input - Integer
>   - WriterResult - Long
>   - Committable - String
> - WithPreCommitTopology
>   - WriterResult - Boolean
>   - Committable - Void
>
> Sadly, I was not able to find a solution where we could notify the user at job startup time. The first error the user will get is when the first record is processed/committed. I talked with Gyula Fora, and we discussed the possibility of using the TypeExtractor to get the types. We decided that it could work in some cases, but would not be a generic solution.
> See the "NOTES FOR USERS OF THIS CLASS" section of the TypeExtractor documentation [1].
>
> This missing feature would justify abandoning the mixin solution and sticking to creating individual interfaces, like:
>
> - *TwoPhaseCommittingSink* - When no pre-commit topology is needed - kept because it is enough for most of the use cases.
> - *TwoPhaseCommittingSinkWithPreCommitTopology* - When a pre-commit topology is needed with a transformation in the pre-commit stage - the new generic interface (could be internal)
> - *WithPreWriteTopology* - kept as it is
> - *WithPreCommitTopology* - extends TwoPhaseCommittingSinkWithPreCommitTopology with the transformation method (classes from the streaming package are needed, so it cannot be merged with TwoPhaseCommittingSinkWithPreCommitTopology)
> - *WithPostCommitTopology* - kept as it is (extends only TwoPhaseCommittingSink, so no pre-commit topology is allowed)
> - *WithPostCommitTopologyWithPreCommitTopology* - extends WithPreCommitTopology with the same method as WithPostCommitTopology
>
> I don't really like the complex `WithPostCommitTopologyWithPreCommitTopology` interface, and if we start adding new features, the number of interfaces could grow exponentially, but I agree that type checking is important. I don't have a strong opinion, but I am inclined to vote for moving in the direction of the individual interfaces.
>
> What do you prefer?
>
> 1. Go with the mixin approach
>    1. Better extensibility
>    2. Fewer interfaces (only 1 fewer now, but later this could be more)
>    3. Easier to understand (IMHO)
> 2. Stick with the combined interfaces approach (some mixin, like WithPreWriteTopology, some combined, like WithPostCommitTopologyWithPreCommitTopology)
>    1. Better error messages
>    2. Less disruptive change (still breaking for implementations of WithPreCommitTopology)
> 3. Do you have a better idea?
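The runtime-only failure described above can be reproduced in plain Java. This is a minimal sketch, not Flink code: the two interfaces are hypothetical single-method stand-ins (the real ones have different signatures), and `MixinPipeline.processFirstRecord` is a hypothetical helper standing in for the framework's wiring. The contradictory class compiles cleanly, and the mismatch only shows up as a `ClassCastException` when the first record flows from the writer into the pre-commit step.

```java
// Hypothetical, heavily simplified single-method stand-ins for the interfaces
// discussed above; these are NOT the real Flink signatures.
interface TwoPhaseCommittingSinkWithPreCommitTopology<InputT, WriterResultT, CommT> {
    WriterResultT write(InputT element);
}

interface WithPreCommitTopology<WriterResultT, CommT> {
    CommT preCommit(WriterResultT writerResult);
}

// Compiles cleanly even though the two interfaces disagree on the writer
// result type (Long vs. Boolean) and the committable type (String vs. Void).
class WrongMixinSink
        implements TwoPhaseCommittingSinkWithPreCommitTopology<Integer, Long, String>,
                WithPreCommitTopology<Boolean, Void> {

    @Override
    public Long write(Integer element) {
        return element.longValue();
    }

    @Override
    public Void preCommit(Boolean writerResult) {
        return null;
    }
}

final class MixinPipeline {
    private MixinPipeline() {}

    // Hypothetical framework wiring: it only knows that the sink implements
    // both mixins, so it feeds the writer output straight into pre-commit.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Object processFirstRecord(Object sink, Object record) {
        Object writerResult = ((TwoPhaseCommittingSinkWithPreCommitTopology) sink).write(record);
        // The compiler-generated bridge method casts writerResult (a Long) to
        // Boolean here, so the mismatch surfaces only at runtime.
        return ((WithPreCommitTopology) sink).preCommit(writerResult);
    }
}
```

Reflection such as `Class.getGenericInterfaces()` can still recover `Long` and `Boolean` for a concrete class like this one, but when the sink is itself generic (for example `class MySink<T> implements WithPreCommitTopology<T, ...>`) the arguments are only type variables, which matches the conclusion above that type extraction works in some cases but not generically.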
>
> Thanks,
> Peter
>
> CC: Jiabao Sun - as he might be interested in this discussion
>
> [1] - https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/java/typeutils/TypeExtractor.html
>
> On Wed, Oct 25, 2023 at 16:02, Péter Váry <peter.vary.apa...@gmail.com> wrote:
>
> > Hi Gordon,
> >
> > Thanks for the review, here are my thoughts:
> >
> > > In terms of the abstraction layering, I was wondering if you've also considered this approach which I've quickly sketched in my local fork:
> > > https://github.com/tzulitai/flink/commit/e84e3ac57ce023c35037a8470fefdfcad877bcae
> >
> > I think we have a few design issues here:
> > - How to handle the old interface where the transformation is not needed in the pre-commit phase? - As you have proposed, a default method implementation is a nice solution here, as we do not really have to change everything in the transformation process.
> > - How to handle the WithPostCommitTopology interface? - Currently the parent interface for a sink with a post-commit topology is strictly a single interface (TwoPhaseCommittingSink), and we want to add this to both types of sinks (new - with transformation / old - without transformation). In this case we could get away with creating OldTwoPhaseCommittingSinkWithPostCommitTopology and NewTwoPhaseCommittingSinkWithPostCommitTopology, but this is not a good approach for future extensibility. I tend to prefer a real mixin approach over creating multiple interfaces for this.
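The default-method idea mentioned above (a no-op passthrough pre-commit topology for sinks that do not transform their committables) can be modelled in a few lines. This is a simplified sketch under stated assumptions: a `List` stands in for a `DataStream`, the interface names are borrowed from Gordon's description but their methods here are hypothetical, and `committablesFor`, `UpperCaseSink` and `SummingSink` are illustrative names only.

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified model: a List stands in for a DataStream, and the
// writer is reduced to a per-element function. These are not Flink's APIs.
interface TwoPhaseCommittingSinkBase<InputT, WriterResultT, CommT> {
    WriterResultT write(InputT element);

    // Pre-commit "topology": may turn WriterResultT into a different CommT.
    List<CommT> addPreCommitTopology(List<WriterResultT> writerResults);

    // Stand-in for what the translator would wire up for one checkpoint.
    default List<CommT> committablesFor(List<InputT> inputs) {
        List<WriterResultT> results = inputs.stream().map(this::write).collect(Collectors.toList());
        return addPreCommitTopology(results);
    }
}

// Sinks whose writer results already are the committables inherit a no-op
// passthrough, so existing implementations do not have to change.
interface SimpleTwoPhaseCommittingSinkBase<InputT, CommT>
        extends TwoPhaseCommittingSinkBase<InputT, CommT, CommT> {
    @Override
    default List<CommT> addPreCommitTopology(List<CommT> writerResults) {
        return writerResults;
    }
}

// "Old-style" sink: only the writer logic is implemented.
class UpperCaseSink implements SimpleTwoPhaseCommittingSinkBase<String, String> {
    @Override
    public String write(String element) {
        return element.toUpperCase();
    }
}

// "New-style" sink: the pre-commit step aggregates Long writer results into a
// single String committable, so the committable type differs from the writer's.
class SummingSink implements TwoPhaseCommittingSinkBase<Integer, Long, String> {
    @Override
    public Long write(Integer element) {
        return element.longValue();
    }

    @Override
    public List<String> addPreCommitTopology(List<Long> writerResults) {
        long total = writerResults.stream().mapToLong(Long::longValue).sum();
        return Collections.singletonList("total=" + total);
    }
}
```

The single three-parameter base interface gives the translator one shape to target, while the two-parameter sub-interface ties the writer-result and committable types together so that the passthrough default is type-correct.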
> >
> > > Quick thought: regarding the awkwardness you mention at the end with sinks that have post-commit topologies but no pre-commit topologies - as an alternative to the mixin approach in the FLIP, it might make sense to consider a builder approach for constructing 2PC sinks
> >
> > TBH, after providing the possibility to transform in the pre-commit phase, I started to think about other possible generalizations:
> > - Why not allow a different return type for the pre-write phase? - While we can transform the data in a map phase preceding the Sink, some Sinks might want to encapsulate these transformations before the writes.
> > - Why not explicitly allow changing the return type of the committer? - We might not want to emit the incoming Committable; we might want to use the commit hash - or any other data generated by the committer - in the post-commit topology. So in some cases it might make sense for the committer to emit elements of a different type than its input.
> > - Why not have everything as a mixin interface and define a Sink this way (very, very similar to your builder approach)?
> >
> > But I currently do not see explicit requirements for these features, and they would result in another full rewrite of the Sink API, which has had a really troubled history with several rewrites in recent releases, so I decided against these big changes and kept the changes minimal.
> >
> > So, while I personally would love to see the Builder solution, I am afraid that the Flink community needs some stability around the Sink API for now, so that the different Sinks can start to use this new feature.
> >
> > What do you think?
> >
> > On Wed, Oct 25, 2023 at 02:01, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> >
> >> Hi Peter,
> >>
> >> Thanks a lot for starting this FLIP!
> >>
> >> I agree that the current TwoPhaseCommittingSink interface is limiting in that it assumes 1) committers have the same parallelism as writers, and 2) writers immediately produce finalized committables. This FLIP captures the problem pretty well, and I do think there are use cases for a more general, flexible interface outside of the Iceberg connector as well.
> >>
> >> In terms of the abstraction layering, I was wondering if you've also considered this approach which I've quickly sketched in my local fork:
> >>
> >> https://github.com/tzulitai/flink/commit/e84e3ac57ce023c35037a8470fefdfcad877bcae
> >>
> >> With this approach, the sink translator always expects 2PC sink implementations to extend `TwoPhaseCommittingSinkBase`, and therefore assumes that a pre-commit topology always exists. For simple 2PC sinks that do not require transforming committables, we would ship (for convenience) an additional `SimpleTwoPhaseCommittingSinkBase` where the pre-commit topology is a no-op passthrough. With that we avoid some of the "boilerplate" where 2PC sinks with a pre-commit topology require implementing two interfaces, as proposed in the FLIP.
> >>
> >> Quick thought: regarding the awkwardness you mention at the end with sinks that have post-commit topologies but no pre-commit topologies - as an alternative to the mixin approach in the FLIP, it might make sense to consider a builder approach for constructing 2PC sinks, which should also give users type-safety at compile time while avoiding the awkwardness with all the types involved. Something along the lines of:
> >>
> >> ```
> >> new TwoPhaseCommittingSinkBuilder(writerSupplier, committerSupplier)
> >>     .withPreCommitTopology(writerResultsStream -> ...)  // Function<DataStream<WriterResultT>, DataStream<CommT>>
> >>     .withPostCommitTopology(committablesStream -> ...)  // Consumer<DataStream<CommT>>
> >>     .withPreWriteTopology(inputStream -> ...)           // Function<DataStream<InputT>, DataStream<InputT>>
> >>     .build();
> >> ```
> >>
> >> We could probably do some validation in the build() method, e.g. if the writer and committer have different types, then clearly a pre-commit topology should have been defined to transform the intermediate writer results.
> >>
> >> Obviously, this would take the generalization of the TwoPhaseCommittingSink interface to the extreme, where we just have one interface with all of the pre-commit / pre-write / post-commit methods built in, and users would use the builder as the entry point to opt in / opt out as needed. The upside is that the SinkTransformationTranslator logic will become much less cluttered.
> >>
> >> I'll need to experiment with the builder approach a bit more to see if it makes sense at all, but I wanted to throw out the idea early to see what you think.
> >>
> >> On Mon, Oct 9, 2023 at 6:59 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
> >>
> >> > Hi Team,
> >> >
> >> > I did some experimenting and found the originally proposed solution to be a bit awkward for cases where WithPostCommitTopology was needed but the WithPreCommitTopology transformation was not.
> >> > The new API would be more flexible if we used a mixin-like approach. The interfaces would only be used to define the specific required methods, and they would not need to extend the original TwoPhaseCommittingSink interface as well.
> >> >
> >> > Since the WithPreCommitTopology and WithPostCommitTopology interfaces are still Experimental, after talking to Gyula offline, I have updated the FLIP to use this new approach.
> >> >
> >> > Any comments or thoughts are welcome.
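Gordon's builder suggestion above relies on `build()` to check that a pre-commit topology exists when the writer and committer types differ. One variation, sketched here under loud assumptions (a `List` stands in for a `DataStream`; `TwoPhaseSinkBuilder`, `BuiltSink`, `passthrough` and `transforming` are hypothetical names, not Flink APIs), moves that check into the type system by choosing the pre-commit case in the factory method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical staged builder; a List stands in for a DataStream and plain
// functions stand in for the writer and committer. Not an existing Flink API.
final class BuiltSink<InputT> {
    private final Consumer<List<InputT>> pipeline;

    BuiltSink(Consumer<List<InputT>> pipeline) {
        this.pipeline = pipeline;
    }

    void run(List<InputT> inputs) {
        pipeline.accept(inputs);
    }
}

final class TwoPhaseSinkBuilder<InputT, WriterResultT, CommT> {
    private final Function<InputT, WriterResultT> writer;
    private final Function<List<WriterResultT>, List<CommT>> preCommit;
    private final Consumer<CommT> committer;
    private Function<List<InputT>, List<InputT>> preWrite = Function.identity();
    private Consumer<List<CommT>> postCommit = committables -> {};

    private TwoPhaseSinkBuilder(
            Function<InputT, WriterResultT> writer,
            Function<List<WriterResultT>, List<CommT>> preCommit,
            Consumer<CommT> committer) {
        this.writer = writer;
        this.preCommit = preCommit;
        this.committer = committer;
    }

    // Writer output and committable share one type parameter, so a writer
    // producing Long cannot be paired with a committer expecting String.
    static <I, C> TwoPhaseSinkBuilder<I, C, C> passthrough(Function<I, C> writer, Consumer<C> committer) {
        return new TwoPhaseSinkBuilder<I, C, C>(writer, Function.<List<C>>identity(), committer);
    }

    // Different types: the pre-commit transformation is mandatory, and the
    // compiler checks that it maps W to C.
    static <I, W, C> TwoPhaseSinkBuilder<I, W, C> transforming(
            Function<I, W> writer, Function<List<W>, List<C>> preCommit, Consumer<C> committer) {
        return new TwoPhaseSinkBuilder<I, W, C>(writer, preCommit, committer);
    }

    TwoPhaseSinkBuilder<InputT, WriterResultT, CommT> withPreWriteTopology(
            Function<List<InputT>, List<InputT>> preWrite) {
        this.preWrite = preWrite;
        return this;
    }

    TwoPhaseSinkBuilder<InputT, WriterResultT, CommT> withPostCommitTopology(
            Consumer<List<CommT>> postCommit) {
        this.postCommit = postCommit;
        return this;
    }

    BuiltSink<InputT> build() {
        // Snapshot the optional stages so later builder calls do not affect this sink.
        Function<List<InputT>, List<InputT>> preWriteStage = preWrite;
        Consumer<List<CommT>> postCommitStage = postCommit;
        return new BuiltSink<InputT>(inputs -> {
            List<WriterResultT> writerResults = new ArrayList<>();
            for (InputT input : preWriteStage.apply(inputs)) {
                writerResults.add(writer.apply(input));
            }
            List<CommT> committables = preCommit.apply(writerResults);
            committables.forEach(committer);
            postCommitStage.accept(committables);
        });
    }
}
```

The trade-off compared with `build()`-time validation is that the sink author has to pick the right factory up front; in exchange, a mismatch such as `TwoPhaseSinkBuilder.passthrough((Integer i) -> i.longValue(), (String s) -> {})` is rejected by the compiler instead of failing at job startup or on the first record.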
> >> >
> >> > Thanks,
> >> > Peter
> >> >
> >> > On Thu, Oct 5, 2023 at 16:04, Péter Váry <peter.vary.apa...@gmail.com> wrote:
> >> >
> >> > > Hi Team,
> >> > >
> >> > > In my previous email [1] I described our challenges migrating the existing SinkFunction-based Iceberg implementation to the new SinkV2-based implementation.
> >> > >
> >> > > As a result of the discussion around that topic, I created FLIP-371 [2] to address the Committer-related changes, and now I have created a companion FLIP-372 [3] to address the WithPreCommitTopology-related issues.
> >> > >
> >> > > FLIP-372: Allow TwoPhaseCommittingSink WithPreCommitTopology to alter the type of the Committable
> >> > >
> >> > > The main goal of FLIP-372 is to extend the existing TwoPhaseCommittingSink API with the possibility to have a PreCommitTopology where the input and output types of the pre-commit transformation differ.
> >> > >
> >> > > Here is the FLIP: FLIP-372: Allow TwoPhaseCommittingSink WithPreCommitTopology to alter the type of the Committable
> >> > > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable>
> >> > >
> >> > > Please use this thread to discuss FLIP-related questions, proposals, and feedback.
> >> > >
> >> > > Thanks,
> >> > > Peter
> >> > >
> >> > > - [1] https://lists.apache.org/thread/h3kg7jcgjstpvwlhnofq093vk93ylgsn
> >> > > - [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-371%3A+Provide+initialization+context+for+Committer+creation+in+TwoPhaseCommittingSink
> >> > > - [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable