Thanks for pushing this effort. I'd actually be in favor of 1b). I fully agree that decorator interfaces should be avoided but I'm also not a big fan of overloading the base interfaces (they are hard to implement as is). The usual feedback to Source-related interfaces are always that they are overwhelming and too hard to implement. However, I'd also not oppose 1c) as scattered interfaces also have drawbacks. I'd just dislike 1a) and 1d). While I also like Becket's capability approach, I fear that it doesn't work for this particular use case: Sources can always be aligned cross-task and this is just about intra-task alignment. So it's plausible to put sources into an alignment group even though they do not use any of the presented API of FLIP-217. They should just issue a warning, if they handle multiple splits (see motivation section).
I think renaming alignSplits to facilitate future use cases makes sense but then all interfaces (if 1c) is chosen) should be adjusted accordingly. AlignedSourceReader could be PausingSourceReader and I'd go for pauseOrResumeSplits (Becket's proposal afaik). We could also split it into pauseSplit and resumeSplit. While pauseOrResumeSplits may allow Sources to just use 1 instead of 2 library calls (as written in the Javadoc), both Kafka and Pulsar can't use it and I'm not sure if there is a system that can. Some nit for the FLIP: - Please replace "stop" with "pause". - Not sure if it's worth it in the capability section: Sources that adopt this interface cannot be used in earlier versions. So it feels like we are only forward compatible (old sources can be used after the change); but I guess this holds for any API addition. - You might want to add what happens when all splits are paused. - You may want to describe how the 3 flavors of SourceReaderBase interact with the interface. - I'm not sure if it makes sense to include Kafka and Pulsar in the FLIP. For me, this is rather immediate follow-up work. (could be in the same umbrella ticket) Best, Arvid On Mon, Apr 25, 2022 at 12:52 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote: > a) "MySourceReader implements SourceReader, WithSplitsAlignment", along > with "MySplitReader implements SplitReader, WithSplitsAlignment", or > b) "MySourceReader implements AlignedSourceReader" and "MySplitReader > implements AlignedSplitReader", or > c) "MySourceReader implements SourceReader" and "MySplitReader implements > SplitReader". > > I think the latest proposal according to Dawid would be: > d) "MySourceReader implements SourceReader" and "MySplitReader implements > AlignedSplitReader". > I am fine with this API, although personally speaking I think it is simpler > to just add a new method to the split reader with default impl. > > > I think that is a good idea to have it aligned as much as possible. I'd be > +1 for your option c). We can merge AlignedSplitReader with SplitReader. We > will update the FLIP shortly. > > Best, > > Dawid > > On 25/04/2022 12:43, Becket Qin wrote: > > Thanks for the comment, Jark. > > 3. Interface/Method Name. > > Can the interface be used to align other things in the future? For example, > align read speed, I have > seen users requesting global rate limits. This feature may also need an > interface like this. > If we don't plan to extend this interface to support align other things, I > suggest explicitly declaring > the purpose of the methods, such as `alignWatermarksForSplits` instead of > `alignSplits`. > > This is a good point. Naming wise, it would usually be more extensible to > just describe what the method actually does, instead of assuming the > purpose of doing this. For example, in this case, pauseOrResumeSplits() > would be more extensible because this can be used for any kind of flow > control, be it watermark alignment or simple rate limiting. > > 4. Interface or Method. > > I don't have a strong opinion on this. I think they have their own > advantages. > In Flink SQL, we heavily use Interfaces for extending abilities > (SupportsXxxx) for TableSource/TableSink, > and I prefer Interfaces rather than methods in this case. When you have a > bunch of abilities and each ability > has more than one method, Interfaces can help to organize them and make > users clear which methods > need to implement when you want to have an ability. > > I am OK with decorative interfaces if this is a general design pattern in > the other components in Flink. But it looks like the current API proposal > is not symmetric. > > The current proposal is essentially "MySourceReader implements > SourceReader, WithSplitsAlignment", along with "MySplitReader implements > AlignedSplitsReader". > > Should we make the API symmetric? I'd consider any one of the following as > symmetric. > > a) "MySourceReader implements SourceReader, WithSplitsAlignment", along > with "MySplitReader implements SplitReader, WithSplitsAlignment", or > b) "MySourceReader implements AlignedSourceReader" and "MySplitReader > implements AlignedSplitReader", or > c) "MySourceReader implements SourceReader" and "MySplitReader implements > SplitReader". > > I think the latest proposal according to Dawid would be: > d) "MySourceReader implements SourceReader" and "MySplitReader implements > AlignedSplitReader". > I am fine with this API, although personally speaking I think it is simpler > to just add a new method to the split reader with default impl. > > @Dawid Wysakowicz <dwysakow...@apache.org> <dwysakow...@apache.org>, thanks > for the reply. > > Having said that, as I don't have a preference and I agree most of the > > sources will support the alignment I am fine following your suggestion to > have the SourceReader extending from WithWatermarksSplitsAlignment, but > would put the "supportsXXX" there, not in the Source to keep the two > methods together. > > One benefit of having the "supportsXXX" in Source is that this allows some > compile time check. For example, if a user enabled watermark alignment > while it is not supported by the Source, an exception can be thrown at > compile time. It seems in general useful. That said, I agree that API > cleanliness wise it is better to put the two methods together. > > Thanks, > > Jiangjie (Becket) Qin > > On Mon, Apr 25, 2022 at 5:56 PM Jark Wu <imj...@gmail.com> <imj...@gmail.com> > wrote: > > > Thank Dawid for the reminder on FLIP-182. Sorry I did miss it. > I don't have other concerns then. > > Best, > Jark > > On Mon, 25 Apr 2022 at 15:40, Dawid Wysakowicz <dwysakow...@apache.org> > <dwysakow...@apache.org> > wrote: > > > @Jark: > > 1. Will the framework always align with watermarks when the source > implements the interface? > I'm afraid not every case needs watermark alignment even if Kafka > implements the interface, > and this will affect the throughput somehow. I agree with Becket > we may need a > `supportSplitsAlignment()` method for users to configure the source to > enable/disable the alignment. > > 2. How does the framework calculate maxDesiredWatermark? > I think the algorithm of maxDesiredWatermark will greatly affect > > throughput > > if the reader is constantly > switching between pause and resume. Can users configure the alignment > offset? > > > This is covered in the previous FLIP[1] which has been already > > implemented > > in 1.15. In short, it must be enabled with the watermark strategy which > also configures drift and update interval. > > If we don't plan to extend this interface to support align other things, > > I > > suggest explicitly declaring > the purpose of the methods, such as `alignWatermarksForSplits` instead of > `alignSplits`. > > > Sure let's rename it. > > @Becket: > > I understand your point. On the other hand putting all methods, even with > "supportsXXX" methods for enabling certain features, makes the entry > threshold for writing a new source higher. Instead of focusing on the > > basic > > and required properties of the Source, the person implementing a source > must bother with and need to figure out what all of the extra features > > are > > about and how to deal with them. It makes it also harder to organize > methods in coupled groups as Jark said. > > Having said that, as I don't have a preference and I agree most of the > sources will support the alignment I am fine following your suggestion to > have the SourceReader extending from WithWatermarksSplitsAlignment, but > would put the "supportsXXX" there, not in the Source to keep the two > methods together. > > Lastly, I agree it is really unfortunate the "alignSplits" methods differ > slightly for SourceReader and SpitReader. The reason for that is > SourceReaderBase deals only with SplitIds, whereas SplitReader needs the > actual splits to pause them. I found the discrepancy acceptable for the > sake of simplifying changes significantly, especially as they would > > highly > > likely impact performance as we would have to perform additional lookups. > Moreover the SplitReader is a secondary interface. > > Best, > > Dawid > > [1] https://cwiki.apache.org/confluence/x/hQYBCw > > On 24/04/2022 17:15, Jark Wu wrote: > > Thanks for the effort, Dawid and Sebastian! > > I just have some minor questions (maybe I missed something). > > 1. Will the framework always align with watermarks when the source > implements the interface? > I'm afraid not every case needs watermark alignment even if Kafka > implements the interface, > and this will affect the throughput somehow. I agree with Becket > we may need a > `supportSplitsAlignment()` method for users to configure the source to > enable/disable the alignment. > > 2. How does the framework calculate maxDesiredWatermark? > I think the algorithm of maxDesiredWatermark will greatly affect > > throughput > > if the reader is constantly > switching between pause and resume. Can users configure the alignment > offset? > > 3. Interface/Method Name. > Can the interface be used to align other things in the future? For > > example, > > align read speed, I have > seen users requesting global rate limits. This feature may also need an > interface like this. > If we don't plan to extend this interface to support align other things, > > I > > suggest explicitly declaring > the purpose of the methods, such as `alignWatermarksForSplits` instead of > `alignSplits`. > > 4. Interface or Method. > I don't have a strong opinion on this. I think they have their own > advantages. > In Flink SQL, we heavily use Interfaces for extending abilities > (SupportsXxxx) for TableSource/TableSink, > and I prefer Interfaces rather than methods in this case. When you have a > bunch of abilities and each ability > has more than one method, Interfaces can help to organize them and make > users clear which methods > need to implement when you want to have an ability. > > > Best, > Jark > > On Sun, 24 Apr 2022 at 18:13, Becket Qin <becket....@gmail.com> > <becket....@gmail.com> < > > becket....@gmail.com> wrote: > > Hi Dawid, > > Thanks for the explanation. Apologies that I somehow misread a bunch of > "align" and thought they were "assign". > > Regarding 1, by default implementation, I was thinking of the default > > no-op > > implementation. I am a little worried about the proliferation of > > decorative > > interfaces. I think the most important thing about interfaces is that > > they > > are easy to understand. In this case, I prefer adding new method to the > existing interface for the following reasons: > > a) I feel the biggest drawback of decorative interfaces is which > > interface > > they can decorate and which combinations of multiple decorative > > interfaces > > are valid. In the current FLIP, the withSplitsAlignment interface is only > applicable to the SourceReader which means it can't decorate any other > interface. From an interface design perspective, a natural question is > > why > > not let "AlignedSplitReader" extend "withSplitsAlignment"? And it is also > natural to assume that a split reader implementing both SplitReader and > WithSplitAlignment would work, because a source reader implementing > SourceReader and withSplitsAlignment works. So why isn't there an > > interface > > of AlignedSourceReader? In the future, if there is a new feature added > (e.g. sorted or pre-partitioned data aware), are we going to create > > another > > interface of SplitReader such as SortedSplitReader or > > PrePartitionedAware? > > Can they be combined? So I think the additional decorative interface like > withSplitsAlignment actually increases the understanding cost of users > because they have to know what decorative interfaces are there, which > interface they can decorate and which combinations of the decorative > interfaces are valid and which are not. Ideally we want to avoid that. To > be clear, I am not opposing having an interface of withSplitsAlignment, > > it > > is completely OK to have it as an internal interface and let SourceReader > and SplitReader both extend it. > > b) Adding a new method to the SourceReader with a default implementation > > of > > no-op would help avoid logic branching in the source logic, especially > given that we agree that the vast majority of the SourceReader > implementations, if not all, would just extend from the SourceReaderBase. > That means adding a new method to the interface would effectively give > > the > > same user experience, but simpler. > > c) A related design principle that may be worth discussing is how do we > > let > > the Source implementations tell Flink what capability is supported and > > what > > is not. Personally speaking I feel the most intuitive place to me is in > > the > > Source itself, because that is the entrance of the entire Source > > connector > > logic. > > Based on the above thoughts, I am wondering if the following interface > would be easier to understand by the users. > > - Change "withSplitsAlignment" to internal interface, let both > > SourceReader > > and SplitReader extend from it, with a default no-op implementation. > - Add a new method "boolean supportSplitsAlignment()" to the Source > interface, with a default implementation returning false. Sources that > > have > > implemented the alignment logic can change this to return true, and > override the alignSplits() methods in the SourceReader / SplitReader if > needed. > - In the future, if a new optional feature is going to be added to the > Source, and that feature requires the awareness from Flink, we can add > > more > > such methods to the Source. > > What do you think? > > Thanks, > > Jiangjie (Becket) Qin > > > > > > On Fri, Apr 22, 2022 at 4:05 PM Dawid Wysakowicz <dwysakow...@apache.org> > <dwysakow...@apache.org> > > <dwysakow...@apache.org> <dwysakow...@apache.org> > > wrote: > > > @Konstantin: > > As part of this FLIP, the `AlignedSplitReader` interface (aka the stop & > resume behavior) will be implemented for Kafka and Pulsar only, correct? > > Correct, as far as I know though, those are the only sources which > > consume > > concurrently from multiple splits and thus alignment applies. > > @Thomas: > > I wonder if "supporting" split alignment in SourceReaderBase and then > > doing > > nothing if the split reader does not implement AlignedSplitReader could > > be > > misleading? Perhaps WithSplitsAlignment can instead be added to the > specific source reader (i.e. KafkaSourceReader) to make it explicit that > the source actually supports it. > > I understand your concern. Hmm, I think we could actually do that. Given > the actual implementation of the SourceReaderBase#alignSplits is rather > short (just a forward to the corresponding method of SplitFetcher), we > could reimplement it in the actual source implementations. This solution > has the downside though. Authors of new sources would have to do two > things: extend from AlignedSplitReader and implement > > WithSplitsAssignment, > > instead of just extending AlignedSplitReader. I would be fine with such a > tradeoff though. What others think? > > @Steven: > > For this part from the motivation section, is it accurate? Let's assume > > one > > source task consumes from 3 partitions and one of the partition is > significantly slower. In this situation, watermark for this source task > won't hold back as it is reading recent data from other two Kafka > partitions. As a result, it won't hold back the overall watermark. I > thought the problem is that we may have late data for this slow > > partition. > > It will hold back the watermark. Watermark of an operator is the minimum > of watermarks of all splits[1] > > I have another question about the restart. Say split alignment is > triggered. checkpoint is completed. job failed and restored from the last > checkpoint. because alignment decision is not checkpointed, initially > alignment won't be enforced until we get a cycle of watermark aggregation > and propagation, right? Not saying this corner is a problem. Just want to > understand it more. > > Your understanding is correct. > > @Becket: > > 1. I think watermark alignment is sort of a general use case, so should > > we > > just add the related methods to SourceReader directly instead of > introducing the new interface of WithSplitAssignment? We can provide > default implementations, so backwards compatibility won't be an issue. > > I don't think we can provide a default implementation. How would we do > that? Would it be just a no-op? Is it better than having an opt-in > interface? The default implementation would have to be added exclusively > > in > > a *Public* SourceReader interface. By the way notice SourceReaderBase > does extend from WithSplitsAlignment, so effectively all implementations > > do > > handle the alignment case. To be honest I think it is impossible to > implement the SourceReader interface directly by end users. > > 2. As you mentioned, the SplitReader interface probably also needs some > change to support throttling at the split granularity. Can you add that > interface change into the public interface section as well? > > It has been added from the beginning. See *AlignedSplitReader.* > > 3. Nit, can we avoid using the method name assignSplits here, given that > > it > > is not actually changing the split assignments? It seems something like > pauseOrResumeSplits(), or adjustSplitsThrottling() is more accurate. > > The method's called *alignSplits*, not assign. Do you still prefer a > different name for that? Personally, I am open for suggestions here. > > Best, > > Dawid > > [1] > > > > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/#watermark-generation > > On 22/04/2022 05:59, Becket Qin wrote: > > Thanks for driving the effort, Sebastion. I think the motivation makes a > lot of sense. Just a few suggestions / questions. > > 1. I think watermark alignment is sort of a general use case, so should > > we > > just add the related methods to SourceReader directly instead of > introducing the new interface of WithSplitAssignment? We can provide > default implementations, so backwards compatibility won't be an issue. > > 2. As you mentioned, the SplitReader interface probably also needs some > change to support throttling at the split granularity. Can you add that > interface change into the public interface section as well? > > 3. Nit, can we avoid using the method name assignSplits here, given that > > it > > is not actually changing the split assignments? It seems something like > pauseOrResumeSplits(), or adjustSplitsThrottling() is more accurate. > > Thanks, > > Jiangjie (Becket) Qin > > On Thu, Apr 21, 2022 at 11:39 PM Steven Wu <stevenz...@gmail.com> > <stevenz...@gmail.com> < > > stevenz...@gmail.com> < > > stevenz...@gmail.com> wrote: > > However, a single source operator may read data from multiple > > splits/partitions, e.g., multiple Kafka partitions, such that even with > watermark alignment the source operator may need to buffer excessive > > amount > > of data if one split emits data faster than another. > > For this part from the motivation section, is it accurate? Let's assume > > one > > source task consumes from 3 partitions and one of the partition is > significantly slower. In this situation, watermark for this source task > won't hold back as it is reading recent data from other two Kafka > partitions. As a result, it won't hold back the overall watermark. I > thought the problem is that we may have late data for this slow > > partition. > > I have another question about the restart. Say split alignment is > triggered. checkpoint is completed. job failed and restored from the last > checkpoint. because alignment decision is not checkpointed, initially > alignment won't be enforced until we get a cycle of watermark aggregation > and propagation, right? Not saying this corner is a problem. Just want to > understand it more. > > > > On Thu, Apr 21, 2022 at 8:20 AM Thomas Weise <t...@apache.org> > <t...@apache.org> < > > t...@apache.org> < > > t...@apache.org> wrote: > > Thanks for working on this! > > I wonder if "supporting" split alignment in SourceReaderBase and then > > doing > > nothing if the split reader does not implement AlignedSplitReader could > > be > > misleading? Perhaps WithSplitsAlignment can instead be added to the > specific source reader (i.e. KafkaSourceReader) to make it explicit that > the source actually supports it. > > Thanks, > Thomas > > > On Thu, Apr 21, 2022 at 4:57 AM Konstantin Knauf <kna...@apache.org> > <kna...@apache.org> < > > kna...@apache.org> < > > kna...@apache.org> > > wrote: > > > Hi Sebastian, Hi Dawid, > > As part of this FLIP, the `AlignedSplitReader` interface (aka the stop > > & > > resume behavior) will be implemented for Kafka and Pulsar only, > > correct? > > +1 in general. I believe it is valuable to complete the watermark > > aligned > > story with this FLIP. > > Cheers, > > Konstantin > > > > > > > > On Thu, Apr 21, 2022 at 12:36 PM Dawid Wysakowicz <dwysakow...@apache.org> > > wrote: > > > To be explicit, having worked on it, I support it ;) I think we can > start a vote thread soonish, as there are no concerns so far. > > Best, > > Dawid > > On 13/04/2022 11:27, Sebastian Mattheis wrote: > > Dear Flink developers, > > I would like to open a discussion on FLIP 217 [1] for an extension > > of > > Watermark Alignment to perform alignment also in SplitReaders. To > > do > > so, > > SplitReaders must be able to suspend and resume reading from split > > sources > > where the SourceOperator coordinates and controlls suspend and > > resume. > > To > > gather information about current watermarks of the SplitReaders, we > > extend > > the internal WatermarkOutputMulitplexer and report watermarks to > > the > > SourceOperator. > > There is a PoC for this FLIP [2], prototyped by Arvid Heise and > > revised > > and > > reworked by Dawid Wysakowicz (He did most of the work.) and me. The > > changes > > are backwards compatible in a way that if affected components do > > not > > support split alignment the behavior is as before. > > Best, > Sebastian > > [1] > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-217+Support+watermark+alignment+of+source+splits > > [2] https://github.com/dawidwys/flink/tree/aligned-splits > > -- > > Konstantin Knaufhttps://twitter.com/snntrablehttps://github.com/knaufk > >