Thanks for driving the effort, Sebastian. I think the motivation makes a
lot of sense. Just a few suggestions / questions.

1. I think watermark alignment is sort of a general use case, so should we
just add the related methods to SourceReader directly instead of
introducing the new WithSplitsAlignment interface? We can provide
default implementations, so backwards compatibility won't be an issue.

2. As you mentioned, the SplitReader interface probably also needs some
change to support throttling at the split granularity. Can you add that
interface change into the public interface section as well?

3. Nit, can we avoid using the method name assignSplits here, given that it
is not actually changing the split assignments? Something like
pauseOrResumeSplits() or adjustSplitsThrottling() seems more accurate.
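A minimal Java sketch of points 1 to 3, assuming default methods on the reader interfaces. AlignableSourceReader, AlignableSplitReader, LegacyReader, PausableReader and the exact pauseOrResumeSplits signature are hypothetical stand-ins, not Flink's actual SourceReader or SplitReader API. Throwing UnsupportedOperationException in the default is one design choice; a no-op default would be the alternative.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for SourceReader; only the proposed method is shown.
interface AlignableSourceReader {
    // Pause the first group of splits and resume the second group. The default
    // keeps readers written before the change compiling unchanged; it fails
    // loudly only if alignment is actually requested.
    default void pauseOrResumeSplits(Collection<String> splitsToPause,
                                     Collection<String> splitsToResume) {
        throw new UnsupportedOperationException(
                getClass().getSimpleName() + " does not support split alignment");
    }
}

// Hypothetical split-level counterpart for SplitReader (point 2).
interface AlignableSplitReader {
    default void pauseOrResumeSplits(Collection<String> splitsToPause,
                                     Collection<String> splitsToResume) {
        throw new UnsupportedOperationException(
                getClass().getSimpleName() + " does not support pausing splits");
    }
}

// A reader written before the change: it compiles without modification.
class LegacyReader implements AlignableSourceReader {}

// A reader that opts in by overriding the default and tracking paused splits.
class PausableReader implements AlignableSourceReader {
    final Set<String> paused = new HashSet<>();

    @Override
    public void pauseOrResumeSplits(Collection<String> splitsToPause,
                                    Collection<String> splitsToResume) {
        paused.addAll(splitsToPause);
        paused.removeAll(splitsToResume);
    }
}

class SplitsPausingDemo {
    public static void main(String[] args) {
        PausableReader reader = new PausableReader();
        reader.pauseOrResumeSplits(List.of("partition-0", "partition-1"), List.of());
        reader.pauseOrResumeSplits(List.of(), List.of("partition-0"));
        System.out.println(reader.paused); // [partition-1]
        try {
            new LegacyReader().pauseOrResumeSplits(List.of("partition-0"), List.of());
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage()); // LegacyReader does not support split alignment
        }
    }
}
```

The default method is what makes adding the method to SourceReader itself backwards compatible: existing implementations inherit it and keep compiling.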

Thanks,

Jiangjie (Becket) Qin

On Thu, Apr 21, 2022 at 11:39 PM Steven Wu <stevenz...@gmail.com> wrote:

> > However, a single source operator may read data from multiple
> > splits/partitions, e.g., multiple Kafka partitions, such that even with
> > watermark alignment the source operator may need to buffer an excessive
> > amount of data if one split emits data faster than another.
>
> For this part of the motivation section, is it accurate? Let's assume one
> source task consumes from 3 partitions and one of the partitions is
> significantly slower. In this situation, the watermark for this source task
> won't be held back, as it is reading recent data from the other two Kafka
> partitions. As a result, it won't hold back the overall watermark. I
> thought the problem is that we may have late data for this slow partition.
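
Whether the slow partition holds back the task's watermark depends on how per-split watermarks are combined. A minimal sketch, assuming per-split watermarks merged by taking the minimum over non-idle splits, which is how Flink's internal WatermarkOutputMultiplexer combines its outputs; SplitWatermarks and its methods are hypothetical names for illustration, not that class.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical simplification: the task-level watermark is the minimum of the
// latest watermarks of all splits that are not marked idle.
class SplitWatermarks {
    private final Map<String, Long> latest = new HashMap<>();
    private final Set<String> idle = new HashSet<>();

    void update(String split, long watermark) {
        latest.merge(split, watermark, Long::max); // per-split watermarks never go backwards
        idle.remove(split);                        // an update makes the split active again
    }

    void markIdle(String split) {
        idle.add(split);
    }

    // Long.MIN_VALUE means no active split has reported a watermark yet.
    long combined() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<String, Long> e : latest.entrySet()) {
            if (!idle.contains(e.getKey())) {
                min = Math.min(min, e.getValue());
                anyActive = true;
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }
}

class WatermarkCombineDemo {
    public static void main(String[] args) {
        SplitWatermarks task = new SplitWatermarks();
        task.update("partition-0", 10_000L);
        task.update("partition-1", 10_000L);
        task.update("partition-2", 1_000L);  // the slow partition
        System.out.println(task.combined()); // 1000
        task.markIdle("partition-2");
        System.out.println(task.combined()); // 10000
    }
}
```

Under this assumption the slow, non-idle partition pins the task's watermark, while the two fast partitions keep delivering records ahead of it.
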
>
> I have another question about the restart. Say split alignment is
> triggered, a checkpoint is completed, and then the job fails and is
> restored from the last checkpoint. Because the alignment decision is not
> checkpointed, alignment initially won't be enforced until we get a cycle of
> watermark aggregation and propagation, right? I'm not saying this corner
> case is a problem; I just want to understand it better.
>
> On Thu, Apr 21, 2022 at 8:20 AM Thomas Weise <t...@apache.org> wrote:
>
> > Thanks for working on this!
> >
> > I wonder if "supporting" split alignment in SourceReaderBase and then doing
> > nothing if the split reader does not implement AlignedSplitReader could be
> > misleading? Perhaps WithSplitsAlignment can instead be added to the
> > specific source reader (e.g. KafkaSourceReader) to make it explicit that
> > the source actually supports it.
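
A minimal sketch of the explicit opt-in, assuming the operator checks the reader's type once at startup and fails fast when alignment is configured but unsupported, instead of silently doing nothing. SupportsSplitsAlignment, GenericReader, AligningReader and AlignmentCheck are hypothetical names; this is not Flink code.

```java
import java.util.Collection;

// Hypothetical opt-in interface: only readers that really implement
// split-level pause/resume declare it.
interface SupportsSplitsAlignment {
    void pauseOrResumeSplits(Collection<String> splitsToPause,
                             Collection<String> splitsToResume);
}

// A generic reader base that knows nothing about alignment.
class GenericReader {}

// A specific reader (think of a Kafka reader) that explicitly opts in.
class AligningReader extends GenericReader implements SupportsSplitsAlignment {
    int calls = 0;

    @Override
    public void pauseOrResumeSplits(Collection<String> splitsToPause,
                                    Collection<String> splitsToResume) {
        calls++; // a real reader would pause/resume fetching for these splits here
    }
}

final class AlignmentCheck {
    // Rejects the configuration up front instead of accepting alignment
    // requests and ignoring them at runtime.
    static void validate(Object reader, boolean alignmentEnabled) {
        if (alignmentEnabled && !(reader instanceof SupportsSplitsAlignment)) {
            throw new IllegalStateException(reader.getClass().getSimpleName()
                    + " does not support split alignment");
        }
    }
}

class AlignmentCheckDemo {
    public static void main(String[] args) {
        AlignmentCheck.validate(new AligningReader(), true); // passes
        AlignmentCheck.validate(new GenericReader(), false); // passes: alignment is off
        try {
            AlignmentCheck.validate(new GenericReader(), true);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // GenericReader does not support split alignment
        }
    }
}
```
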
> >
> > Thanks,
> > Thomas
> >
> >
> > On Thu, Apr 21, 2022 at 4:57 AM Konstantin Knauf <kna...@apache.org> wrote:
> >
> > > Hi Sebastian, Hi Dawid,
> > >
> > > As part of this FLIP, the `AlignedSplitReader` interface (aka the stop &
> > > resume behavior) will be implemented for Kafka and Pulsar only, correct?
> > >
> > > +1 in general. I believe it is valuable to complete the watermark
> > > alignment story with this FLIP.
> > >
> > > Cheers,
> > >
> > > Konstantin
> > >
> > > On Thu, Apr 21, 2022 at 12:36 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> > >
> > > > To be explicit, having worked on it, I support it ;) I think we can
> > > > start a vote thread soonish, as there are no concerns so far.
> > > >
> > > > Best,
> > > >
> > > > Dawid
> > > >
> > > > On 13/04/2022 11:27, Sebastian Mattheis wrote:
> > > > > Dear Flink developers,
> > > > >
> > > > > I would like to open a discussion on FLIP-217 [1] for an extension of
> > > > > Watermark Alignment to perform alignment also in SplitReaders. To do so,
> > > > > SplitReaders must be able to suspend and resume reading from split
> > > > > sources, where the SourceOperator coordinates and controls suspend and
> > > > > resume. To gather information about the current watermarks of the
> > > > > SplitReaders, we extend the internal WatermarkOutputMultiplexer and
> > > > > report watermarks to the SourceOperator.
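
A minimal sketch of the per-task decision step, assuming the SourceOperator receives a max allowed watermark (assumed here to be the global minimum watermark plus the configured maximum drift) and knows each split's current watermark from the multiplexer. SplitAlignmentDecision and decide() are hypothetical names; the PoC may structure this differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical decision step run by the operator whenever a new max allowed
// watermark arrives. Splits too far ahead are paused; paused splits that are
// back within the limit are resumed. The two lists feed one pause/resume call.
final class SplitAlignmentDecision {
    final List<String> toPause = new ArrayList<>();
    final List<String> toResume = new ArrayList<>();

    static SplitAlignmentDecision decide(Map<String, Long> splitWatermarks,
                                         Set<String> currentlyPaused,
                                         long maxAllowedWatermark) {
        SplitAlignmentDecision d = new SplitAlignmentDecision();
        for (Map.Entry<String, Long> e : splitWatermarks.entrySet()) {
            boolean tooFarAhead = e.getValue() > maxAllowedWatermark;
            boolean paused = currentlyPaused.contains(e.getKey());
            if (tooFarAhead && !paused) {
                d.toPause.add(e.getKey());
            } else if (!tooFarAhead && paused) {
                d.toResume.add(e.getKey());
            }
        }
        return d;
    }
}

class SplitAlignmentDemo {
    public static void main(String[] args) {
        Map<String, Long> watermarks = new TreeMap<>(); // sorted for stable output
        watermarks.put("partition-0", 100L);
        watermarks.put("partition-1", 5_000L);
        watermarks.put("partition-2", 900L);
        long maxAllowed = 100L + 1_000L; // global min watermark + max drift (assumed values)
        SplitAlignmentDecision d =
                SplitAlignmentDecision.decide(watermarks, Set.of("partition-2"), maxAllowed);
        System.out.println("pause " + d.toPause + ", resume " + d.toResume);
        // pause [partition-1], resume [partition-2]
    }
}
```

Only splits whose state actually changes end up in the lists, so a reader is not asked to pause a split it has already paused.
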
> > > > >
> > > > > There is a PoC for this FLIP [2], prototyped by Arvid Heise and revised
> > > > > and reworked by Dawid Wysakowicz (who did most of the work) and me. The
> > > > > changes are backwards compatible: if the affected components do not
> > > > > support split alignment, the behavior is unchanged.
> > > > >
> > > > > Best,
> > > > > Sebastian
> > > > >
> > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-217+Support+watermark+alignment+of+source+splits
> > > > > [2] https://github.com/dawidwys/flink/tree/aligned-splits
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Konstantin Knauf
> > >
> > > https://twitter.com/snntrable
> > >
> > > https://github.com/knaufk
> > >
> >
>
