Hi Dawid,

Thanks for the explanation. Apologies, I somehow misread several occurrences
of "align" as "assign".

Regarding 1, by "default implementation" I meant a default no-op
implementation. I am a little worried about the proliferation of decorative
interfaces. I think the most important thing about interfaces is that they
are easy to understand. In this case, I prefer adding a new method to the
existing interface for the following reasons:

a) I feel the biggest drawback of decorative interfaces is the ambiguity
about which interfaces they can decorate and which combinations of multiple
decorative interfaces are valid. In the current FLIP, the
WithSplitsAlignment interface is only applicable to the SourceReader, which
means it can't decorate any other interface. From an interface design
perspective, a natural question is: why not let "AlignedSplitReader" extend
"WithSplitsAlignment"? It is also natural to assume that a split reader
implementing both SplitReader and WithSplitsAlignment would work, because a
source reader implementing SourceReader and WithSplitsAlignment works. So
why isn't there an AlignedSourceReader interface? In the future, if a new
feature is added (e.g. awareness of sorted or pre-partitioned data), are we
going to create another SplitReader interface such as SortedSplitReader or
PrePartitionedAware? Can they be combined? So I think additional decorative
interfaces like WithSplitsAlignment actually increase the cost of
understanding for users, because they have to know which decorative
interfaces exist, which interfaces they can decorate, and which
combinations of them are valid. Ideally we want to avoid that. To be clear,
I am not opposed to having a WithSplitsAlignment interface; it is
completely OK to have it as an internal interface and let SourceReader and
SplitReader both extend it.

b) Adding a new method to the SourceReader with a default no-op
implementation would help avoid branching in the source logic, especially
given that we agree that the vast majority of SourceReader implementations,
if not all, would just extend SourceReaderBase. That means adding a new
method to the interface would effectively give the same user experience,
but more simply.
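To illustrate point b), here is a minimal Java sketch using hypothetical,
simplified interfaces: ReaderWithDefault, OptInAlignment, PlainReader,
PausingReader and Callers are made-up names, and the two-collection
alignSplits(...) signature is an assumption, not the FLIP's actual
signature. It contrasts a default no-op method, which gives the caller a
single code path, with an opt-in interface, which forces an instanceof
branch.

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified interfaces for comparing the two designs only;
// these are NOT Flink's SourceReader / WithSplitsAlignment.

// Design b): the method lives on the reader interface with a default no-op.
interface ReaderWithDefault {
    default void alignSplits(Collection<String> splitsToPause, Collection<String> splitsToResume) {
        // No-op: readers that do not support alignment simply inherit this.
    }
}

// Current FLIP style: an opt-in decorative interface.
interface OptInAlignment {
    void alignSplits(Collection<String> splitsToPause, Collection<String> splitsToResume);
}

// A reader that ignores alignment; it inherits the no-op.
class PlainReader implements ReaderWithDefault {}

// A reader that supports alignment and tracks its paused splits. It implements
// both interfaces only so the same reader can be passed to either caller.
class PausingReader implements ReaderWithDefault, OptInAlignment {
    final Set<String> paused = new TreeSet<>();

    @Override
    public void alignSplits(Collection<String> splitsToPause, Collection<String> splitsToResume) {
        paused.removeAll(splitsToResume);
        paused.addAll(splitsToPause);
    }
}

class Callers {
    // With a default method: one code path, no type checks.
    static void alignWithDefault(ReaderWithDefault reader, Collection<String> pause, Collection<String> resume) {
        reader.alignSplits(pause, resume);
    }

    // With an opt-in interface: the caller has to branch on the reader's type.
    static boolean alignOptIn(Object reader, Collection<String> pause, Collection<String> resume) {
        if (reader instanceof OptInAlignment) {
            ((OptInAlignment) reader).alignSplits(pause, resume);
            return true;
        }
        return false; // not supported; the caller must decide what to do
    }
}
```

With the default method, a reader that ignores alignment compiles and runs
unchanged; with the opt-in interface, every caller has to decide what to do
when the type check fails.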

c) A related design principle that may be worth discussing is how we let
Source implementations tell Flink which capabilities are supported and
which are not. Personally, I feel the most intuitive place is the Source
itself, because that is the entry point of the entire Source connector
logic.

Based on the above thoughts, I am wondering if the following interface
would be easier for users to understand.

- Change "WithSplitsAlignment" to an internal interface, and let both
SourceReader and SplitReader extend it, with a default no-op implementation.
- Add a new method "boolean supportSplitsAlignment()" to the Source
interface, with a default implementation returning false. Sources that have
implemented the alignment logic can change this to return true, and
override the alignSplits() methods in the SourceReader / SplitReader if
needed.
- In the future, if a new optional feature is added to the Source, and that
feature requires awareness from Flink, we can add more such methods to the
Source.
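A rough Java sketch of the proposal above, under stated assumptions: Source,
SourceReader, SplitReader and WithSplitsAlignment here are simplified
stand-ins that only borrow their names from this discussion (they are not
Flink's real interfaces), the alignSplits(...) signature is assumed, and
LegacySource, AlignedSource and AlignmentPlanner are hypothetical classes.
AlignmentPlanner shows just one possible way the framework could consult
supportSplitsAlignment().

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-ins; names follow the proposal, signatures are assumed.

// Internal mixin with a default no-op, extended by both reader interfaces.
interface WithSplitsAlignment {
    default void alignSplits(Collection<String> splitsToPause, Collection<String> splitsToResume) {}
}

interface SourceReader extends WithSplitsAlignment {}

interface SplitReader extends WithSplitsAlignment {}

interface Source {
    // Capability flag on the Source itself; false unless the source opts in.
    default boolean supportSplitsAlignment() {
        return false;
    }

    SourceReader createReader();
}

// An existing source: nothing changes, it inherits both defaults.
class LegacySource implements Source {
    @Override
    public SourceReader createReader() {
        return new SourceReader() {};
    }
}

// A source that implemented alignment: flips the flag and overrides alignSplits().
class AlignedSource implements Source {
    final List<String> pausedLog = new ArrayList<>();

    @Override
    public boolean supportSplitsAlignment() {
        return true;
    }

    @Override
    public SourceReader createReader() {
        return new SourceReader() {
            @Override
            public void alignSplits(Collection<String> splitsToPause, Collection<String> splitsToResume) {
                pausedLog.addAll(splitsToPause); // record pause requests for the example
            }
        };
    }
}

// Hypothetical framework-side check: consult the capability once, up front,
// and otherwise keep the previous (non-split-aligned) behavior.
class AlignmentPlanner {
    static boolean useSplitAlignment(Source source, boolean alignmentConfigured) {
        return alignmentConfigured && source.supportSplitsAlignment();
    }
}
```

Existing sources such as LegacySource need no changes; only sources that
actually implement alignment flip the flag.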

What do you think?

Thanks,

Jiangjie (Becket) Qin





On Fri, Apr 22, 2022 at 4:05 PM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> @Konstantin:
>
> As part of this FLIP, the `AlignedSplitReader` interface (aka the stop &
> resume behavior) will be implemented for Kafka and Pulsar only, correct?
>
> Correct. As far as I know, though, those are the only sources that consume
> concurrently from multiple splits, so split alignment applies only to them.
>
> @Thomas:
>
> I wonder if "supporting" split alignment in SourceReaderBase and then doing
> nothing if the split reader does not implement AlignedSplitReader could be
> misleading? Perhaps WithSplitsAlignment can instead be added to the
> specific source reader (i.e. KafkaSourceReader) to make it explicit that
> the source actually supports it.
>
> I understand your concern. Hmm, I think we could actually do that. Given
> that the actual implementation of SourceReaderBase#alignSplits is rather
> short (just a forward to the corresponding method of SplitFetcher), we
> could reimplement it in the actual source implementations. This solution
> has a downside, though: authors of new sources would have to do two
> things, implement AlignedSplitReader and implement WithSplitsAlignment,
> instead of just implementing AlignedSplitReader. I would be fine with such
> a tradeoff, though. What do others think?
>
> @Steven:
>
> For this part from the motivation section, is it accurate? Let's assume one
> source task consumes from 3 partitions and one of the partitions is
> significantly slower. In this situation, watermark for this source task
> won't hold back as it is reading recent data from other two Kafka
> partitions. As a result, it won't hold back the overall watermark. I
> thought the problem is that we may have late data for this slow partition.
>
> It will hold back the watermark. The watermark of an operator is the
> minimum of the watermarks of all its splits [1].
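To make this answer concrete: with three partitions where p2 lags, the
combined watermark stays at p2's value, while p0 and p1 keep emitting
records far ahead of it that may have to be buffered downstream. A minimal
Java sketch, using a hypothetical CombinedWatermark helper (not a Flink
class) and made-up timestamps:

```java
import java.util.Map;

// Hypothetical helper (not a Flink class) illustrating the rule above:
// an operator's watermark is the minimum of the watermarks of all its splits.
final class CombinedWatermark {
    private CombinedWatermark() {}

    // Long.MIN_VALUE stands for "no watermark yet" when there are no splits
    // (an assumption of this sketch).
    static long of(Map<String, Long> watermarkPerSplit) {
        return watermarkPerSplit.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```

For example, CombinedWatermark.of(Map.of("p0", 10_000L, "p1", 10_000L,
"p2", 2_000L)) yields 2000: the slow split p2 determines the operator's
watermark.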
>
> I have another question about the restart. Say split alignment is
> triggered, a checkpoint is completed, and then the job fails and is
> restored from the last checkpoint. Because the alignment decision is not
> checkpointed, alignment won't be enforced initially, until we get a cycle
> of watermark aggregation and propagation, right? Not saying this corner
> case is a problem. Just want to understand it more.
>
> Your understanding is correct.
>
> @Becket:
>
> 1. I think watermark alignment is sort of a general use case, so should we
> just add the related methods to SourceReader directly instead of
> introducing the new interface of WithSplitAssignment? We can provide
> default implementations, so backwards compatibility won't be an issue.
>
> I don't think we can provide a default implementation. How would we do
> that? Would it be just a no-op? Is that better than having an opt-in
> interface? The default implementation would have to be added exclusively
> in the *Public* SourceReader interface. By the way, notice that
> SourceReaderBase does extend WithSplitsAlignment, so effectively all
> implementations do handle the alignment case. To be honest, I think it is
> impossible for end users to implement the SourceReader interface directly.
>
> 2. As you mentioned, the SplitReader interface probably also needs some
> change to support throttling at the split granularity. Can you add that
> interface change into the public interface section as well?
>
> It has been there from the beginning. See *AlignedSplitReader*.
>
> 3. Nit, can we avoid using the method name assignSplits here, given that it
> is not actually changing the split assignments? It seems something like
> pauseOrResumeSplits(), or adjustSplitsThrottling() is more accurate.
>
> The method is called *alignSplits*, not assign. Do you still prefer a
> different name for it? Personally, I am open to suggestions here.
>
> Best,
>
> Dawid
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/#watermark-generation
> On 22/04/2022 05:59, Becket Qin wrote:
>
> Thanks for driving the effort, Sebastian. I think the motivation makes a
> lot of sense. Just a few suggestions / questions.
>
> 1. I think watermark alignment is sort of a general use case, so should we
> just add the related methods to SourceReader directly instead of
> introducing the new interface of WithSplitAssignment? We can provide
> default implementations, so backwards compatibility won't be an issue.
>
> 2. As you mentioned, the SplitReader interface probably also needs some
> change to support throttling at the split granularity. Can you add that
> interface change into the public interface section as well?
>
> 3. Nit, can we avoid using the method name assignSplits here, given that it
> is not actually changing the split assignments? It seems something like
> pauseOrResumeSplits(), or adjustSplitsThrottling() is more accurate.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Apr 21, 2022 at 11:39 PM Steven Wu <stevenz...@gmail.com> wrote:
>
> However, a single source operator may read data from multiple
> splits/partitions, e.g., multiple Kafka partitions, such that even with
> watermark alignment the source operator may need to buffer an excessive
> amount of data if one split emits data faster than another.
>
> For this part from the motivation section, is it accurate? Let's assume one
> source task consumes from 3 partitions and one of the partitions is
> significantly slower. In this situation, watermark for this source task
> won't hold back as it is reading recent data from other two Kafka
> partitions. As a result, it won't hold back the overall watermark. I
> thought the problem is that we may have late data for this slow partition.
>
> I have another question about the restart. Say split alignment is
> triggered, a checkpoint is completed, and then the job fails and is
> restored from the last checkpoint. Because the alignment decision is not
> checkpointed, alignment won't be enforced initially, until we get a cycle
> of watermark aggregation and propagation, right? Not saying this corner
> case is a problem. Just want to understand it more.
>
>
>
> On Thu, Apr 21, 2022 at 8:20 AM Thomas Weise <t...@apache.org> wrote:
>
>
> Thanks for working on this!
>
> I wonder if "supporting" split alignment in SourceReaderBase and then doing
> nothing if the split reader does not implement AlignedSplitReader could be
> misleading? Perhaps WithSplitsAlignment can instead be added to the
> specific source reader (i.e. KafkaSourceReader) to make it explicit that
> the source actually supports it.
>
> Thanks,
> Thomas
>
>
> On Thu, Apr 21, 2022 at 4:57 AM Konstantin Knauf <kna...@apache.org>
> wrote:
>
>
> Hi Sebastian, Hi Dawid,
>
> As part of this FLIP, the `AlignedSplitReader` interface (aka the stop &
> resume behavior) will be implemented for Kafka and Pulsar only, correct?
>
> +1 in general. I believe it is valuable to complete the watermark
> alignment story with this FLIP.
>
> Cheers,
>
> Konstantin
>
> On Thu, Apr 21, 2022 at 12:36 PM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>
> To be explicit, having worked on it, I support it ;) I think we can
> start a vote thread soonish, as there are no concerns so far.
>
> Best,
>
> Dawid
>
> On 13/04/2022 11:27, Sebastian Mattheis wrote:
>
> Dear Flink developers,
>
> I would like to open a discussion on FLIP 217 [1] for an extension of
> Watermark Alignment to perform alignment also in SplitReaders. To do so,
> SplitReaders must be able to suspend and resume reading from split
> sources, where the SourceOperator coordinates and controls suspend and
> resume. To gather information about the current watermarks of the
> SplitReaders, we extend the internal WatermarkOutputMultiplexer and report
> watermarks to the SourceOperator.
>
> There is a PoC for this FLIP [2], prototyped by Arvid Heise and revised
> and reworked by Dawid Wysakowicz (he did most of the work) and me. The
> changes are backwards compatible in that, if the affected components do
> not support split alignment, the behavior is as before.
>
> Best,
> Sebastian
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-217+Support+watermark+alignment+of+source+splits
>
> [2] https://github.com/dawidwys/flink/tree/aligned-splits
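The split-level suspend/resume decision described above can be sketched
roughly as follows. This is a simplified, hypothetical model, not the code
in the PoC branch: it assumes the operator already knows the minimum
watermark of its alignment group and a maximum allowed drift (playing the
role of the drift bound configured for source-level watermark alignment),
and it only computes which splits to pause and which to resume. Watermark
aggregation and propagation, and the actual suspend/resume inside
SplitReaders, are not modeled. SplitAligner and SplitAlignmentDecision are
made-up names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical result holder; not a Flink class.
final class SplitAlignmentDecision {
    final List<String> splitsToPause = new ArrayList<>();
    final List<String> splitsToResume = new ArrayList<>();
}

// Hypothetical sketch of the split-level pause/resume decision.
final class SplitAligner {
    private SplitAligner() {}

    // Pause every split whose watermark is ahead of
    // (groupMinWatermark + maxAllowedDrift); resume all other splits.
    // Overflow of the addition is ignored in this sketch.
    static SplitAlignmentDecision decide(
            Map<String, Long> splitWatermarks, long groupMinWatermark, long maxAllowedDrift) {
        long maxAllowedWatermark = groupMinWatermark + maxAllowedDrift;
        SplitAlignmentDecision decision = new SplitAlignmentDecision();
        // Iterate in split-ID order so the result is deterministic.
        for (Map.Entry<String, Long> e : new TreeMap<>(splitWatermarks).entrySet()) {
            if (e.getValue() > maxAllowedWatermark) {
                decision.splitsToPause.add(e.getKey());
            } else {
                decision.splitsToResume.add(e.getKey());
            }
        }
        return decision;
    }
}
```

For example, with split watermarks p0 = 1000, p1 = 1500, p2 = 200, a group
minimum of 200 and a drift of 500, the bound is 700, so p0 and p1 are
paused and p2 keeps reading until it catches up.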
>
> --
>
> Konstantin Knauf
> https://twitter.com/snntrable
> https://github.com/knaufk
>
>
