add dev@ group to the thread as Thomas suggested

Arvid,

The scenario 3 (Dynamic assignment + temporary no split) in the FLIP-180
(idleness) can happen to Iceberg source alignment, as readers can be
temporarily starved due to the holdback by the enumerator when assigning
new splits upon request.

Totally agree that we should decouple this discussion with the FLIP-217,
which addresses the split level watermark alignment problem as a follow-up
of FLIP-182

Becket,

Yes, currently Iceberg source implemented the alignment leveraging the
dynamic split assignment from FLIP-27 design. Basically, the enumerator can
hold back split assignments to readers when necessary. Everything are
centralized in the enumerator: (1) watermark extraction and aggregation (2)
alignment decision and execution

The motivation of this discussion is to see if Iceberg source can leverage
some of the watermark alignment solutions (like FLIP-182) from Flink
framework. E.g., as mentioned in the doc, Iceberg source can potentially
leverage the FLIP-182 framework to do the watermark extraction and
aggregation. For the alignment decision and execution, we can keep them in
the centralized enumerator.

Thanks,
Steven

On Thu, Apr 28, 2022 at 2:05 AM Becket Qin <becket....@gmail.com> wrote:

> Hi Steven,
>
> Thanks for pulling me into this thread. I think the timestamp
> alignment use case here is a good example of what FLIP-27 was designed for.
>
> Technically speaking, Iceberg source can already implement the timestamp
> alignment in the Flink new source even without FLIP-182. However, I
> understand the rationale here because timestamp alignment is also trying to
> orchestrate the consumption of splits. However, it looks like FLIP-182 was
> not designed in a way that it can be easily extended for other use cases.
> It may probably worth thinking of a more general mechanism to answer the
> following questions:
>
> 1. What information whose source of truth is the Flink framework should be
> exposed to the SplitEnumerator and SourceReader? And how?
> 2. What control actions in the Flink framework are worth exposing to the
> SplitEnumerators and SourceReaders? And how?
>
> In the context of timestamp alignment, the first question is more
> relevant. For example, instead of hardcode the ReportWatermarkEvent
> handling logic in the SourceCoordinator, should we expose this to the
> SplitEnumerator? So basically there will be some information, such as
> subtask local watermark, whose source of truth is Flink runtime, but useful
> to the user provided pluggables.
>
> I think there are a few control flow patterns to make a complete design:
>
> a. Framework space information (e.g. watermark) --> User space Pluggables
> (e.g. SplitEnumerator) --> User space Actions (e.g. Pause reading a split).
> b. Framework space information (e.g. task failure) --> User space
> pluggables (e.g. SplitEnumerator) --> Framework space actions (e.g. exit
> the job)
> c. User space information (e.g. a custom workload metric) --> User space
> pluggables (e.g. SplitEnumerator) --> User space actions (e.g. rebalance
> the workload across the source readers).
> d. Use space information (e.g. a custom stopping event in the stream) -->
> User space pluggables (e.g. SplitEnumerator) --> Framework space actions
> (e.g. stop the job).
>
> So basically for any user provided pluggables, the input information may
> either come from another user provided logic or from the framework, and
> after receiving that information, the pluggable may either want the
> framework or another pluggable to take an action. So this gives the above 4
> combinations.
>
> In our case, when the pluggables are SplitEnumerator and SourceReader, the
> control flows that only involve user space actions are fully supported. But
> it seems that when it comes to control flows involving framework space
> information, some of the information has not been exposed to the pluggable,
> and some framework actions might also be missing.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Apr 28, 2022 at 3:44 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi folks,
>>
>> quick input from my side. I think this is from the implementation
>> perspective what Piotr and I had in mind for a global min watermark that
>> helps in idleness cases. See also point 3 in
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
>> .
>>
>> Basically, we would like to empower source enumerators to determine the
>> global min watermark for all source readers factoring in even future
>> splits. Not all sources can supply that information (think of a general
>> file source) but most should be able to. Basically, Flink should know for a
>> given source at a given point in time what the min watermark across all
>> source subtasks is.
>>
>> Here is some background:
>> In the context of idleness, we can deterministically advance the
>> watermark. In the pre-FLIP-27 era, we had heuristic approaches in sources
>> to switch to idleness and thus allow watermarks to increase in cases where
>> fewer splits than source tasks are available. However, for sources with
>> dynamic split discovery that actually yields incorrect results. Think of a
>> Kinesis consumer where a shard is split. Then a previously idle source
>> subtask may receive a new split with time t0 as the lowest timestamp. Since
>> the source subtask did not participate in the global watermark generation
>> (because it was idle), the previously emitted watermark may be past t0 and
>> thus results in late records potentially being discarded. A rerun of the
>> same pipeline on historic data would not render the source subtask idle and
>> not result in late records. The solution was to not render source subtasks
>> automatically idle by the framework if there are no spits. That leads to
>> confusion for Kafka users with static topic subscription where #splits <
>> #parallelism stalls pipelines because the watermark is not advancing. Here,
>> your sketched solution can be transferred to KafkaSource to let Flink know
>> that min global watermark on a static assignment is determined by the
>> slowest partition. Hence, all idle readers emit that min global watermark
>> and the user sees progress.
>> This whole idea is related to FLIP-182 watermark alignment but I'd go
>> with another FLIP as the goal is quite different even though the
>> implementation overlaps.
>>
>> Now Iceberg seems to use the same information to actually pause the
>> consumption of files and create some kind of orderness guarantees as far as
>> I understood. This probably can be applied to any source with dynamic split
>> discovery. However, I wouldn't mix up the concepts and hence I appreciate
>> you not chiming into the FLIP-182 and ff. threads. The goal of FLIP-182 is
>> to pause readers while consuming a split, while your approach pauses
>> readers before processing another split. So it feels more closely related
>> to the global min watermark - so it could either be part of that FLIP or a
>> FLIP of its own. Afaik API changes should actually happen only on the
>> enumerator side both for your ideas and for global min watermark.
>>
>> Best,
>>
>> Arvid
>>
>> On Wed, Apr 27, 2022 at 7:31 PM Thomas Weise <t...@apache.org> wrote:
>>
>>> Hi Steven,
>>>
>>> Would it be better to bring this as a separate thread related to Iceberg
>>> source to the dev@ list? I think this could benefit from broader input?
>>>
>>> Thanks
>>>
>>> On Wed, Apr 27, 2022 at 9:36 AM Steven Wu <stevenz...@gmail.com> wrote:
>>>
>>>> + Becket and Sebastian
>>>>
>>>> It is also related to the split level watermark alignment discussion
>>>> thread. Because it is already very long, I don't want to further complicate
>>>> the ongoing discussion there. But I can move the discussion to that
>>>> existing thread if that is preferred.
>>>>
>>>>
>>>> On Tue, Apr 26, 2022 at 10:03 PM Steven Wu <stevenz...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> We are thinking about how to align with the Flink community and
>>>>> leverage the FLIP-182 watermark alignment in the Iceberg source. I put 
>>>>> some
>>>>> context in this google doc. Would love to get hear your thoughts on this.
>>>>>
>>>>>
>>>>> https://docs.google.com/document/d/1zfwF8e5LszazcOzmUAOeOtpM9v8dKEPlY_BRFSmI3us/edit#
>>>>>
>>>>> Thanks,
>>>>> Steven
>>>>>
>>>>

Reply via email to