I had an offline discussion with Piotr, and here is the summary. Please correct me if I missed anything, Piotr.
There are two things on which we would like to seek more opinions from the community, so that we can make progress on this FLIP.

1. The general pattern for adding obligatory features to existing interfaces.
***********************************************************************************
Interfaces exposed to developers for implementation are intended to be either *optional* or *obligatory*. It is quite clear how to convey that intention when creating the interfaces. There is much less agreement on how to do it when adding new features to an existing interface. In general, Flink uses decorative interfaces when adding optional features to existing interfaces, and both Piotr and I agree that this looks good. The differing opinions are mainly about how to add obligatory features to existing interfaces, probably because we understand "obligatory" differently.

We have discussed four options:

*Option 1:*
- Just add a new method to the existing interface.
- For backwards compatibility, the method would have a default implementation throwing "UnsupportedOperationException".
- In the next major version, remove the default implementation.
- Developers should treat any method whose default implementation throws an "UnsupportedOperationException" as obligatory.

*Option 2:*
- Always make the features optional by adding a decorative interface, just like ordinary optional features.
- Inform the developers via documentation that this feature is obligatory, although it looks optional from the code.
- In case the developers did not implement the decorative interface, throw an exception.
- In the next major version, move the methods in the decorative interface to the base interface, and deprecate the decorative interface.

*Option 3:*
- Always bump the major version when a new obligatory feature is added, even if we may have to do it frequently.

*Option 4:*
- Add a V2, V3... of the interface affected by the new obligatory feature.
- In the next major versions, deprecate the old versions of the interfaces.

Both Piotr and I agreed that options 3 and 4 have big side effects and should be avoided. We have different preferences between option 1 and option 2.

Personally, I prefer option 1, for the following reasons:
a) It is simple and intuitive. Java 8 introduced default implementations in interfaces precisely for interface evolution, and this is a common pattern in many projects.
b) It makes it prominent to the developers that the feature is expected to be implemented, because the default implementation explicitly throws an exception.
c) Low maintenance overhead: the Flink framework can always assume the method exists, so no special handling logic is needed.
d) It communicates to the developers a clear semantic boundary between optional and obligatory features in Flink.
  - Optional: jobs still run without exceptions if these methods are not implemented, e.g. all the SupportsXXXPushDown interfaces.
  - Obligatory: jobs may fail if these methods are not implemented properly, e.g. SourceReader#pauseOrResumeSplits(). This is a common pattern in Java. For example, Iterator.remove() throws "UnsupportedOperationException" by default, which tells implementors that things may go wrong if this method is not implemented.

As for option 2, the API itself sounds clean, but it misleads people into thinking that an obligatory feature is optional: the code says the feature is optional, but the documentation says it is obligatory. We should probably avoid such code-doc inconsistency, as it will confuse people. I would actually be bewildered to find that not implementing an "optional" feature is sometimes fine but sometimes causes jobs to fail.

Consider the argument that a method with a default implementation is always optional. If that were true, it would actually mean that all interfaces must be immutable once they are created.
If we want to add a method to an existing interface, we have to provide a default implementation for backwards compatibility. The fact that it has a default implementation would then indicate that the method is optional. If the method is optional, it should reside in a separate decorative interface, because otherwise it clogs the existing interface. Therefore, people should never add a method to an existing interface. I find this conclusion a bit extreme.

Piotr prefers option 2. His reasoning is:
a) Obligatory methods are the methods that fail the code compilation if not implemented.
b) All obligatory methods should reside in the base interface, without a default implementation, and all optional methods should be in decorative interfaces. This is a clean API.
c) Due to b), there is no viable way to add an obligatory method to an existing interface in a backwards compatible way. Unless we are OK with breaking backwards compatibility, all interfaces should be treated as immutable. As a compromise, we might as well treat all features added later as optional features. This way we keep the API clean.
d) Based on b) and c), option 2 has a clean API, while option 1 does not.
e) It is OK that in option 2 the code itself tells the developers that a feature is optional. We will rely on the documentation to correct that and clarify that the feature is actually obligatory.
f) Regarding how effectively people are made aware that the feature is obligatory, option 1 and option 2 are similar. People who do not read the release notes / documentation will mistake the feature for optional anyway.
As for option 1: for developers, the feature is still optional because of the default implementation in the interface, regardless of what that default implementation does, since the code compiles without overriding these methods.
Another problem with this option is that users who do not know the history of the interface may be confused by a default implementation that throws an exception.

2. For this particular FLIP, should the feature be optional or not?
***********************************************************************************
As mentioned in the previous email, I feel this FLIP should be obligatory, for the following reasons:
1. The Flink framework exposes the watermark alignment API to the end users. From the end users' perspective, the feature should be available regardless of the implementation details in the pluggables. The same is true for any other method exposed as Flink API.
2. If a Source is not pausable, the end user should receive an exception when enabling watermark alignment (both Piotr and I agree on this). In that case, the feature meets my criteria for an obligatory feature, because not implementing it causes a framework API to throw an exception and fail the job.

On the other hand, Piotr does not have a strong opinion on whether this feature should be optional or not.

Thanks for reading through this long email. In short, to make progress on this FLIP, we would like to hear how people feel about the two topics above.

Thanks,

Jiangjie (Becket) Qin

On Thu, May 26, 2022 at 3:06 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi Becket,
>
> I still stand by what I wrote before:
>
> > I think I would still vote soft -1 on this option, but I wouldn't block it in case I am out-voted.
>
> > I think it might be helpful to agree on the definition of optional in our case.
>
> For me it doesn't matter whether we call a default method throwing an exception optional or non-optional. As long as we keep it this way, the effect is the same. It's effectively a method that a user doesn't have to implement.
> If an interface/system allows some methods to be left unimplemented, some users will do just that, regardless of whether we call and document it as non-optional. And at the same time it's clogging the base interface.
>
> By the way, the mere need for java-doc/documentation to explain the existence of some construct is a bad smell (code should be self-documenting, and a default method throwing an UnsupportedOperationException is not).
>
> > Please note that so far we do not assume whether the feature is in the original API or it is added later. A newly added feature can also be non-optional, although it might take some time for all the pluggable developers to catch up, and they should still work if the new feature is not used until they catch up. In contrast, we may never expect an optional feature to catch up, because leaving it unimplemented is also blessed.
> >
> > Let's take checkpointing as an example. Imagine Flink did not support checkpointing before release 1.16, and now we are trying to add checkpointing to Flink. So we expose the checkpoint configuration to the end users. In the meantime, will we tell the pluggable (e.g. operators, connectors) developers that methods like "snapshotState()" are optional? If we do that, the availability of checkpointing in Flink would be severely weakened. But we should clearly still allow the existing implementations to work without checkpointing. It looks to me that adding the method to the pluggable interfaces with a default implementation throwing "UnsupportedOperationException" would be the solution here. Please note that in this case, having the default implementation does not mean this is optional. It is just the technique to support backwards compatibility in the feature evolution. The fact that this method is in the base interface suggests it is not optional, so the developers SHOULD implement it.
>
> I would soft vote -1 for having the default method throwing UnsupportedOperationException as a one-off thing for this (FLIP-217) special circumstance.
>
> At the moment, without thinking this over too much, I would vote a harder -1 for having this as a general rule when adding new features. If we ever end up with an API that is littered with default methods throwing UnsupportedOperationException that are documented as "non optional", it would IMO be a big design failure. I would be ok-ish with that only if it was a temporary thing and we had an aggressive plan to release new major Flink versions (2.x.y, 3.x.y, ...) more often, breaking API compatibility and getting rid of those default methods. Adding checkpointing and methods like "snapshotState()" would IMO easily justify a new major Flink release. In that case we could add those methods with default implementations for some transition period, one or two minor releases, followed by a clean-up in a major release. However, I would still argue that it would be cleaner/better to add a decorative interface like `CheckpointedOperator` instead of adding those default methods to the base `Operator` interface.
>
> I think I can sum up our disagreement as follows: I would like to keep the interfaces simpler, with only obligatory methods/features on one side and clearly optional features on the other, while you would like to add an extra third state in between those two?
>
> Best,
> Piotrek
>
> On Thu, May 12, 2022 at 04:25 Becket Qin <becket....@gmail.com> wrote:
>
> > Thanks for the clarification, Piotr and Sebastian.
> >
> > It looks like the key problem is still whether the implementation of pausable splits in the Sources should be optional or not.
> >
> > I think it might be helpful to agree on the definition of optional in our case. To me:
> > Optional = "You CAN leave the method unimplemented, and that is fine."
> > Non-Optional = "You CAN leave the method unimplemented, but you SHOULD NOT, because people assume this works."
> >
> > I think one sufficient condition for a Non-Optional feature is this: if the feature is exposed through the framework API, Flink should expect the pluggables to support it by default. Otherwise the availability of that feature becomes undefined.
> >
> > Please note that so far we do not assume whether the feature is in the original API or it is added later. A newly added feature can also be non-optional, although it might take some time for all the pluggable developers to catch up, and they should still work if the new feature is not used until they catch up. In contrast, we may never expect an optional feature to catch up, because leaving it unimplemented is also blessed.
> >
> > Let's take checkpointing as an example. Imagine Flink did not support checkpointing before release 1.16, and now we are trying to add checkpointing to Flink. So we expose the checkpoint configuration to the end users. In the meantime, will we tell the pluggable (e.g. operators, connectors) developers that methods like "snapshotState()" are optional? If we do that, the availability of checkpointing in Flink would be severely weakened. But we should clearly still allow the existing implementations to work without checkpointing. It looks to me that adding the method to the pluggable interfaces with a default implementation throwing "UnsupportedOperationException" would be the solution here. Please note that in this case, having the default implementation does not mean this is optional. It is just the technique to support backwards compatibility in the feature evolution. The fact that this method is in the base interface suggests it is not optional, so the developers SHOULD implement it.
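The evolution path in the checkpointing example can be sketched in Java. This is a minimal, hypothetical illustration and not Flink's actual operator API. `SketchOperator`, `LegacyOperator`, `CheckpointingOperator` and `SketchRuntime` are invented names, and the "state" is simply the list of records seen. The sketch shows that an implementation written before the method existed still compiles and runs, and fails only once the user enables the feature that needs the method.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical operator interface (NOT Flink's real API) as it might look
// after checkpointing was added in a later release.
interface SketchOperator {
    // Original method: every implementation already provides it.
    String process(String record);

    // Added later. The default keeps pre-existing implementations compiling,
    // but it throws instead of silently doing nothing.
    default List<String> snapshotState() {
        throw new UnsupportedOperationException(
                getClass().getSimpleName() + " does not implement snapshotState()");
    }
}

// Written before checkpointing existed; compiles unchanged.
class LegacyOperator implements SketchOperator {
    public String process(String record) {
        return record.toUpperCase();
    }
}

// Written after checkpointing was added; overrides the new method.
class CheckpointingOperator implements SketchOperator {
    private final List<String> seen = new ArrayList<>();

    public String process(String record) {
        seen.add(record);
        return record.toUpperCase();
    }

    @Override
    public List<String> snapshotState() {
        return new ArrayList<>(seen);
    }
}

class SketchRuntime {
    // The runtime only calls the new method when the user enabled the
    // feature, so legacy operators keep working in jobs that do not use it.
    static List<String> runAndMaybeSnapshot(
            SketchOperator op, List<String> input, boolean checkpointingEnabled) {
        for (String record : input) {
            op.process(record);
        }
        if (checkpointingEnabled) {
            return op.snapshotState();
        }
        return new ArrayList<>();
    }
}
```

The sketch also illustrates Piotr's counterpoint: `LegacyOperator` compiles without overriding `snapshotState()`, so the compiler alone does not force anyone to implement it.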
> >
> > When it comes to this FLIP, I think it meets the criteria for non-optional features, so we should just use the evolution path for non-optional features.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Wed, May 11, 2022 at 9:14 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> >
> > > Hi,
> > >
> > > Actually, previously I thought about having a decorative interface and, whenever watermark alignment is enabled, checking that the source implements the decorative interface, throwing an exception if it does not.
> > >
> > > I think the option with default methods in the source interfaces throwing `UnsupportedOperationException` still suffers from the same problems I mentioned before. It's still an optional implementation, and at the same time it's clogging the base interface. I think I would still vote soft -1 on this option, but I wouldn't block it in case I am out-voted.
> > >
> > > Best,
> > > Piotrek
> > >
> > > On Wed, May 11, 2022 at 14:22 Sebastian Mattheis <sebast...@ververica.com> wrote:
> > >
> > > > Hi Becket,
> > > >
> > > > Thanks a lot for your fast and detailed response. For me, it converges, and dropping the supportsX method sounds very reasonable. (Side note: I think we misunderstood each other about "pausable splits" being enabled as "default". From your description, I now understand "default" to mean that it should be the new recommended way of implementation, and I think that is fully valid. Before, I understood "default" here as the default implementation, i.e., throwing UnsupportedOperationException, which is the exact opposite. :) )
> > > >
> > > > Nevertheless: as mentioned, an open question for me is whether watermark alignment should enforce pausable splits.
> > > > For clarification, the current documentation [1] says:
> > > >
> > > >> *Note:* As of 1.15, Flink supports aligning across tasks of the same source and/or different sources. It does not support aligning splits/partitions/shards in the same task.
> > > >>
> > > >> In a case where there are e.g. two Kafka partitions that produce watermarks at different pace, that get assigned to the same task watermark might not behave as expected. Fortunately, worst case it should not perform worse than without alignment.
> > > >>
> > > >> Given the limitation above, we suggest applying watermark alignment in two situations:
> > > >>
> > > >> 1. You have two different sources (e.g. Kafka and File) that produce watermarks at different speeds
> > > >> 2. You run your source with parallelism equal to the number of splits/shards/partitions, which results in every subtask being assigned a single unit of work.
> > > >
> > > > Personally, I see no issue with implementing this, and no reason against introducing this dependency between watermark alignment and pausable splits. (I think this would even be a good path towards shaping watermark alignment in 1.16.) However, "I don't see" means that I would be happy to hear Dawid's and Piotrek's opinions, as they implemented watermark alignment based on FLIP-182 [2], and I don't want to miss relevant rationale/background info from their side.
> > > >
> > > > *@Piotrek* *@Dawid* What do you think?
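Piotr's alternative can be sketched roughly as follows. It combines a decorative interface with a check that throws when watermark alignment is enabled on a source that does not implement that interface. Again, this is a hypothetical sketch. `BaseReader`, `SupportsPausingSplits`, `PlainReader`, `PausingReader` and `DecorativeCheck` are invented names, and split IDs are plain strings rather than Flink's split types.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical base interface: only the methods every reader must implement.
interface BaseReader {
    String poll();
}

// Hypothetical decorative interface carrying the later-added feature.
interface SupportsPausingSplits {
    void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume);
}

// Implements only the base interface.
class PlainReader implements BaseReader {
    public String poll() {
        return "record";
    }
}

// Opts into the feature by also implementing the decorative interface.
class PausingReader implements BaseReader, SupportsPausingSplits {
    final Set<String> paused = new HashSet<>();

    public String poll() {
        return "record";
    }

    public void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
        paused.addAll(toPause);
        paused.removeAll(toResume);
    }
}

class DecorativeCheck {
    // Runs once when the job is set up: fail fast if alignment is enabled
    // on a reader that does not implement the decorative interface.
    static void validate(BaseReader reader, boolean alignmentEnabled) {
        if (alignmentEnabled && !(reader instanceof SupportsPausingSplits)) {
            throw new UnsupportedOperationException(
                    reader.getClass().getSimpleName()
                            + " does not support pausing splits; "
                            + "remove it from the watermark alignment group");
        }
    }

    // Only called after validate() succeeded with alignment enabled.
    static void pause(BaseReader reader, String splitId) {
        ((SupportsPausingSplits) reader)
                .pauseOrResumeSplits(
                        Collections.singletonList(splitId), Collections.<String>emptyList());
    }
}
```

Compared with a throwing default method, this check can run before any record is read. Support is visible in the type (`instanceof`) rather than discovered by calling the method.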
> > > >
> > > > Regards,
> > > > Sebastian
> > > >
> > > > [1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment-_beta_
> > > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources?src=contextnavpagetreemode
> > > >
> > > > On Wed, May 11, 2022 at 1:30 PM Becket Qin <becket....@gmail.com> wrote:
> > > >
> > > >> +dev
> > > >>
> > > >> Hi Sebastian,
> > > >>
> > > >> Thank you for the summary. Please see the detailed replies inline. As a recap of my suggestions:
> > > >>
> > > >> 1. Pausable splits API.
> > > >>   a) Add default implementations to the methods "pauseOrResumeSplits" in both SourceReader and SplitReader, where both default implementations throw UnsupportedOperationException.
> > > >>
> > > >> 2. User story.
> > > >>   a) We tell users to enable watermark alignment as they like. This is exactly what the current Flink API does.
> > > >>   b) We tell the source developers: please implement pausable splits, otherwise bad things may happen. Think of it like being expected to implement SourceReader#snapshotState() properly, otherwise exceptions will be thrown when users enable checkpointing.
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Jiangjie (Becket) Qin
> > > >>
> > > >> On Wed, May 11, 2022 at 4:45 PM Sebastian Mattheis <sebast...@ververica.com> wrote:
> > > >>
> > > >>> Hi Becket, Hi everybody,
> > > >>>
> > > >>> I'm sorry if I misread the messages, but I could not derive an agreement from the mailing list.
> > > >>> Nevertheless, if I understand you correctly, the suggestion is:
> > > >>>
> > > >>> * Add default implementations to the methods "pauseOrResumeSplits" in both SourceReader and SplitReader, where both default implementations throw UnsupportedOperationException.
> > > >>>
> > > >> Yes.
> > > >>
> > > >>> * Add "supportsPauseOrResumeSplits" to the Source interface. (In the following, I refer to supporting this as "pausable splits".)
> > > >>>
> > > >> We may no longer need this if pausable splits are expected to be implemented by the source developers, i.e. non-optional. Having this method would then be somewhat misleading, as it suggests that sources that do not support pausable splits are also acceptable in the long term. So API-wise, I'd say maybe we should remove this for this FLIP, although I believe the supportXXX pattern itself is still attractive for optional features.
> > > >>
> > > >>> To make the conclusions explicit:
> > > >>>
> > > >>> 1. The implementation of pauseOrResumeSplits in both interfaces, SourceReader and SplitReader, is optional, where the default is that it is not supported. (--> This means that the implementation is still optional for the source developer.)
> > > >>>
> > > >> It is optional for backwards compatibility with existing sources, as they may still compile without code changes. But starting from this FLIP, Flink will always optimistically assume that all sources support pausable splits. If a source does not support pausable splits, it goes to an error handling path when watermark alignment is enabled on it. This is different from a usual optional feature, where no error is expected.
> > > >>
> > > >>> 2. If watermark alignment is enabled in the application code by adding withWatermarkAlignment to the WatermarkStrategy while SourceReader or SplitReader do not support pausable splits, we throw an UnsupportedOperationException.
> > > >>>
> > > >> Yes.
> > > >>
> > > >>> 3. With regard to your statement:
> > > >>>
> > > >>>> [...] basically means watermark alignment is a non-optional feature to the end users.
> > > >>>
> > > >>> You actually mean that "pausable splits" are non-optional for the app developer if watermark alignment is enabled. However, watermark alignment is optional and can be enabled/disabled.
> > > >>>
> > > >> Yes, watermark alignment can be enabled/disabled for individual sources in Flink jobs, which basically means the code supporting watermark alignment has to already be there. That in turn means the Source developers are also expected to support pausable splits by default. So this way we essentially tell the end users that they may enable / disable this feature as they wish, and tell the source developers that they SHOULD implement it because the end users may turn it on/off at will. And if the source does not support pausable splits, enabling watermark alignment on it goes to an error handling path, so users know they have to explicitly exclude this source.
> > > >>
> > > >>> So far it's totally clear to me, and I hope this is what you mean. I also agree with both statements:
> > > >>>
> > > >>>> So making that expectation aligned with the source developers seems reasonable.
> > > >>>
> > > >>>> I think this is a simple and clean solution from both the end user and source developers' standpoint.
> > > >>>
> > > >>> However, a last conclusion follows from 3. and is an open question for me:
> > > >>>
> > > >>> 4. The feature of "pausable splits" is now tightly bound to watermark alignment, i.e., if sources do not support "pausable splits", one cannot enable watermark alignment for these sources. This dependency does not exist in the current watermark alignment implementation, because it is/was implemented without pausable splits. Do we want to introduce this dependency? (This is an open question. I cannot judge that.)
> > > >>>
> > > >> Watermark alignment basically relies on pausable splits, right? So personally I find it quite reasonable that if the source does not support pausable splits, end users cannot enable watermark alignment on it.
> > > >>
> > > >>> If something is wrong, please correct me.
> > > >>>
> > > >>> Regards,
> > > >>> Sebastian
> > > >>>
> > > >>> On Wed, May 11, 2022 at 9:05 AM Becket Qin <becket....@gmail.com> wrote:
> > > >>>
> > > >>>> Hi Sebastian,
> > > >>>>
> > > >>>> Thanks for the reply and the patient discussion. I agree this is a tricky decision.
> > > >>>>
> > > >>>>> Nevertheless, Piotr has valid concerns about Option c), which I see as follows:
> > > >>>>> (1) An interface with a default NOOP implementation makes the implementation optional. In my opinion, a default implementation is and will remain a way of making the implementation optional, because even in the future a developer can decide to implement the "old flavor" without support for pausable splits.
> > > >>>>> (2) It may not be too critical, but I also find it suboptimal that with a NOOP default implementation there is no way to check at runtime whether SourceReader or SplitReader actually support pausing. (To do so, one would need a supportsX method, which again makes it more complicated.)
> > > >>>>
> > > >>>> Based on the last few messages on the mailing list, Piotr and I agreed that the default implementation should just throw an UnsupportedOperationException if the source is unpausable. This basically tells the Source developers that this feature is expected to be supported. We cannot prevent end users from putting an unpausable source into the watermark alignment group, so watermark alignment is effectively a non-optional feature to the end users. So making that expectation aligned with the source developers seems reasonable. And if a source does not support this feature, the end users should explicitly remove that source from the watermark alignment group.
> > > >>>>
> > > >>>> Personally speaking, I think this is a simple and clean solution from both the end user and source developers' standpoint.
> > > >>>>
> > > >>>> Does this address your concerns?
> > > >>>>
> > > >>>> Thanks,
> > > >>>>
> > > >>>> Jiangjie (Becket) Qin
> > > >>>>
> > > >>>> On Wed, May 11, 2022 at 2:52 PM Sebastian Mattheis <sebast...@ververica.com> wrote:
> > > >>>>
> > > >>>>> Hi Piotr, Hi Becket, Hi everybody,
> > > >>>>>
> > > >>>>> we, Dawid and I, discussed the various suggestions/options, and we would be okay either way, because we find that neither solution is perfect, simply because of the complexity that is already present.
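Sebastian's concern (2) is that a NOOP default cannot be detected at runtime. The agreed fix is to make the default throw instead. The small sketch below contrasts the two. The interfaces are hypothetical, not Flink's `SourceReader`/`SplitReader`. Probing support by calling the method is purely illustrative and not something the FLIP proposes.

```java
import java.util.Collection;
import java.util.Collections;

// Two hypothetical variants of the same later-added method (NOT Flink's API).
interface NoopDefaultReader {
    // NOOP default: an implementation that never overrides this silently
    // ignores every pause request.
    default void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {}
}

interface ThrowingDefaultReader {
    // Throwing default: lack of support surfaces as an exception.
    default void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
        throw new UnsupportedOperationException("pausing splits is not supported");
    }
}

// Legacy implementations that predate the method and do not override it.
class NoopLegacyReader implements NoopDefaultReader {}

class ThrowingLegacyReader implements ThrowingDefaultReader {}

class PauseProbe {
    // With a NOOP default the call always "succeeds", so the caller cannot
    // distinguish a reader that paused from one that ignored the request.
    static boolean appearsSupported(NoopDefaultReader reader) {
        reader.pauseOrResumeSplits(
                Collections.singletonList("split-1"), Collections.<String>emptyList());
        return true;
    }

    // With a throwing default the caller does learn about missing support,
    // but only at the moment the method is actually called.
    static boolean appearsSupported(ThrowingDefaultReader reader) {
        try {
            reader.pauseOrResumeSplits(
                    Collections.singletonList("split-1"), Collections.<String>emptyList());
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }
}
```

The throwing variant makes missing support observable, but only when the method is called. Detecting it earlier would still need a separate `supportsX`-style method or a type check.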
> > > >>>>>
> > > >>>>> Option c) Adding methods to the interfaces of SourceReader and SplitReader
> > > >>>>> Option a) Adding decorative interfaces to be used by SourceReader and SplitReader
> > > >>>>>
> > > >>>>> The current status (v. 12) of the FLIP [1] is based on Option c), which we find acceptable because the added complexity is only a single method.
> > > >>>>>
> > > >>>>> Nevertheless, Piotr has valid concerns about Option c), which I see as follows:
> > > >>>>> (1) An interface with a default NOOP implementation makes the implementation optional. In my opinion, a default implementation is and will remain a way of making the implementation optional, because even in the future a developer can decide to implement the "old flavor" without support for pausable splits.
> > > >>>>> (2) It may not be too critical, but I also find it suboptimal that with a NOOP default implementation there is no way to check at runtime whether SourceReader or SplitReader actually support pausing. (To do so, one would need a supportsX method, which again makes it more complicated.)
> > > >>>>>
> > > >>>>> However, we haven't changed it, because Option a) is also not optimal or straightforward:
> > > >>>>> (1) We need to add two distinct yet similar decorative interfaces since, as mentioned, the signatures of the methods are different. For example, we would need decorative interfaces like `SplitReaderWithPausableSplits` and `SourceReaderWithPausableSplits`.
> > > >>>>> (2) As a consequence, we would need to somehow document how/where to implement both interfaces and how they relate to each other.
> > > >>>>> We could solve this by adding a note to the SourceReader and SplitReader interfaces that references the decorative interfaces, but it still adds complexity.
> > > >>>>>
> > > >>>>> In summary, we see both as acceptable and preferable to the other options. The question is whether we can find a solution or compromise that is acceptable to everybody, so that we reach consensus.
> > > >>>>>
> > > >>>>> Please let us know what you think. We would be happy to conclude the discussion rather than drop the initiative on this FLIP.
> > > >>>>>
> > > >>>>> Regards,
> > > >>>>> Sebastian
> > > >>>>>
> > > >>>>> [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199540438 (v. 12)
> > > >>>>>
> > > >>>>> On Thu, May 5, 2022 at 10:13 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > > >>>>>
> > > >>>>>> Hi Guowei,
> > > >>>>>>
> > > >>>>>> as Dawid wrote a couple of messages back:
> > > >>>>>>
> > > >>>>>> > This is covered in the previous FLIP[1] which has been already implemented in 1.15. In short, it must be enabled with the watermark strategy which also configures drift and update interval
> > > >>>>>>
> > > >>>>>> So by default watermark alignment is disabled, regardless of whether a source supports it or not.
> > > >>>>>>
> > > >>>>>> Best,
> > > >>>>>> Piotrek
> > > >>>>>>
> > > >>>>>> On Thu, May 5, 2022 at 09:56 Guowei Ma <guowei....@gmail.com> wrote:
> > > >>>>>>
> > > >>>>>>> Hi,
> > > >>>>>>>
> > > >>>>>>> We know that for bounded input, Flink supports the Batch execution mode. Currently, in Batch execution mode, Flink executes on a stage-by-stage basis. In that case, watermark alignment might not gain much.
> > > >>>>>>>
> > > >>>>>>> So my question is: is watermark alignment the default behavior (for sources that implement it)? If so, have you considered evaluating the impact of this behavior on the Batch execution mode? Or do you think it is not necessary?
> > > >>>>>>>
> > > >>>>>>> Correct me if I missed something.
> > > >>>>>>>
> > > >>>>>>> Best,
> > > >>>>>>> Guowei
> > > >>>>>>>
> > > >>>>>>> On Thu, May 5, 2022 at 1:01 PM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:
> > > >>>>>>>
> > > >>>>>>> > Hi Becket and Dawid,
> > > >>>>>>> >
> > > >>>>>>> > > I feel that no matter which option we choose this can not be solved entirely in either of the options, because of the point above and because the signature of SplitReader#pauseOrResumeSplits and SourceReader#pauseOrResumeSplits are slightly different (one identifies splits with splitId the other one passes the splits directly).
> > > >>>>>>> >
> > > >>>>>>> > Yes, that's a good point, both in this case and for features that need to be implemented in more than one place.
> > > >>>>>>> >
> > > >>>>>>> > > Is there any reason to make pausing reading from a split an optional feature, other than that this was not included in the original interface?
> > > >>>>>>> >
> > > >>>>>>> > An additional argument in favor of making it optional is to simplify source implementations. But on its own I'm not sure that would be enough to justify making this feature optional. Maybe.
> > > >>>>>>> >
> > > >>>>>>> > > I think it would be way simpler and clearer to just let end users and Flink assume all the connectors will implement this feature.

As I wrote above, that would be an interesting choice to make (ease of implementation for new users vs. system consistency). Regardless of that, yes, for me the main argument is API backward compatibility. But let's clarify a couple of points:
- The current proposal, adding methods to the base interface with default implementations, is an OPTIONAL feature, the same as the decorative version would be.
- The decorative version could just as well throw an UnsupportedOperationException if the user enabled watermark alignment, and I agree that's a better option compared to logging a warning.

Best,
Piotrek

On Wed, May 4, 2022 at 15:40 Becket Qin <becket....@gmail.com> wrote:

Thanks for the reply and patient discussion, Piotr and Dawid.

Is there any reason for making pausing reading from a split an optional feature, other than that this was not included in the original interface?

To be honest, I am really worried about the complexity of the user story here. Optional features like this have a high overhead. Imagine this feature is optional, and a user has enabled watermark alignment and defined a few watermark groups. Would it work? Hmm, that depends on whether the involved Source has implemented this feature. If the Sources are well documented, good luck.
Otherwise, end users may have to look into the code of the Source to see whether the feature is supported, which is something they shouldn't have to do.

I think it would be way simpler and clearer to just let end users and Flink assume all the connectors will implement this feature. After all, the watermark group is not optional to the end users. If in some rare cases the feature cannot be supported, a clear UnsupportedOperationException will be thrown to tell users to explicitly remove this Source from the watermark group. I don't think we should have a warning message here, as warnings tend to be ignored in many cases. If we do this, we don't even need the supportXXX method in the Source for this feature. In fact, this is exactly how many interfaces work today. For example, SplitEnumerator#addSplitsBack() is not supported by the Pravega source because it does not support partial failover. In that case, it simply throws an exception to trigger a global recovery.

The reason we add a default implementation in this case would be just for the sake of backwards compatibility, so that old sources can still compile. Sure, in the short term this feature might not be supported by many existing sources.
That is OK, and it is quite visible to the source developers that they did not override the default implementation, which throws an UnsupportedOperationException.

@Dawid,

>> the Java doc of the SupportXXX() method in the Source would be the single source of truth regarding how to implement this feature.
>
> I also don't find it entirely true. Half of the classes are theoretically optional and are utility classes from the point of view of how the interfaces are organized. Theoretically users do not need to use any of SourceReaderBase & SplitReader. It would be weird to list their methods in the Source interface.

I think the ultimate goal of Java docs is to guide users in implementing the Source. If SourceReaderBase is the preferred way to implement a SourceReader, it seems worth mentioning that. Even the Java language documentation for interfaces lists the known implementations [1], so people can leverage them. But for this particular case, if we make the feature non-optional, we don't even need the supportXXX() method for now.
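A minimal sketch of the default-method approach described above, using simplified, hypothetical types (`ReaderSketch`, `LegacyReader`, `AligningReader`) rather than the real Flink `SourceReader`: the new method gets a default implementation that throws `UnsupportedOperationException`, so a reader written before the feature still compiles, and the failure only surfaces when the feature is actually used.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DefaultMethodSketch {

    /** Hypothetical, simplified reader interface (not the real Flink SourceReader). */
    interface ReaderSketch {
        String pollNext();

        /**
         * Feature added to an existing interface. The default exists only so that readers
         * written before the feature still compile; implementations are expected to override it.
         */
        default void pauseOrResumeSplits(
                Collection<String> splitsToPause, Collection<String> splitsToResume) {
            throw new UnsupportedOperationException(
                    getClass().getSimpleName() + " does not support pausing or resuming splits");
        }
    }

    /** A reader written before the feature existed; it compiles without changes. */
    static class LegacyReader implements ReaderSketch {
        @Override
        public String pollNext() {
            return "record";
        }
    }

    /** A reader that implements the feature by tracking which split ids are paused. */
    static class AligningReader implements ReaderSketch {
        final Set<String> paused = new HashSet<>();

        @Override
        public String pollNext() {
            return "record";
        }

        @Override
        public void pauseOrResumeSplits(
                Collection<String> splitsToPause, Collection<String> splitsToResume) {
            paused.addAll(splitsToPause);
            paused.removeAll(splitsToResume);
        }
    }

    public static void main(String[] args) {
        ReaderSketch legacy = new LegacyReader();
        try {
            // Only reached when the framework actually uses the feature.
            legacy.pauseOrResumeSplits(List.of("split-1"), List.of());
        } catch (UnsupportedOperationException e) {
            System.out.println("Rejected: " + e.getMessage());
        }

        AligningReader aligning = new AligningReader();
        aligning.pauseOrResumeSplits(List.of("split-1", "split-2"), List.of("split-2"));
        System.out.println("Paused: " + aligning.paused);
    }
}
```

With this approach the compiler does not enforce the method; the throwing default is the only signal, which is why removing the default in a later major version is part of the proposal.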

Thanks,

Jiangjie (Becket) Qin

On Wed, May 4, 2022 at 4:37 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hey Piotr and Becket,

First of all, let me say I am happy with whichever option is agreed on in the discussion.

I wanted to clarify a few points from the discussion though:

@Becket:

> The main argument for adding the methods to the SourceReader is that these methods are effectively NON-OPTIONAL to SourceReader implementations, i.e. starting from this FLIP, all SourceReader implementations are expected to support this method, although some old implementations may not have implemented this feature. I think we should distinguish the new features from the optional features. The public decorative interface is a solution for optional features; we should not use it for features that are non-optional.

I don't think that this feature is NON-OPTIONAL. Even though preferred, I still think it can be simply optional.

> the Java doc of the SupportXXX() method in the Source would be the single source of truth regarding how to implement this feature.

I also don't find it entirely true. Half of the classes are theoretically optional and are utility classes from the point of view of how the interfaces are organized. Theoretically users do not need to use any of SourceReaderBase & SplitReader. It would be weird to list their methods in the Source interface.

@Piotr

> If we have all of the methods with default implementation in the base interface, the API doesn't give any clue to the user which set of methods are required to be implemented at the same time.

I feel that no matter which option we choose this cannot be solved entirely in either of the options, because of the point above and because the signatures of SplitReader#pauseOrResumeSplits and SourceReader#pauseOrResumeSplits are slightly different (one identifies splits with splitId, the other one passes the splits directly).

Best,

Dawid

On 03/05/2022 14:30, Becket Qin wrote:

Hi Piotr,

Thanks for the comment.

Just to clarify, I am not against the decorative interfaces, but I do think we should use them with caution.
The main argument for adding the methods to the SourceReader is that these methods are effectively NON-OPTIONAL to SourceReader implementations, i.e. starting from this FLIP, all SourceReader implementations are expected to support this method, although some old implementations may not have implemented this feature. I think we should distinguish the new features from the optional features. The public decorative interface is a solution for optional features; we should not use it for features that are non-optional.

That said, this feature is optional for SplitReaders. Arguably we could have a decorative interface for that, but for simplicity and symmetry of the interface, personally I prefer just adding a new method.

Regarding the advantages you mentioned about the decorative interfaces, they would make sense if:
1. The feature is optional.
2. There is only one decorative interface involved for a feature. Otherwise the argument that all the methods are grouped together will not stand.

Compared with that, I think the current solution works fine in all cases, i.e. "having a supportXXX() method in Source, and default methods / decorative interfaces in base interfaces".
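A sketch of the "supportXXX() in Source plus default methods in base interfaces" variant, assuming hypothetical `SourceSketch`/`ReaderSketch` types and a hypothetical framework-side `validate` step. The method name `supportsPausingSplits` follows the renaming mentioned later in the thread; where and when a real framework would run such a check is an assumption. What it illustrates is that the capability is declared once on the Source and can be checked without creating a reader.

```java
import java.util.Collection;

public class SupportsMethodSketch {

    /** Hypothetical reader interface; the new method has a default so old readers compile. */
    interface ReaderSketch {
        /** See SourceSketch#supportsPausingSplits() for everything that must be implemented. */
        default void pauseOrResumeSplits(
                Collection<String> splitsToPause, Collection<String> splitsToResume) {
            throw new UnsupportedOperationException("pausing splits is not supported");
        }
    }

    /** Hypothetical source interface acting as the single source of truth for a capability. */
    interface SourceSketch {
        ReaderSketch createReader();

        /**
         * Returns true only if every component involved (reader, split readers, ...) implements
         * the pausing methods; the Javadoc here is where those methods would be listed.
         */
        default boolean supportsPausingSplits() {
            return false;
        }
    }

    /** A source written before the capability existed. */
    static class LegacySource implements SourceSketch {
        @Override
        public ReaderSketch createReader() {
            return new ReaderSketch() {};
        }
    }

    /** A source whose reader overrides the pausing method and declares the capability. */
    static class PausableSource implements SourceSketch {
        @Override
        public ReaderSketch createReader() {
            return new ReaderSketch() {
                @Override
                public void pauseOrResumeSplits(
                        Collection<String> splitsToPause, Collection<String> splitsToResume) {
                    // A real reader would stop or restart fetching for these split ids here.
                }
            };
        }

        @Override
        public boolean supportsPausingSplits() {
            return true;
        }
    }

    /** Hypothetical framework-side check, run on the Source before any reader is created. */
    static void validate(SourceSketch source, boolean alignmentEnabled) {
        if (alignmentEnabled && !source.supportsPausingSplits()) {
            throw new IllegalStateException(
                    "Watermark alignment is enabled, but the source does not support pausing splits");
        }
    }

    public static void main(String[] args) {
        validate(new PausableSource(), true); // capability declared: passes
        validate(new LegacySource(), false); // alignment disabled: nothing to check
        try {
            validate(new LegacySource(), true);
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Because the check runs against the Source, the framework does not need to instantiate a reader to learn about the capability.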

> The advantages are:
> - clean and easy to implement base interface

In the current approach, the Java doc of the SupportXXX() method in the Source would be the single source of truth regarding how to implement this feature. It lists the methods that have to be implemented to support this feature, regardless of how many classes / interfaces are involved.

When implementing the base interface, users do not need to implement a method with a default implementation. If they are curious what the method is for, the Java doc of that method simply points users to the SupportXXX() method in the Source. This does not seem to add work for users compared with decorative interfaces, but gives much better discoverability.

> - all of the methods from a single feature are grouped in a single decorator interface, together with their dedicated java doc. It's also easier to google search for help using the decorator name
>
> - if an optional feature requires two methods to be implemented at once, decorator can guarantee that

These two points are not true when multiple components and classes collaborate to provide a feature. In our case, we have both SourceReader and SplitReader involved.
And there might be other interfaces on the JM side involved in some future features. So the relevant methods can actually be scattered across several places. That said, we may still use decorative interfaces for each component if the feature is optional, given there is a single source of truth for the feature.

> Here I would strongly lean towards making life easier for new users, lowering the entry barrier, at the (imo) slight expense for the power users.

I actually think the current approach is simpler, more extensible and more general for all the users. Can you elaborate a bit on which part you think makes it harder for users to understand?

There is another benefit of the decorative interfaces which has not been mentioned, but might be worth considering here. Usually decorative interfaces give slightly better backwards compatibility than new default methods in the interfaces. That matters when users are using a jar that was compiled with an older version of Flink which does not have the default method in the interfaces in question. A decorative interface may still provide backwards compatibility in that case, while a default method implementation cannot.
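For comparison, a minimal sketch of the decorative-interface alternative, again with hypothetical simplified types: the base interface is left untouched, the feature's method lives in a separate `PausingReaderSketch` interface, and the framework reaches it only through an `instanceof` check, throwing `UnsupportedOperationException` for a reader that lacks the decorator (the behavior Piotr suggests for when watermark alignment is enabled). A reader class compiled before the decorator existed simply fails the check instead of being asked to run code it never had.

```java
import java.util.Collection;
import java.util.List;

public class DecorativeInterfaceSketch {

    /** Hypothetical base interface, left unchanged when the optional feature is added. */
    interface ReaderSketch {
        String pollNext();
    }

    /** Hypothetical decorative interface grouping all methods of one optional feature. */
    interface PausingReaderSketch {
        void pauseOrResumeSplits(Collection<String> splitsToPause, Collection<String> splitsToResume);
    }

    /** Implements only the base interface (e.g. compiled before the decorator existed). */
    static class PlainReader implements ReaderSketch {
        @Override
        public String pollNext() {
            return "record";
        }
    }

    /** Opts into the feature by also implementing the decorator. */
    static class PausingReader implements ReaderSketch, PausingReaderSketch {
        int calls = 0;

        @Override
        public String pollNext() {
            return "record";
        }

        @Override
        public void pauseOrResumeSplits(
                Collection<String> splitsToPause, Collection<String> splitsToResume) {
            calls++;
        }
    }

    /** Hypothetical framework call site: the feature is reachable only through the decorator. */
    static void align(
            ReaderSketch reader, Collection<String> splitsToPause, Collection<String> splitsToResume) {
        if (!(reader instanceof PausingReaderSketch)) {
            throw new UnsupportedOperationException(
                    reader.getClass().getSimpleName() + " does not implement PausingReaderSketch");
        }
        ((PausingReaderSketch) reader).pauseOrResumeSplits(splitsToPause, splitsToResume);
    }

    public static void main(String[] args) {
        PausingReader pausing = new PausingReader();
        align(pausing, List.of("split-1"), List.of());
        System.out.println("Decorated reader calls: " + pausing.calls);
        try {
            align(new PlainReader(), List.of("split-1"), List.of());
        } catch (UnsupportedOperationException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Grouping a feature's methods in one interface is what lets the compiler require all of them together once a class opts in.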

I think in Flink we in general do not guarantee that custom components compiled with an older version can run with a newer version of Flink; a recompile with the newer version would be required. That said, if we do care about this, we can just change the "supportXXX()" method in the Source interface to use decorative interfaces, and leave the other parts unchanged.

Thanks,

Jiangjie (Becket) Qin

On Tue, May 3, 2022 at 6:25 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

Hi,

Sorry for chipping in so late, but I was OoO for the last two weeks. Regarding the interfaces, I would actually be against adding those methods to the base interfaces for the reasons mentioned above. It clogs the base interface for new users with tons of methods that they do not need, do not understand and do not know what to do with. Moreover, such decorative interfaces solve a problem if a feature requires two or more methods to be implemented at the same time.
If we have all of the methods with default implementations in the base interface, the API doesn't give any clue to the user which set of methods are required to be implemented at the same time.

> a) I feel the biggest drawback of decorative interfaces is which interface they can decorate and which combinations of multiple decorative interfaces are valid. (...)
> In the future, if there is a new feature added (e.g. sorted or pre-partitioned data aware), are we going to create another interface of SplitReader such as SortedSplitReader or PrePartitionedAware? Can they be combined? So I think the additional decorative interface like withSplitsAlignment actually increases the understanding cost of users because they have to know what decorative interfaces are there, which interface they can decorate and which combinations of the decorative interfaces are valid and which are not. Ideally we want to avoid that.

I'm not sure I understand how embedding default methods in the base interface solves the problem of what can be combined or not.
If there are two conflicting features, having decorative interfaces that cannot be mixed together actually makes much more sense to me than having them all in one base class. How would you allow users to implement only one of those two features?

To reiterate on the issue: yes, there are drawbacks:
- how can a user discover what decorators are there?
- how can a user know where the decorator can be applied?

However, those are drawbacks for power users, and they can be mitigated by the documentation, for example by listing all of the decorators with a detailed explanation both in the docs and in the Java docs. More experienced users will be able to deal with those issues more easily, as they will already have some basic understanding of Flink. Also, if a user has a problem they want to solve, they will google for a potential solution anyway, and while doing that they are very likely to discover the decorator they need in the docs.

The advantages are:
- clean and easy to implement base interface
- all of the methods from a single feature are grouped in a single decorator interface, together with their dedicated java doc.
It's also easier to google search for help using the decorator name
- if an optional feature requires two methods to be implemented at once, the decorator can guarantee that

Here I would strongly lean towards making life easier for new users, lowering the entry barrier, at the (imo) slight expense for the power users.

Best,
Piotrek

On Tue, Apr 26, 2022 at 15:32 Becket Qin <becket....@gmail.com> wrote:

Thanks for the reply, Sebastian and Dawid.

I think Sebastian has a good summary. This is a really helpful discussion.

Thinking a bit more, I feel that it might still be better to add the supportsXXX() method in the Source rather than the SourceReader.

Generally speaking, what we are trying to do here is to let the Flink framework know what the Source is capable of. In this FLIP, it happens to be a capability that only involves the SourceReader. But in the future, it is possible that another functionality involves both the SplitEnumerator and the SourceReader.
In that case, following the current approach, we should put the "supportsXXX()" method in both SplitEnumerator and SourceReader, because if we only put it in the SourceReader, the JM would have to create a SourceReader in order to know whether this feature is supported, which is a little ugly. But if we put the "supportsXXX()" method in the Source, we will break the "symmetric" design, because this FLIP chose a different way.

This is also why I think a supportsXXX() method seems a good thing to have: when there are a few interfaces / methods that are expected to be implemented at the same time in order to deliver a feature, it is always good to have a single source of truth to tell the framework what to do, so the framework can behave consistently in different parts.

@Sebastian Mattheis <sebast...@ververica.com>

Regarding interface flavor b), i.e.
AlignedSourceReader + AlignedSplitReader, what I feel awkward about is that we are essentially expecting almost all SourceReader implementations to extend SourceReaderBase, which effectively makes the SourceReader interface without the pausing support useless. So this indicates that public decorative interfaces (or sub-interfaces for the same purpose) only make sense if the original interface is also expected to be used. Otherwise, it seems to make more sense to add the method to the original interface itself.

Cheers,

Jiangjie (Becket) Qin

On Tue, Apr 26, 2022 at 6:05 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Thanks @Sebastian for the nice summary.

I think most of your points align with the suggestions I made to the FLIP while you were writing your reply (I believe we hit enter nearly at the same time ;) )

Two points after we synced offline:

1. I also changed supportsWatermarksSplitAlignment to supportsPausingSplits to express the general capability of pausing.

2. As for whether we should add PausingSourceReader/PausingSplitReader (option b) or just add the methods (option c), I suggest simply adding the two methods, as I felt this is the approach Becket much prefers and others do not object to. Unless there is opposition, let's go with option c.

Best,

Dawid

On 26/04/2022 10:06, Sebastian Mattheis wrote:

Hi folks,

Sorry for being a bit silent. Many thanks for all the input and suggestions. As I'm a bit new, I needed some time to catch up and structure the discussion (for myself), and I wanted to find a way to structure the conclusions. (Also because I had the feeling that some concerns got lost in the discussion.) This is my attempt; please correct me if something is wrong or misunderstood.
I tried to collect and assemble the opinions, suggestions, and conclusions (to the best of my knowledge):

# Top A: Should split alignment (pause/resume behavior) be a general capability?

I personally don't see any reason not to make it a general capability, because the alignSplit method is actually independent of the watermarks. If we agree here to make it a general capability, we should also agree on the right wording. Does "alignSplits(splitsToResume, splitsToPause)" express what is then actually meant? (I see it as okay. I don't have any better idea, whilst Arvid suggested "pauseOrResumeSplits".)

# Top B: Should it be possible to enable/disable split alignment?

I would personally not disable the split alignment on the source reader side, because if split alignment is used for some other use case (see A) it could have nasty side effects on other/future use cases.
Instead, I would disable "watermark split alignment", meaning that it should disable the watermark-dependent trigger for split alignment.

# Top C: Should we add a supportsX method?

I find it difficult to define the scope of a supportsX method w.r.t. the following questions: a) Where is it used? and b) What is the expected output? Regarding b), it's not straightforward to provide a meaningful output, e.g., if the SourceReader supports split alignment but the SplitReader does not. This is because, with the current implementation, we can determine whether split alignment is fully supported only at runtime, and specifically only when calling alignSplits down the call hierarchy up to the actual SplitReaders.

Therefore, I would suggest raising either an error or a warning if alignment is called but not supported at some point. I know we should think carefully about when this could be the case, because we don't want to flood anybody with such warnings.
However, warnings could indicate to the user that, for the watermark split alignment use case, split reading is imbalanced, with the conclusion to either disable the trigger for watermark split alignment (see Top B) or to use/implement a source and reader that fully supports split alignment.

# Top D: How to design interfaces?

Thanks for structuring the discussion with the various possibilities (a-d). From the discussion and emails, I would like to summarize the following requirements:
- Interfaces should be consistent ("symmetric"), i.e., similar semantics should have similar interfaces with similar usage.
- Make explicit which implementations implement interfaces/support behavior.
- Make clear what the default implementations are and how to implement interfaces with the desired behavior.

This is a simplified view of the relations between relevant classes of the PoC implementation:

SourceReader (Public) <|-- SourceReaderBase (Internal) <|-- .. <|-- MySourceReader

MySourceReader <>-- SplitFetcherManager (Internal) <>-- SplitFetcher (Internal) <>-- SplitReader (Public) <|-- MySplitReader

(A <|-- B: B inherits from A; A <>-- B: A "has a" B)

Note that SourceReaderBase and SplitFetcherManager implement most of the "logic" for split alignment just because we wanted to implement split alignment and wanted it to be available as a kind of default. As a consequence, we have a "default implementation" for SourceReader that implements the actual logic for split alignment. For that reason, I find it very confusing to have a NOOP default implementation in the interface for the SourceReader. As a consequence, interface strategy c) is difficult because it would require NOOP default implementations in the public interfaces of SourceReader and SplitReader. The same holds for strategy d) because it would require a NOOP default implementation in the SourceReader.
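The Top C suggestion, surfacing unsupported alignment as an error or warning instead of silently ignoring it through a NOOP default, could look roughly like the minimal Java sketch below. `PausableSplitReader`, `AlignmentCaller` and `pauseOrResumeSplits` are hypothetical stand-ins, not Flink's actual interfaces. The default method throws `UnsupportedOperationException` so that a reader that does not implement alignment fails loudly. The caller logs only the first failure, to avoid flooding anybody with warnings.

```java
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

/** Hypothetical split reader stand-in; NOT Flink's SplitReader. */
interface PausableSplitReader {
    /** Instead of a silent NOOP, a reader that does not implement alignment fails loudly. */
    default void pauseOrResumeSplits(Collection<String> splitsToPause, Collection<String> splitsToResume) {
        throw new UnsupportedOperationException(
                "split alignment is not supported by " + getClass().getName());
    }
}

/** Hypothetical framework-side caller that turns missing support into a single warning. */
final class AlignmentCaller {
    private static final Logger LOG = Logger.getLogger(AlignmentCaller.class.getName());
    private final AtomicBoolean warned = new AtomicBoolean(false);

    /** Returns true if the reader applied the request, false if it does not support alignment. */
    boolean align(PausableSplitReader reader, Collection<String> splitsToPause, Collection<String> splitsToResume) {
        try {
            reader.pauseOrResumeSplits(splitsToPause, splitsToResume);
            return true;
        } catch (UnsupportedOperationException e) {
            // Alignment is requested periodically, so only the first failure is logged.
            if (warned.compareAndSet(false, true)) {
                LOG.warning("Split alignment requested but unsupported: " + e.getMessage());
            }
            return false;
        }
    }

    boolean hasWarned() {
        return warned.get();
    }
}
```

Compared with a NOOP default, the unsupported case becomes visible at runtime, which is the only point at which, per the discussion above, support can actually be determined.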
Further, as Dawid described, the method signatures of alignSplits for SourceReader and SplitReader differ, and it would be extremely difficult to make the signatures the same (possibly even with a performance impact because of additional look-ups of split ids). Therefore, having a symmetric decorative interface as in strategy a) is actually not possible, and having two decorative interfaces with different method signatures is confusing. My conclusion is that we are best off with strategy b), which means having specializing sub-interfaces that inherit from the parent interface: SourceReader <|-- AlignedSourceReader, SplitReader <|-- AlignedSplitReader. With this option, I'm not 100% sure what the implications are and whether this could get nasty. I would suggest that Dawid and I just try to implement it and see if we like it. :)

# Summary

In conclusion, please let me know your perspectives. Please correct me if something is wrong or if I misunderstood something. My perspective would be:

Top A: Yes
Top B: Yes (but disable watermark trigger for split alignment)
Top C: No
Top D: b)

Best,
Sebastian

On Tue, Apr 26, 2022 at 9:55 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

@Arvid:

> While I also like Becket's capability approach, I fear that it doesn't work for this particular use case: Sources can always be aligned cross-task and this is just about intra-task alignment. So it's plausible to put sources into an alignment group even though they do not use any of the presented API of FLIP-217. They should just issue a warning if they handle multiple splits (see motivation section).

Yes, but the "supportXXX" method would be for telling whether it supports that intra-task alignment. Cross-task alignment would always be supported.
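Dawid's distinction (cross-task alignment always works; a "supportXXX" flag only says whether intra-task alignment works), combined with the idea that sources should warn only when they handle multiple splits, can be sketched as a small decision helper. `AlignmentMode`, `AlignmentPlanner` and the boolean flag are hypothetical names used only for illustration.

```java
/** Hypothetical outcome of putting a source into an alignment group. */
enum AlignmentMode {
    /** Readers are held back as a whole relative to other tasks; always available. */
    CROSS_TASK_ONLY,
    /** Additionally, individual splits inside one reader can be paused. */
    CROSS_TASK_AND_INTRA_TASK
}

/** Hypothetical helper: cross-task alignment is unconditional, intra-task depends on the flag. */
final class AlignmentPlanner {
    static AlignmentMode plan(boolean supportsIntraTaskAlignment) {
        return supportsIntraTaskAlignment
                ? AlignmentMode.CROSS_TASK_AND_INTRA_TASK
                : AlignmentMode.CROSS_TASK_ONLY;
    }

    /** Warn only if the reader cannot align splits AND actually reads more than one split. */
    static boolean shouldWarn(AlignmentMode mode, int assignedSplits) {
        return mode == AlignmentMode.CROSS_TASK_ONLY && assignedSplits > 1;
    }
}
```

A source that never advertises intra-task support can therefore still join an alignment group; it only triggers a warning once it handles more than one split.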
I updated the interfaces to what I believe to be closest to a consensus among all participants. Do you mind taking a look?

@Sebastian Do you mind addressing the nits?

Best,

Dawid

On 25/04/2022 13:39, Arvid Heise wrote:

Thanks for pushing this effort.

I'd actually be in favor of 1b). I fully agree that decorator interfaces should be avoided, but I'm also not a big fan of overloading the base interfaces (they are hard to implement as is). The usual feedback on Source-related interfaces is always that they are overwhelming and too hard to implement. However, I'd also not oppose 1c), as scattered interfaces also have drawbacks. I'd just dislike 1a) and 1d).

While I also like Becket's capability approach, I fear that it doesn't work for this particular use case: Sources can always be aligned cross-task and this is just about intra-task alignment. So it's plausible to put sources into an alignment group even though they do not use any of the presented API of FLIP-217. They should just issue a warning if they handle multiple splits (see motivation section).

I think renaming alignSplits to facilitate future use cases makes sense, but then all interfaces (if 1c) is chosen) should be adjusted accordingly. AlignedSourceReader could be PausingSourceReader and I'd go for pauseOrResumeSplits (Becket's proposal afaik). We could also split it into pauseSplit and resumeSplit. While pauseOrResumeSplits may allow Sources to use just 1 instead of 2 library calls (as written in the Javadoc), both Kafka and Pulsar can't use it and I'm not sure if there is a system that can.

Some nits for the FLIP:
- Please replace "stop" with "pause".
- Not sure if it's worth it in the capability section: Sources that adopt this interface cannot be used in earlier versions. So it feels like we are only forward compatible (old sources can be used after the change); but I guess this holds for any API addition.
- You might want to add what happens when all splits are paused.
- You may want to describe how the 3 flavors of SourceReaderBase interact with the interface.
- I'm not sure if it makes sense to include Kafka and Pulsar in the FLIP. For me, this is rather immediate follow-up work (could be in the same umbrella ticket).

Best,

Arvid

On Mon, Apr 25, 2022 at 12:52 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> a) "MySourceReader implements SourceReader, WithSplitsAlignment", along with "MySplitReader implements SplitReader, WithSplitsAlignment", or
> b) "MySourceReader implements AlignedSourceReader" and "MySplitReader implements AlignedSplitReader", or
> c) "MySourceReader implements SourceReader" and "MySplitReader implements SplitReader".
>
> I think the latest proposal according to Dawid would be:
> d) "MySourceReader implements SourceReader" and "MySplitReader implements AlignedSplitReader".
> I am fine with this API, although personally speaking I think it is simpler to just add a new method to the split reader with a default impl.

I think it is a good idea to have it aligned as much as possible. I'd be +1 for your option c). We can merge AlignedSplitReader with SplitReader. We will update the FLIP shortly.

Best,

Dawid

On 25/04/2022 12:43, Becket Qin wrote:

Thanks for the comment, Jark.

> 3. Interface/Method Name.
>
> Can the interface be used to align other things in the future? For example, align read speed: I have seen users requesting global rate limits. This feature may also need an interface like this.
> If we don't plan to extend this interface to support aligning other things, I suggest explicitly declaring the purpose of the methods, such as `alignWatermarksForSplits` instead of `alignSplits`.

This is a good point. Naming-wise, it would usually be more extensible to just describe what the method actually does, instead of assuming the purpose of doing it. For example, in this case, pauseOrResumeSplits() would be more extensible because it can be used for any kind of flow control, be it watermark alignment or simple rate limiting.

> 4. Interface or Method.
>
> I don't have a strong opinion on this. I think they have their own advantages. In Flink SQL, we heavily use Interfaces for extending abilities (SupportsXxxx) for TableSource/TableSink, and I prefer Interfaces rather than methods in this case.
> When you have a bunch of abilities and each ability has more than one method, Interfaces can help to organize them and make it clear to users which methods they need to implement when they want to have an ability.

I am OK with decorative interfaces if this is a general design pattern in the other components in Flink. But it looks like the current API proposal is not symmetric.

The current proposal is essentially "MySourceReader implements SourceReader, WithSplitsAlignment", along with "MySplitReader implements AlignedSplitReader".

Should we make the API symmetric? I'd consider any one of the following as symmetric.

a) "MySourceReader implements SourceReader, WithSplitsAlignment", along with "MySplitReader implements SplitReader, WithSplitsAlignment", or
b) "MySourceReader implements AlignedSourceReader" and "MySplitReader implements AlignedSplitReader", or
c) "MySourceReader implements SourceReader" and "MySplitReader implements SplitReader".

I think the latest proposal according to Dawid would be:
d) "MySourceReader implements SourceReader" and "MySplitReader implements AlignedSplitReader".
I am fine with this API, although personally speaking I think it is simpler to just add a new method to the split reader with a default impl.

@Dawid Wysakowicz <dwysakow...@apache.org>, thanks for the reply.

> Having said that, as I don't have a preference and I agree most of the sources will support the alignment, I am fine following your suggestion to have the SourceReader extending from WithWatermarksSplitsAlignment, but would put the "supportsXXX" there, not in the Source, to keep the two methods together.

One benefit of having the "supportsXXX" in Source is that this allows some compile time check.
For example, if a user enabled watermark alignment while it is not supported by the Source, an exception can be thrown at compile time. It seems useful in general. That said, I agree that, API-cleanliness-wise, it is better to put the two methods together.

Thanks,

Jiangjie (Becket) Qin

On Mon, Apr 25, 2022 at 5:56 PM Jark Wu <imj...@gmail.com> wrote:

Thanks, Dawid, for the reminder about FLIP-182. Sorry, I did miss it. I don't have other concerns then.

Best,
Jark

On Mon, 25 Apr 2022 at 15:40, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

@Jark:

> 1. Will the framework always align with watermarks when the source implements the interface?
> I'm afraid not every case needs watermark alignment even if Kafka implements the interface, and this will affect the throughput somehow.
> I agree with Becket that we may need a `supportSplitsAlignment()` method for users to configure the source to enable/disable the alignment.
>
> 2. How does the framework calculate maxDesiredWatermark?
> I think the algorithm of maxDesiredWatermark will greatly affect throughput if the reader is constantly switching between pause and resume. Can users configure the alignment offset?

This is covered in the previous FLIP[1], which has already been implemented in 1.15. In short, it must be enabled with the watermark strategy, which also configures drift and update interval.

> If we don't plan to extend this interface to support aligning other things, I suggest explicitly declaring the purpose of the methods, such as `alignWatermarksForSplits` instead of `alignSplits`.

Sure, let's rename it.
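For reference, FLIP-182 in Flink 1.15 enables this on the watermark strategy via `WatermarkStrategy#withWatermarkAlignment(group, maxAllowedWatermarkDrift, updateInterval)`. The calculation Jark asks about can be approximated by the simplified model below, which is an illustrative assumption and not Flink's coordinator code. In this model, the maximum desired watermark is the minimum watermark in the alignment group plus the configured drift, and any split already ahead of it would be paused. Applying the threshold per split is likewise an assumption made for this sketch. The update interval, which is not modelled here, controls how often the value is recomputed.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Simplified model of watermark alignment; an illustration, not Flink's implementation. */
final class WatermarkAlignmentSketch {

    /** The group may run at most maxDrift ahead of its slowest member. */
    static long maxDesiredWatermark(Collection<Long> groupWatermarks, Duration maxDrift) {
        long slowest = groupWatermarks.stream()
                .mapToLong(Long::longValue)
                .min()
                .orElseThrow(() -> new IllegalArgumentException("empty alignment group"));
        return slowest + maxDrift.toMillis();
    }

    /** Splits whose watermark is already past the desired maximum would be paused. */
    static Set<String> splitsToPause(Map<String, Long> splitWatermarks, long maxDesired) {
        Set<String> toPause = new TreeSet<>();
        splitWatermarks.forEach((split, watermark) -> {
            if (watermark > maxDesired) {
                toPause.add(split);
            }
        });
        return toPause;
    }
}
```

With group watermarks of 1000, 5000 and 3000 ms and a 2 s drift, the desired maximum is 3000 ms, so only the split at 5000 ms would be paused.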
@Becket:

I understand your point. On the other hand, putting all methods in the base interfaces, even with "supportsXXX" methods for enabling certain features, raises the entry threshold for writing a new source. Instead of focusing on the basic and required properties of the Source, the person implementing a source must deal with all of the extra features and figure out what they are about and how to handle them. It also makes it harder to organize methods in coupled groups, as Jark said.

Having said that, as I don't have a preference and I agree most of the sources will support the alignment, I am fine following your suggestion to have the SourceReader extending from WithWatermarksSplitsAlignment, but would put the "supportsXXX" there, not in the Source, to keep the two methods together.
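Dawid's alternative keeps the capability flag next to the alignment method in the reader-level interface, and might look like the minimal sketch below. `WithWatermarksSplitsAlignment` is the name used in this thread. Its method names and signatures here (`supportsSplitsAlignment`, `alignSplits` over split ids) are assumptions. In the discussed design `SourceReader` itself would extend the interface; here a single hypothetical class, `PausingReader`, stands in for a reader.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical interface: the capability flag lives next to the method it guards. */
interface WithWatermarksSplitsAlignment {
    /** Hypothetical name for the "supportsXXX" method discussed in the thread. */
    default boolean supportsSplitsAlignment() {
        return false;
    }

    /** Pauses and resumes splits by id; only meaningful if supportsSplitsAlignment() is true. */
    default void alignSplits(Collection<String> splitsToPause, Collection<String> splitsToResume) {}
}

/** Hypothetical reader that opts in by overriding both methods together. */
final class PausingReader implements WithWatermarksSplitsAlignment {
    private final Set<String> paused = new HashSet<>();

    @Override
    public boolean supportsSplitsAlignment() {
        return true;
    }

    @Override
    public void alignSplits(Collection<String> splitsToPause, Collection<String> splitsToResume) {
        paused.addAll(splitsToPause);
        paused.removeAll(splitsToResume);
    }

    Set<String> pausedSplits() {
        return paused;
    }
}

final class ReaderLevelCheck {
    /** The flag can only be queried on a reader instance, i.e. once readers exist at runtime. */
    static boolean tryAlign(WithWatermarksSplitsAlignment reader,
                            Collection<String> toPause, Collection<String> toResume) {
        if (!reader.supportsSplitsAlignment()) {
            return false;
        }
        reader.alignSplits(toPause, toResume);
        return true;
    }
}
```

The trade-off Becket points out remains visible: the check happens per reader at runtime, not up front on the Source.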
Lastly, I agree it is really unfortunate that the "alignSplits" methods differ slightly for SourceReader and SplitReader. The reason is that SourceReaderBase deals only with split ids, whereas SplitReader needs the actual splits to pause them. I found the discrepancy acceptable because it simplifies the changes significantly, especially as unifying the signatures would very likely impact performance, since we would have to perform additional lookups. Moreover, the SplitReader is a secondary interface.

Best,

Dawid

[1] https://cwiki.apache.org/confluence/x/hQYBCw

On 24/04/2022 17:15, Jark Wu wrote:

Thanks for the effort, Dawid and Sebastian!

I just have some minor questions (maybe I missed something).

1. Will the framework always align with watermarks when the source implements the interface?
I'm afraid not every case needs watermark alignment even if Kafka implements the interface, and this will affect the throughput somehow.
I agree with Becket that we may need a `supportSplitsAlignment()` method for users to configure the source to enable/disable the alignment.

2. How does the framework calculate maxDesiredWatermark?
I think the algorithm of maxDesiredWatermark will greatly affect throughput if the reader is constantly switching between pause and resume. Can users configure the alignment offset?

3. Interface/Method Name.
Can the interface be used to align other things in the future? For example, align read speed: I have seen users requesting global rate limits. This feature may also need an interface like this.
If we don't plan to extend this interface to support aligning other things, I suggest explicitly declaring the purpose of the methods, such as `alignWatermarksForSplits` instead of `alignSplits`.

4. Interface or Method.
I don't have a strong opinion on this.
I think they have their own advantages.
In Flink SQL, we heavily use Interfaces for extending abilities (SupportsXxxx) for TableSource/TableSink, and I prefer Interfaces rather than methods in this case. When you have a bunch of abilities and each ability has more than one method, Interfaces can help to organize them and make it clear to users which methods they need to implement when they want to have an ability.

Best,
Jark

On Sun, 24 Apr 2022 at 18:13, Becket Qin <becket....@gmail.com> wrote:

Hi Dawid,

Thanks for the explanation. Apologies that I somehow misread a bunch of "align" and thought they were "assign".

Regarding 1, by default implementation, I was thinking of the default no-op implementation.
I am a little worried about the proliferation of decorative interfaces. I think the most important thing about interfaces is that they are easy to understand. In this case, I prefer adding a new method to the existing interface for the following reasons:

a) I feel the biggest drawback of decorative interfaces is the question of which interfaces they can decorate and which combinations of multiple decorative interfaces are valid. In the current FLIP, the withSplitsAlignment interface is only applicable to the SourceReader, which means it can't decorate any other interface. From an interface design perspective, a natural question is: why not let "AlignedSplitReader" extend "withSplitsAlignment"?
And it is also natural to assume that a split reader implementing both SplitReader and WithSplitAlignment would work, because a source reader implementing SourceReader and withSplitsAlignment works. So why isn't there an interface of AlignedSourceReader? In the future, if a new feature is added (e.g. sorted or pre-partitioned data aware), are we going to create another interface of SplitReader such as SortedSplitReader or PrePartitionedAware? Can they be combined? So I think an additional decorative interface like withSplitsAlignment actually increases the understanding cost for users, because they have to know what decorative interfaces there are, which interfaces they can decorate, and which combinations of the decorative interfaces are valid and which are not. Ideally we want to avoid that.
To be clear, I am not opposed to having a WithSplitsAlignment interface; it is completely OK to have it as an internal interface and let SourceReader and SplitReader both extend it.

b) Adding a new method to the SourceReader with a no-op default implementation would help avoid branching in the source logic, especially given that we agree that the vast majority of SourceReader implementations, if not all, would just extend SourceReaderBase. That means adding a new method to the interface would effectively give the same user experience, but simpler.

c) A related design principle that may be worth discussing is how we let Source implementations tell Flink which capabilities are supported and which are not.
Personally, I feel the most intuitive place is the Source itself, because that is the entry point of the entire Source connector logic.

Based on the above thoughts, I am wondering if the following interface would be easier for users to understand:

- Change WithSplitsAlignment to an internal interface, and let both SourceReader and SplitReader extend it, with a default no-op implementation.
- Add a new method "boolean supportSplitsAlignment()" to the Source interface, with a default implementation returning false. Sources that have implemented the alignment logic can change this to return true, and override the alignSplits() methods in the SourceReader / SplitReader if needed.
- In the future, if a new optional feature is going to be added to the Source, and that feature requires awareness from Flink, we can add more such methods to the Source.

What do you think?

Thanks,

Jiangjie (Becket) Qin

On Fri, Apr 22, 2022 at 4:05 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

@Konstantin:

As part of this FLIP, the `AlignedSplitReader` interface (aka the stop & resume behavior) will be implemented for Kafka and Pulsar only, correct?

Correct. As far as I know, though, those are the only sources that consume concurrently from multiple splits, and thus the only ones to which alignment applies.
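
The stop & resume behavior only matters for readers that consume several splits concurrently. A rough sketch under stated assumptions: AlignedSplitReader follows the name used in this thread, but the alignSplits(pause, resume) signature and the in-memory MultiSplitReader are hypothetical, not the FLIP's code. Paused splits are skipped in each fetch round until they are resumed:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical interface; the name follows the thread, the signature is an assumption.
interface AlignedSplitReader {
    void alignSplits(Collection<String> splitsToPause, Collection<String> splitsToResume);
}

// Hypothetical in-memory reader that consumes several splits concurrently.
class MultiSplitReader implements AlignedSplitReader {
    // Pending records per split, kept in assignment order.
    private final Map<String, List<String>> pending = new LinkedHashMap<>();
    private final Set<String> paused = new LinkedHashSet<>();

    void addSplit(String splitId, List<String> records) {
        pending.put(splitId, new ArrayList<>(records));
    }

    @Override
    public void alignSplits(Collection<String> splitsToPause, Collection<String> splitsToResume) {
        paused.addAll(splitsToPause);
        paused.removeAll(splitsToResume);
    }

    // One fetch round: take at most one record from every split that is not paused.
    List<String> fetch() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : pending.entrySet()) {
            if (!paused.contains(e.getKey()) && !e.getValue().isEmpty()) {
                out.add(e.getKey() + ":" + e.getValue().remove(0));
            }
        }
        return out;
    }
}
```
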

@Thomas:

I wonder if "supporting" split alignment in SourceReaderBase and then doing nothing if the split reader does not implement AlignedSplitReader could be misleading? Perhaps WithSplitsAlignment can instead be added to the specific source reader (i.e. KafkaSourceReader) to make it explicit that the source actually supports it.

I understand your concern. Hmm, I think we could actually do that. Given that the actual implementation of SourceReaderBase#alignSplits is rather short (just a forward to the corresponding method of SplitFetcher), we could reimplement it in the actual source implementations. This solution has a downside, though: authors of new sources would have to do two things, extend AlignedSplitReader and implement WithSplitsAlignment, instead of just extending AlignedSplitReader. I would be fine with such a tradeoff, though. What do others think?
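
Dawid's alternative can be sketched as follows; all types are hypothetical, simplified stand-ins (the names follow this thread, the signatures are assumptions). It makes the tradeoff visible: the connector author implements AlignedSplitReader on the split reader and, separately, WithSplitsAlignment on the connector's own source reader, which forwards to the fetcher instead of relying on SourceReaderBase to do so for every source:

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins; names follow the thread, signatures are assumptions.
interface WithSplitsAlignment {
    void alignSplits(Collection<String> splitsToPause, Collection<String> splitsToResume);
}

interface AlignedSplitReader {
    void alignSplits(Collection<String> splitsToPause, Collection<String> splitsToResume);
}

// Stand-in for the fetcher that owns the split reader.
class SplitFetcher {
    private final AlignedSplitReader splitReader;

    SplitFetcher(AlignedSplitReader splitReader) { this.splitReader = splitReader; }

    void alignSplits(Collection<String> pause, Collection<String> resume) {
        splitReader.alignSplits(pause, resume);
    }
}

// Step 1 for the connector author: the split reader can pause and resume splits.
class ExampleSplitReader implements AlignedSplitReader {
    final Set<String> paused = new HashSet<>();

    @Override
    public void alignSplits(Collection<String> pause, Collection<String> resume) {
        paused.addAll(pause);
        paused.removeAll(resume);
    }
}

// Step 2: the connector's own source reader (a KafkaSourceReader-like class)
// opts in explicitly and forwards the request to its fetcher.
class ExampleSourceReader implements WithSplitsAlignment {
    private final SplitFetcher fetcher;

    ExampleSourceReader(SplitFetcher fetcher) { this.fetcher = fetcher; }

    @Override
    public void alignSplits(Collection<String> pause, Collection<String> resume) {
        fetcher.alignSplits(pause, resume);
    }
}
```

The forwarding method is short, which is Dawid's point about the cost being small; the price is that the opt-in has to be repeated in every connector.
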

@Steven:

For this part from the motivation section, is it accurate? Let's assume one source task consumes from 3 partitions and one of the partitions is significantly slower. In this situation, the watermark for this source task won't be held back, as it is reading recent data from the other two Kafka partitions. As a result, it won't hold back the overall watermark. I thought the problem is that we may have late data for this slow partition.

It will hold back the watermark. The watermark of an operator is the minimum of the watermarks of all splits [1].

I have another question about the restart. Say split alignment is triggered, a checkpoint completes, and then the job fails and is restored from the last checkpoint. Because the alignment decision is not checkpointed, alignment initially won't be enforced until we get a cycle of watermark aggregation and propagation, right? Not saying this corner case is a problem. Just want to understand it more.

Your understanding is correct.

@Becket:

1. I think watermark alignment is sort of a general use case, so should we just add the related methods to SourceReader directly instead of introducing the new interface of WithSplitAssignment? We can provide default implementations, so backwards compatibility won't be an issue.

I don't think we can provide a default implementation. How would we do that? Would it be just a no-op? Is that better than having an opt-in interface? The default implementation would have to be added exclusively in a *Public* SourceReader interface. By the way, notice that SourceReaderBase does extend WithSplitsAlignment, so effectively all implementations do handle the alignment case. To be honest, I think it is impossible for end users to implement the SourceReader interface directly.

2. As you mentioned, the SplitReader interface probably also needs some change to support throttling at the split granularity. Can you add that interface change into the public interface section as well?

It has been added from the beginning. See *AlignedSplitReader.*

3. Nit, can we avoid using the method name assignSplits here, given that it is not actually changing the split assignments? It seems something like pauseOrResumeSplits(), or adjustSplitsThrottling() is more accurate.

The method's called *alignSplits*, not assign. Do you still prefer a different name for that? Personally, I am open to suggestions here.
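
For comparison, Becket's proposal from the reply above (an internal interface with a no-op default, plus a supportSplitsAlignment() capability flag on Source) could look roughly like this. It is a sketch only: the signatures, LegacySource, AligningSource and the AlignmentCheck helper are hypothetical, and rejecting an unsupported source is one possible use of the flag, not something the thread specifies:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical sketch of the proposal; not Flink's API, signatures are assumptions.

// Internal interface with a no-op default, extended by both reader interfaces.
interface WithSplitsAlignment {
    default void alignSplits(Collection<String> splitsToPause, Collection<String> splitsToResume) {
        // no-op: readers that cannot align simply ignore the request
    }
}

interface SourceReader extends WithSplitsAlignment {
    String pollNext();
}

interface SplitReader extends WithSplitsAlignment {
    String fetch();
}

// The capability is declared once, on the Source, the entry point of the connector.
interface Source {
    SourceReader createReader();

    default boolean supportSplitsAlignment() {
        return false;
    }
}

class LegacySource implements Source {
    public SourceReader createReader() { return () -> "record"; }
}

class AligningSource implements Source {
    final List<String> pausedLog = new ArrayList<>();

    public SourceReader createReader() {
        return new SourceReader() {
            public String pollNext() { return "record"; }

            @Override
            public void alignSplits(Collection<String> pause, Collection<String> resume) {
                pausedLog.addAll(pause);
            }
        };
    }

    @Override
    public boolean supportSplitsAlignment() { return true; }
}

class AlignmentCheck {
    // One possible use (an assumption): reject a job that requests alignment
    // from a source that does not declare support for it.
    static void validate(Source source, boolean alignmentRequested) {
        if (alignmentRequested && !source.supportSplitsAlignment()) {
            throw new IllegalStateException("Source does not support split alignment");
        }
    }
}
```

The framework can then call alignSplits() unconditionally, without instanceof branches, while the capability check happens in one place.
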

Best,

Dawid

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/#watermark-generation

On 22/04/2022 05:59, Becket Qin wrote:

Thanks for driving the effort, Sebastian. I think the motivation makes a lot of sense. Just a few suggestions / questions.

1. I think watermark alignment is sort of a general use case, so should we just add the related methods to SourceReader directly instead of introducing the new interface of WithSplitAssignment? We can provide default implementations, so backwards compatibility won't be an issue.

2. As you mentioned, the SplitReader interface probably also needs some change to support throttling at the split granularity. Can you add that interface change into the public interface section as well?

3. Nit, can we avoid using the method name assignSplits here, given that it is not actually changing the split assignments? It seems something like pauseOrResumeSplits(), or adjustSplitsThrottling() is more accurate.

Thanks,

Jiangjie (Becket) Qin

On Thu, Apr 21, 2022 at 11:39 PM Steven Wu <stevenz...@gmail.com> wrote:

However, a single source operator may read data from multiple splits/partitions, e.g., multiple Kafka partitions, such that even with watermark alignment the source operator may need to buffer an excessive amount of data if one split emits data faster than another.

For this part from the motivation section, is it accurate? Let's assume one source task consumes from 3 partitions and one of the partitions is significantly slower. In this situation, the watermark for this source task won't be held back, as it is reading recent data from the other two Kafka partitions. As a result, it won't hold back the overall watermark. I thought the problem is that we may have late data for this slow partition.

I have another question about the restart. Say split alignment is triggered, a checkpoint completes, and then the job fails and is restored from the last checkpoint. Because the alignment decision is not checkpointed, alignment initially won't be enforced until we get a cycle of watermark aggregation and propagation, right? Not saying this corner case is a problem. Just want to understand it more.
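
Dawid's answer in this thread is that the slow partition does hold back the watermark, because the watermark of an operator is the minimum of the watermarks of all its splits. A minimal worked example for Steven's three-partition scenario, with hypothetical timestamps in milliseconds:

```java
import java.util.Map;

class SplitWatermarks {
    // Combined watermark of an operator reading several splits: the minimum
    // over the per-split watermarks (Long.MAX_VALUE if there are no splits).
    static long combined(Map<String, Long> watermarkPerSplit) {
        long min = Long.MAX_VALUE;
        for (long wm : watermarkPerSplit.values()) {
            min = Math.min(min, wm);
        }
        return min;
    }

    public static void main(String[] args) {
        // Hypothetical values: p2 lags far behind the other two partitions.
        Map<String, Long> wms = Map.of("p0", 10_000L, "p1", 10_500L, "p2", 2_000L);
        System.out.println(combined(wms)); // prints 2000
    }
}
```

Event-time operators downstream cannot advance past 2000 until p2 catches up, while records from p0 and p1 keep arriving; that imbalance is what pausing the faster splits is meant to limit.
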

On Thu, Apr 21, 2022 at 8:20 AM Thomas Weise <t...@apache.org> wrote:

Thanks for working on this!

I wonder if "supporting" split alignment in SourceReaderBase and then doing nothing if the split reader does not implement AlignedSplitReader could be misleading? Perhaps WithSplitsAlignment can instead be added to the specific source reader (i.e. KafkaSourceReader) to make it explicit that the source actually supports it.

Thanks,
Thomas

On Thu, Apr 21, 2022 at 4:57 AM Konstantin Knauf <kna...@apache.org> wrote:

Hi Sebastian, Hi Dawid,

As part of this FLIP, the `AlignedSplitReader` interface (aka the stop & resume behavior) will be implemented for Kafka and Pulsar only, correct?

+1 in general. I believe it is valuable to complete the watermark aligned story with this FLIP.

Cheers,

Konstantin

On Thu, Apr 21, 2022 at 12:36 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

To be explicit, having worked on it, I support it ;) I think we can start a vote thread soonish, as there are no concerns so far.

Best,

Dawid

On 13/04/2022 11:27, Sebastian Mattheis wrote:

Dear Flink developers,

I would like to open a discussion on FLIP 217 [1] for an extension of Watermark Alignment to also perform alignment in SplitReaders. To do so, SplitReaders must be able to suspend and resume reading from split sources, with the SourceOperator coordinating and controlling suspension and resumption.
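
As a rough illustration of the coordination described here, the decision the SourceOperator makes could look like the sketch below. It assumes (this is not stated in the thread) that alignment provides a maximum allowed watermark, and that a split is suspended while its watermark is above that limit and resumed once it is not; AlignmentDecision is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch, not the FLIP's code: decide which splits to suspend or
// resume, given each split's current watermark and an assumed upper limit.
class AlignmentDecision {
    final List<String> toPause = new ArrayList<>();
    final List<String> toResume = new ArrayList<>();

    static AlignmentDecision decide(Map<String, Long> splitWatermarks,
                                    Set<String> currentlyPaused,
                                    long maxAllowedWatermark) {
        AlignmentDecision d = new AlignmentDecision();
        // TreeMap gives a deterministic split order.
        for (Map.Entry<String, Long> e : new TreeMap<>(splitWatermarks).entrySet()) {
            boolean ahead = e.getValue() > maxAllowedWatermark;
            boolean paused = currentlyPaused.contains(e.getKey());
            if (ahead && !paused) {
                d.toPause.add(e.getKey());      // too far ahead: suspend
            } else if (!ahead && paused) {
                d.toResume.add(e.getKey());     // back within limit: resume
            }
        }
        return d;
    }
}
```

The two lists map naturally onto a pause/resume call on the readers, so only changes are sent rather than the full set of paused splits.
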

To gather information about the current watermarks of the SplitReaders, we extend the internal WatermarkOutputMultiplexer and report watermarks to the SourceOperator.

There is a PoC for this FLIP [2], prototyped by Arvid Heise and revised and reworked by Dawid Wysakowicz (who did most of the work) and me. The changes are backwards compatible in the sense that, if affected components do not support split alignment, the behavior is as before.

Best,
Sebastian

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-217+Support+watermark+alignment+of+source+splits

[2] https://github.com/dawidwys/flink/tree/aligned-splits

--
Konstantin Knauf
https://twitter.com/snntrable
https://github.com/knaufk