@Konstantin:

   As part of this FLIP, the `AlignedSplitReader` interface (aka the stop &
   resume behavior) will be implemented for Kafka and Pulsar only, correct?

Correct. As far as I know, though, those are the only sources that consume from multiple splits concurrently, so they are the only ones where split alignment applies.

@Thomas:

   I wonder if "supporting" split alignment in SourceReaderBase and then
   doing nothing if the split reader does not implement AlignedSplitReader
   could be misleading? Perhaps WithSplitsAlignment can instead be added to
   the specific source reader (i.e. KafkaSourceReader) to make it explicit
   that the source actually supports it.

I understand your concern. Hmm, I think we could actually do that. Given that the actual implementation of SourceReaderBase#alignSplits is rather short (just a forward to the corresponding method of SplitFetcher), we could reimplement it in the concrete source implementations. This solution has a downside, though: authors of new sources would have to do two things, namely implement both AlignedSplitReader and WithSplitsAlignment, instead of just implementing AlignedSplitReader. I would be fine with such a tradeoff, though. What do others think?
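To make the tradeoff concrete, here is a rough sketch of what the explicit opt-in could look like. It is only an illustration under assumptions: the two interfaces are simplified stand-ins with guessed signatures (String split ids), ExampleSourceReader and RecordingSplitReader are hypothetical names, and the hop through SplitFetcher is left out.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified, hypothetical stand-ins; the real signatures are defined in the FLIP.
interface WithSplitsAlignment {
    void alignSplits(Collection<String> splitsToPause, Collection<String> splitsToResume);
}

interface AlignedSplitReader {
    void alignSplits(Collection<String> splitsToPause, Collection<String> splitsToResume);
}

/** Hypothetical split reader that only records which splits are currently paused. */
class RecordingSplitReader implements AlignedSplitReader {
    final Set<String> paused = new HashSet<>();

    @Override
    public void alignSplits(Collection<String> splitsToPause, Collection<String> splitsToResume) {
        paused.addAll(splitsToPause);
        paused.removeAll(splitsToResume);
    }
}

/** Hypothetical concrete source reader that opts in explicitly. */
class ExampleSourceReader implements WithSplitsAlignment {
    private final AlignedSplitReader splitReader;

    ExampleSourceReader(AlignedSplitReader splitReader) {
        this.splitReader = splitReader;
    }

    @Override
    public void alignSplits(Collection<String> splitsToPause, Collection<String> splitsToResume) {
        // The short part that each source would have to repeat: forward the request.
        splitReader.alignSplits(splitsToPause, splitsToResume);
    }
}

class ExplicitAlignmentDemo {
    public static void main(String[] args) {
        RecordingSplitReader splitReader = new RecordingSplitReader();
        ExampleSourceReader reader = new ExampleSourceReader(splitReader);
        reader.alignSplits(List.of("split-0", "split-1"), List.of());
        reader.alignSplits(List.of(), List.of("split-0"));
        System.out.println(splitReader.paused); // prints [split-1]
    }
}
```

The point of the sketch is that a source author has to touch both types, which is the extra step mentioned above.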

@Steven:

   For this part from the motivation section, is it accurate? Let's assume
   one source task consumes from 3 partitions and one of the partition is
   significantly slower. In this situation, watermark for this source task
   won't hold back as it is reading recent data from other two Kafka
   partitions. As a result, it won't hold back the overall watermark. I
   thought the problem is that we may have late data for this slow
   partition.

It will hold back the watermark. The watermark of an operator is the minimum of the watermarks of all its splits [1].
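As a small illustration of the three-partition example, here is a sketch in plain Java. It is not Flink's actual code: SplitWatermarks is a hypothetical class and the timestamps are made up. It only models the rule that the combined watermark is the minimum over the per-split watermarks.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for per-split watermark tracking; not a Flink class. */
class SplitWatermarks {
    private final Map<String, Long> watermarkPerSplit = new HashMap<>();

    /** Records the latest watermark of a split; a split's watermark never moves backwards. */
    void update(String splitId, long watermark) {
        watermarkPerSplit.merge(splitId, watermark, Math::max);
    }

    /** The combined watermark is the minimum over all known splits. */
    long combined() {
        return watermarkPerSplit.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}

class SplitWatermarksDemo {
    public static void main(String[] args) {
        SplitWatermarks watermarks = new SplitWatermarks();
        watermarks.update("partition-0", 10_000L);
        watermarks.update("partition-1", 10_500L);
        watermarks.update("partition-2", 2_000L); // the slow partition
        System.out.println(watermarks.combined()); // prints 2000
    }
}
```

The slow partition pins the combined value at 2000 no matter how far the other two partitions have advanced.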

   I have another question about the restart. Say split alignment is
   triggered. checkpoint is completed. job failed and restored from the
   last checkpoint. because alignment decision is not checkpointed,
   initially alignment won't be enforced until we get a cycle of watermark
   aggregation and propagation, right? Not saying this corner is a problem.
   Just want to understand it more.

Your understanding is correct.

@Becket:

   1. I think watermark alignment is sort of a general use case, so should
   we just add the related methods to SourceReader directly instead of
   introducing the new interface of WithSplitAssignment? We can provide
   default implementations, so backwards compatibility won't be an issue.

I don't think we can provide a default implementation. How would we do that? Would it be just a no-op? Is that better than having an opt-in interface? The default implementation would have to live in the *Public* SourceReader interface itself. By the way, notice that SourceReaderBase does extend WithSplitsAlignment, so effectively all implementations do handle the alignment case. To be honest, I think it is impossible for end users to implement the SourceReader interface directly.
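To show the difference I have in mind, here is a rough sketch of the two variants. All types are hypothetical and heavily simplified (they are not the Flink interfaces); the sketch only shows that with an opt-in interface the caller can tell whether a reader supports alignment, while a no-op default hides that.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical, simplified types; not the actual Flink API.
interface ReaderWithDefault {
    /** Variant A: a no-op default. Callers cannot tell whether anything happens. */
    default void alignSplits(Collection<String> toPause, Collection<String> toResume) {}
}

interface PlainReader {}

interface OptInAlignment {
    /** Variant B: only readers that support alignment implement this. */
    void alignSplits(Collection<String> toPause, Collection<String> toResume);
}

/** A reader that inherits the no-op default and does not opt in. */
class LegacyReader implements PlainReader, ReaderWithDefault {}

/** A reader that opts in and counts how often it was asked to align. */
class AligningReader implements PlainReader, OptInAlignment {
    int calls = 0;

    @Override
    public void alignSplits(Collection<String> toPause, Collection<String> toResume) {
        calls++;
    }
}

class AlignmentDispatch {
    /** Returns true only if the reader explicitly supports alignment. */
    static boolean tryAlign(
            PlainReader reader, Collection<String> toPause, Collection<String> toResume) {
        if (reader instanceof OptInAlignment) {
            ((OptInAlignment) reader).alignSplits(toPause, toResume);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(tryAlign(new LegacyReader(), List.of("split-0"), List.of()));   // false
        System.out.println(tryAlign(new AligningReader(), List.of("split-0"), List.of())); // true
    }
}
```

Calling alignSplits on LegacyReader through the default method would succeed silently, which is exactly the case that is hard to notice.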

   2. As you mentioned, the SplitReader interface probably also needs some
   change to support throttling at the split granularity. Can you add that
   interface change into the public interface section as well?

It has been there from the beginning. See *AlignedSplitReader*.

   3. Nit, can we avoid using the method name assignSplits here, given that
   it is not actually changing the split assignments? It seems something
   like pauseOrResumeSplits(), or adjustSplitsThrottling() is more accurate.

The method is called *alignSplits*, not assignSplits. Do you still prefer a different name for it? Personally, I am open to suggestions here.

Best,

Dawid

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/#watermark-generation

On 22/04/2022 05:59, Becket Qin wrote:
Thanks for driving the effort, Sebastian. I think the motivation makes a
lot of sense. Just a few suggestions / questions.

1. I think watermark alignment is sort of a general use case, so should we
just add the related methods to SourceReader directly instead of
introducing the new interface of WithSplitAssignment? We can provide
default implementations, so backwards compatibility won't be an issue.

2. As you mentioned, the SplitReader interface probably also needs some
change to support throttling at the split granularity. Can you add that
interface change into the public interface section as well?

3. Nit, can we avoid using the method name assignSplits here, given that it
is not actually changing the split assignments? It seems something like
pauseOrResumeSplits(), or adjustSplitsThrottling() is more accurate.

Thanks,

Jiangjie (Becket) Qin

On Thu, Apr 21, 2022 at 11:39 PM Steven Wu<stevenz...@gmail.com>  wrote:

However, a single source operator may read data from multiple
splits/partitions, e.g., multiple Kafka partitions, such that even with
watermark alignment the source operator may need to buffer excessive amount
of data if one split emits data faster than another.

For this part from the motivation section, is it accurate? Let's assume one
source task consumes from 3 partitions and one of the partition is
significantly slower. In this situation, watermark for this source task
won't hold back as it is reading recent data from other two Kafka
partitions. As a result, it won't hold back the overall watermark. I
thought the problem is that we may have late data for this slow partition.

I have another question about the restart. Say split alignment is
triggered. checkpoint is completed. job failed and restored from the last
checkpoint. because alignment decision is not checkpointed, initially
alignment won't be enforced until we get a cycle of watermark aggregation
and propagation, right? Not saying this corner is a problem. Just want to
understand it more.



On Thu, Apr 21, 2022 at 8:20 AM Thomas Weise<t...@apache.org>  wrote:

Thanks for working on this!

I wonder if "supporting" split alignment in SourceReaderBase and then
doing nothing if the split reader does not implement AlignedSplitReader
could be misleading? Perhaps WithSplitsAlignment can instead be added to
the specific source reader (i.e. KafkaSourceReader) to make it explicit
that the source actually supports it.

Thanks,
Thomas


On Thu, Apr 21, 2022 at 4:57 AM Konstantin Knauf<kna...@apache.org> wrote:

Hi Sebastian, Hi Dawid,

As part of this FLIP, the `AlignedSplitReader` interface (aka the stop &
resume behavior) will be implemented for Kafka and Pulsar only, correct?

+1 in general. I believe it is valuable to complete the watermark aligned
story with this FLIP.

Cheers,

Konstantin

On Thu, Apr 21, 2022 at 12:36 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

To be explicit, having worked on it, I support it ;) I think we can
start a vote thread soonish, as there are no concerns so far.

Best,

Dawid

On 13/04/2022 11:27, Sebastian Mattheis wrote:
Dear Flink developers,

I would like to open a discussion on FLIP 217 [1] for an extension of
Watermark Alignment to perform alignment also in SplitReaders. To do so,
SplitReaders must be able to suspend and resume reading from split sources
where the SourceOperator coordinates and controls suspend and resume. To
gather information about current watermarks of the SplitReaders, we extend
the internal WatermarkOutputMultiplexer and report watermarks to the
SourceOperator.

There is a PoC for this FLIP [2], prototyped by Arvid Heise and revised and
reworked by Dawid Wysakowicz (He did most of the work.) and me. The changes
are backwards compatible in a way that if affected components do not
support split alignment the behavior is as before.

Best,
Sebastian

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-217+Support+watermark+alignment+of+source+splits
[2] https://github.com/dawidwys/flink/tree/aligned-splits


--

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk
