might be the same as => might NOT be the same as

On Fri, May 6, 2022 at 8:13 PM Steven Wu <stevenz...@gmail.com> wrote:

> The conclusion of this discussion could be that we don't see much value in
> leveraging FLIP-182 with Iceberg source. That would totally be fine.
>
> For me, one big sticking point is that the alignment timestamp for the
> (Iceberg) source might be the same as the Flink application watermark.
>
> On Thu, May 5, 2022 at 9:53 PM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:
>
>> Option 1 sounds reasonable, but I would be tempted to wait for a second
>> motivating use case before generalizing the framework. However, I wouldn't
>> oppose this extension if others feel it's useful and a good thing to do.
>>
>> Piotrek
>>
>> > On 06.05.2022, at 03:50, Becket Qin <becket....@gmail.com> wrote:
>> >
>> > I think the key point here is essentially what information Flink should
>> > expose to the user pluggables. Apparently, the split / local task
>> > watermark is something many user pluggables would be interested in. Right
>> > now it is calculated by the Flink framework but not exposed to the user
>> > space, i.e. SourceReader / SplitEnumerator. So it looks like we can at
>> > least offer this information in some way, so users can leverage it to do
>> > things.
>> >
>> > That said, I am not sure if this would help in the Iceberg alignment
>> > case, because at this point FLIP-182 reports source reader watermarks
>> > periodically, which may not align with the RequestSplitEvent. So if we
>> > really want to leverage the FLIP-182 mechanism here, I see a few ways; to
>> > name two of them:
>> > 1. We can expose the source reader watermark in the SourceReaderContext,
>> > so the source readers can put the local watermark in a custom operator
>> > event. This will effectively bypass the existing RequestSplitEvent. Or we
>> > can also extend the RequestSplitEvent to add an additional info field of
>> > byte[] type, so users can piggy-back additional information there, be it
>> > the watermark or other stuff.
>> > 2. Simply piggy-back the local watermark in the RequestSplitEvent and pass
>> > that info to the SplitEnumerator as well.
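A minimal sketch of the byte[] piggy-back variant from option 1, assuming the reader's local watermark is encoded as eight big-endian bytes. `RequestSplitWithInfoEvent` and `WatermarkCodec` are hypothetical names, not existing Flink classes, and the Flink event plumbing is deliberately left out so the example stays plain, self-contained Java.

```java
import java.nio.ByteBuffer;

// Hypothetical event, NOT Flink's RequestSplitEvent: it only illustrates
// carrying an opaque byte[] "extra info" field next to a split request.
final class RequestSplitWithInfoEvent {
    private final String hostname;   // the kind of info a split request may already carry
    private final byte[] extraInfo;  // opaque to the framework, interpreted by the enumerator

    RequestSplitWithInfoEvent(String hostname, byte[] extraInfo) {
        this.hostname = hostname;
        // Defensive copy; a missing payload becomes an empty array.
        this.extraInfo = extraInfo == null ? new byte[0] : extraInfo.clone();
    }

    String hostname() {
        return hostname;
    }

    byte[] extraInfo() {
        return extraInfo.clone();
    }
}

// Hypothetical helper: encodes a local watermark (epoch millis) as 8 bytes.
final class WatermarkCodec {
    private WatermarkCodec() {}

    static byte[] encode(long watermarkMillis) {
        return ByteBuffer.allocate(Long.BYTES).putLong(watermarkMillis).array();
    }

    static long decode(byte[] bytes) {
        if (bytes.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes, got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).getLong();
    }
}
```

Because the framework would treat the field as opaque, the same event could later carry other reader-side information, which is the extensibility argument Becket makes for the first way.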
>> >
>> > If we are going to do this, personally I'd prefer the first way, as it
>> > provides a mechanism to allow future extension. So it would be easier to
>> > expose other framework information to the user space in the future.
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> >
>> >
>> >> On Fri, May 6, 2022 at 6:15 AM Thomas Weise <t...@apache.org> wrote:
>> >>
>> >>> On Wed, May 4, 2022 at 11:03 AM Steven Wu <stevenz...@gmail.com> wrote:
>> >>> Any opinion on using a different timestamp for source alignment (vs. the
>> >>> Flink application watermark)? For the Iceberg source, we might want to
>> >>> enforce alignment on the Kafka timestamp, but the Flink application
>> >>> watermark may use an event time field from the payload.
>> >>
>> >> I imagine that, more generally, the question is alignment based on the
>> >> Iceberg partition/file metadata vs. individual rows? I think that
>> >> should work as long as there is a guarantee of bounded out-of-orderness
>> >> within the split?
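To make Thomas's point concrete, here is a sketch assuming each split carries min/max statistics for the alignment timestamp column (for example the Kafka timestamp), read from Iceberg file metadata. `TimestampedSplit` and its methods are hypothetical. The spread `maxTs - minTs` bounds how far rows inside one split can be out of order on that column, so aligning at split granularity only works if that spread stays within an accepted bound.

```java
// Hypothetical split descriptor: min/max of the alignment timestamp column,
// assumed to come from Iceberg file/column statistics.
final class TimestampedSplit {
    final String path;
    final long minTs;
    final long maxTs;

    TimestampedSplit(String path, long minTs, long maxTs) {
        if (minTs > maxTs) {
            throw new IllegalArgumentException("minTs must be <= maxTs");
        }
        this.path = path;
        this.minTs = minTs;
        this.maxTs = maxTs;
    }

    // Rows in this split can be out of order by at most this much,
    // but only for the column the statistics were collected on.
    long maxOutOfOrderness() {
        return maxTs - minTs;
    }

    // True if aligning on split metadata keeps row-level disorder within the bound.
    boolean fitsOutOfOrdernessBound(long boundMillis) {
        return maxOutOfOrderness() <= boundMillis;
    }
}
```

These statistics say nothing about a different field, such as an event time in the payload, unless the two are known to stay close; that is why the choice of alignment timestamp matters here.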
>> >>
>> >> Thomas
>> >>
>> >>>
>> >>> Thanks,
>> >>> Steven
>> >>>
>> >>> On Wed, May 4, 2022 at 7:02 AM Becket Qin <becket....@gmail.com> wrote:
>> >>>>
>> >>>> Hey Piotr,
>> >>>>
>> >>>> I think the mechanism FLIP-182 provides is a reasonable default one,
>> >>>> which ensures the watermarks only drift within an upper bound.
>> >>>> However, admittedly there are also other strategies for different
>> >>>> purposes.
>> >>>>
>> >>>> In the Iceberg case, I am not sure if a static, strictly bounded
>> >>>> watermark drift is desired. The source might just want to finish
>> >>>> reading the assigned splits as fast as possible, and it is OK to have a
>> >>>> drift of "one split" instead of a fixed time period.
>> >>>>
>> >>>> As another example, if there are some fast readers whose splits are
>> >>>> always throttled, while the other slow readers are struggling to keep
>> >>>> up with the rest of the splits, the split enumerator may decide to
>> >>>> reassign the slow splits so all the readers have something to read.
>> >>>> This would need the SplitEnumerator to be aware of the watermark
>> >>>> progress on each reader, so it seems useful to expose the
>> >>>> WatermarkAlignmentEvent information to the SplitEnumerator as well.
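A sketch of the per-reader bookkeeping such an enumerator would need, assuming reader watermarks reach it somehow (for instance through one of the events discussed in this thread). `ReaderWatermarkTracker` and its methods are hypothetical, and the actual reassignment policy is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical enumerator-side bookkeeping; not a Flink API.
final class ReaderWatermarkTracker {
    private final Map<Integer, Long> watermarkBySubtask = new HashMap<>();

    // Record a reader's local watermark; a lower report never moves it backwards.
    void report(int subtaskId, long watermark) {
        watermarkBySubtask.merge(subtaskId, watermark, Long::max);
    }

    OptionalLong globalMin() {
        return watermarkBySubtask.values().stream().mapToLong(Long::longValue).min();
    }

    // Readers more than maxDrift ahead of the slowest one: throttled readers
    // that could be handed some of the slow reader's splits instead.
    Set<Integer> readersAheadBy(long maxDrift) {
        Set<Integer> ahead = new TreeSet<>();
        OptionalLong min = globalMin();
        if (!min.isPresent()) {
            return ahead;
        }
        for (Map.Entry<Integer, Long> e : watermarkBySubtask.entrySet()) {
            if (e.getValue() - min.getAsLong() > maxDrift) {
                ahead.add(e.getKey());
            }
        }
        return ahead;
    }

    // Subtask holding back the global minimum, or -1 if nothing was reported yet.
    int slowestReader() {
        int slowest = -1;
        long min = Long.MAX_VALUE;
        for (Map.Entry<Integer, Long> e : watermarkBySubtask.entrySet()) {
            if (e.getValue() < min) {
                min = e.getValue();
                slowest = e.getKey();
            }
        }
        return slowest;
    }
}
```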
>> >>>>
>> >>>> Thanks,
>> >>>>
>> >>>> Jiangjie (Becket) Qin
>> >>>>
>> >>>>
>> >>>>
>> >>>> On Tue, May 3, 2022 at 7:58 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>> >>>>
>> >>>>> Hi Steven,
>> >>>>>
>> >>>>> Isn't this redundant with FLIP-182 and FLIP-217? Can't Iceberg just
>> >>>>> emit all splits and let FLIP-182/FLIP-217 handle the watermark
>> >>>>> alignment and block the splits that are too far into the future? I can
>> >>>>> see this being an issue if too many blocked splits occupy too many
>> >>>>> resources.
>> >>>>>
>> >>>>> If that's the case, the SourceCoordinator/SplitEnumerator would indeed
>> >>>>> have to decide on some basis how many and which splits to assign in
>> >>>>> what order. But in that case I'm not sure how much you could use from
>> >>>>> FLIP-182 and FLIP-217. They seem somewhat orthogonal to me, operating
>> >>>>> on different levels: FLIP-182 and FLIP-217 work with whatever splits
>> >>>>> have already been generated and assigned. You could leverage FLIP-182
>> >>>>> and FLIP-217 and take care of only the problem of limiting the number
>> >>>>> of parallel active splits. And here I'm not sure if it would be worth
>> >>>>> generalising a solution across different connectors.
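If the remaining problem is limiting the number of parallel active splits, the enumerator-side part could be as small as this sketch, assuming each split exposes a minimum timestamp. `BoundedSplitAssigner`, its nested `Split`, and `maxActive` are hypothetical; pausing within a split would still be left to FLIP-182/FLIP-217.

```java
import java.util.Comparator;
import java.util.HashSet;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical assigner: hands out splits oldest-first (by minimum timestamp)
// and caps how many splits are active at once. Not a Flink or Iceberg API.
final class BoundedSplitAssigner {
    static final class Split {
        final String id;
        final long minTs;

        Split(String id, long minTs) {
            this.id = id;
            this.minTs = minTs;
        }
    }

    private final PriorityQueue<Split> pending =
            new PriorityQueue<>(Comparator.comparingLong((Split s) -> s.minTs));
    private final Set<String> active = new HashSet<>();
    private final int maxActive;

    BoundedSplitAssigner(int maxActive) {
        if (maxActive <= 0) {
            throw new IllegalArgumentException("maxActive must be > 0");
        }
        this.maxActive = maxActive;
    }

    void addSplit(Split split) {
        pending.add(split);
    }

    // Called on a reader's split request; empty means "hold back for now".
    Optional<Split> nextSplit() {
        if (active.size() >= maxActive || pending.isEmpty()) {
            return Optional.empty();
        }
        Split next = pending.poll();
        active.add(next.id);
        return Optional.of(next);
    }

    // Called when a reader reports that it finished a split.
    void onSplitFinished(String splitId) {
        active.remove(splitId);
    }
}
```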
>> >>>>>
>> >>>>> Regarding the global watermark, I made a related comment some time ago
>> >>>>> [1]. It sounds to me like you also need to solve this problem;
>> >>>>> otherwise Iceberg users will encounter late records due to race
>> >>>>> conditions between the assignment of new splits and the completion of
>> >>>>> older ones.
>> >>>>>
>> >>>>> Best,
>> >>>>> Piotrek
>> >>>>>
>> >>>>> [1]
>> >>>>> https://issues.apache.org/jira/browse/FLINK-21871?focusedCommentId=17495545&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17495545
>> >>>>>
>> >>>>> On Mon, May 2, 2022 at 04:26 Steven Wu <stevenz...@gmail.com> wrote:
>> >>>>>
>> >>>>>> Adding the dev@ group to the thread, as Thomas suggested.
>> >>>>>>
>> >>>>>> Arvid,
>> >>>>>>
>> >>>>>> Scenario 3 (dynamic assignment + temporarily no split) in FLIP-180
>> >>>>>> (idleness) can happen with Iceberg source alignment, as readers can be
>> >>>>>> temporarily starved when the enumerator holds back new split
>> >>>>>> assignments upon request.
>> >>>>>>
>> >>>>>> Totally agree that we should decouple this discussion from FLIP-217,
>> >>>>>> which addresses the split-level watermark alignment problem as a
>> >>>>>> follow-up of FLIP-182.
>> >>>>>>
>> >>>>>> Becket,
>> >>>>>>
>> >>>>>> Yes, the Iceberg source currently implements alignment by leveraging
>> >>>>>> the dynamic split assignment from the FLIP-27 design. Basically, the
>> >>>>>> enumerator can hold back split assignments to readers when necessary.
>> >>>>>> Everything is centralized in the enumerator: (1) watermark extraction
>> >>>>>> and aggregation, and (2) the alignment decision and execution.
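A minimal sketch of such a centralized hold-back, assuming (this is not stated in the thread) that the enumerator uses the smallest minimum timestamp among in-flight splits as its aligned watermark, and releases the oldest pending split only if it is at most `maxDriftMillis` ahead of it. `AligningEnumerator` and its methods are hypothetical and not the actual Iceberg implementation.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;

// Hypothetical enumerator logic combining (1) watermark extraction/aggregation
// and (2) the alignment decision. Not the actual Iceberg implementation.
final class AligningEnumerator {
    static final class Split {
        final String id;
        final long minTs;

        Split(String id, long minTs) {
            this.id = id;
            this.minTs = minTs;
        }
    }

    private final PriorityQueue<Split> pending =
            new PriorityQueue<>(Comparator.comparingLong((Split s) -> s.minTs));
    private final Map<String, Long> inFlightMinTs = new HashMap<>();
    private final long maxDriftMillis;

    AligningEnumerator(long maxDriftMillis) {
        this.maxDriftMillis = maxDriftMillis;
    }

    void addSplit(Split split) {
        pending.add(split);
    }

    // (1) Aligned watermark: smallest minTs among in-flight splits,
    // or Long.MAX_VALUE if nothing is in flight.
    long alignedWatermark() {
        long wm = Long.MAX_VALUE;
        for (long ts : inFlightMinTs.values()) {
            wm = Math.min(wm, ts);
        }
        return wm;
    }

    // (2) On a split request: assign the oldest pending split unless it is too
    // far ahead of the aligned watermark; empty means "hold back".
    Optional<Split> onSplitRequest() {
        Split head = pending.peek();
        if (head == null) {
            return Optional.empty();
        }
        long wm = alignedWatermark();
        if (wm != Long.MAX_VALUE && head.minTs - wm > maxDriftMillis) {
            return Optional.empty();
        }
        pending.poll();
        inFlightMinTs.put(head.id, head.minTs);
        return Optional.of(head);
    }

    void onSplitFinished(String splitId) {
        inFlightMinTs.remove(splitId);
    }
}
```

An empty result leaves the requesting reader without a split for the moment, which is the temporary starvation described for FLIP-180 scenario 3 above.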
>> >>>>>>
>> >>>>>> The motivation of this discussion is to see if the Iceberg source can
>> >>>>>> leverage some of the watermark alignment solutions (like FLIP-182)
>> >>>>>> from the Flink framework. E.g., as mentioned in the doc, the Iceberg
>> >>>>>> source can potentially leverage the FLIP-182 framework to do the
>> >>>>>> watermark extraction and aggregation, while the alignment decision and
>> >>>>>> execution stay in the centralized enumerator.
>> >>>>>>
>> >>>>>> Thanks,
>> >>>>>> Steven
>> >>>>>>
>> >>>>>> On Thu, Apr 28, 2022 at 2:05 AM Becket Qin <becket....@gmail.com> wrote:
>> >>>>>>
>> >>>>>>> Hi Steven,
>> >>>>>>>
>> >>>>>>> Thanks for pulling me into this thread. I think the timestamp
>> >>>>>>> alignment use case here is a good example of what FLIP-27 was
>> >>>>>>> designed for.
>> >>>>>>>
>> >>>>>>> Technically speaking, the Iceberg source can already implement
>> >>>>>>> timestamp alignment in the new Flink source even without FLIP-182.
>> >>>>>>> However, I understand the rationale here, because timestamp alignment
>> >>>>>>> is also trying to orchestrate the consumption of splits. That said, it
>> >>>>>>> looks like FLIP-182 was not designed in a way that it can be easily
>> >>>>>>> extended for other use cases. It is probably worth thinking of a more
>> >>>>>>> general mechanism to answer the following questions:
>> >>>>>>>
>> >>>>>>> 1. What information whose source of truth is the Flink framework
>> >>>>>>> should be exposed to the SplitEnumerator and SourceReader? And how?
>> >>>>>>> 2. What control actions in the Flink framework are worth exposing to
>> >>>>>>> the SplitEnumerators and SourceReaders? And how?
>> >>>>>>>
>> >>>>>>> In the context of timestamp alignment, the first question is more
>> >>>>>>> relevant. For example, instead of hardcoding the ReportWatermarkEvent
>> >>>>>>> handling logic in the SourceCoordinator, should we expose this to the
>> >>>>>>> SplitEnumerator? So basically there will be some information, such as
>> >>>>>>> the subtask local watermark, whose source of truth is the Flink
>> >>>>>>> runtime, but which is useful to the user-provided pluggables.
>> >>>>>>>
>> >>>>>>> I think there are a few control flow patterns to make a complete
>> >>>>>>> design:
>> >>>>>>>
>> >>>>>>> a. Framework space information (e.g. watermark) --> User space
>> >>>>>>> pluggables (e.g. SplitEnumerator) --> User space actions (e.g. pause
>> >>>>>>> reading a split).
>> >>>>>>> b. Framework space information (e.g. task failure) --> User space
>> >>>>>>> pluggables (e.g. SplitEnumerator) --> Framework space actions (e.g.
>> >>>>>>> exit the job).
>> >>>>>>> c. User space information (e.g. a custom workload metric) --> User
>> >>>>>>> space pluggables (e.g. SplitEnumerator) --> User space actions (e.g.
>> >>>>>>> rebalance the workload across the source readers).
>> >>>>>>> d. User space information (e.g. a custom stopping event in the
>> >>>>>>> stream) --> User space pluggables (e.g. SplitEnumerator) --> Framework
>> >>>>>>> space actions (e.g. stop the job).
>> >>>>>>>
>> >>>>>>> So basically, for any user-provided pluggable, the input information
>> >>>>>>> may come either from other user-provided logic or from the framework,
>> >>>>>>> and after receiving that information, the pluggable may want either
>> >>>>>>> the framework or another pluggable to take an action. This gives the
>> >>>>>>> above 4 combinations.
>> >>>>>>>
>> >>>>>>> In our case, when the pluggables are SplitEnumerator and
>> >>>>>>> SourceReader, the control flows that only involve user space actions
>> >>>>>>> are fully supported. But it seems that when it comes to control flows
>> >>>>>>> involving framework space information, some of the information has
>> >>>>>>> not been exposed to the pluggable, and some framework actions might
>> >>>>>>> also be missing.
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>>
>> >>>>>>> Jiangjie (Becket) Qin
>> >>>>>>>
>> >>>>>>> On Thu, Apr 28, 2022 at 3:44 PM Arvid Heise <ar...@apache.org> wrote:
>> >>>>>>>
>> >>>>>>>> Hi folks,
>> >>>>>>>>
>> >>>>>>>> Quick input from my side. I think this is, from the implementation
>> >>>>>>>> perspective, what Piotr and I had in mind for a global min watermark
>> >>>>>>>> that helps in idleness cases. See also point 3 in FLIP-180:
>> >>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
>> >>>>>>>>
>> >>>>>>>> Basically, we would like to empower source enumerators to determine
>> >>>>>>>> the global min watermark for all source readers, factoring in even
>> >>>>>>>> future splits. Not all sources can supply that information (think of
>> >>>>>>>> a general file source), but most should be able to. Basically, Flink
>> >>>>>>>> should know, for a given source at a given point in time, what the
>> >>>>>>>> min watermark across all source subtasks is.
>> >>>>>>>>
>> >>>>>>>> Here is some background:
>> >>>>>>>> In the context of idleness, we can deterministically advance the
>> >>>>>>>> watermark. In the pre-FLIP-27 era, we had heuristic approaches in
>> >>>>>>>> sources to switch to idleness and thus allow watermarks to increase
>> >>>>>>>> in cases where fewer splits than source tasks are available. However,
>> >>>>>>>> for sources with dynamic split discovery, that actually yields
>> >>>>>>>> incorrect results. Think of a Kinesis consumer where a shard is
>> >>>>>>>> split. Then a previously idle source subtask may receive a new split
>> >>>>>>>> with time t0 as the lowest timestamp. Since the source subtask did
>> >>>>>>>> not participate in the global watermark generation (because it was
>> >>>>>>>> idle), the previously emitted watermark may be past t0 and thus
>> >>>>>>>> result in late records potentially being discarded. A rerun of the
>> >>>>>>>> same pipeline on historic data would not render the source subtask
>> >>>>>>>> idle and would not result in late records. The solution was to not
>> >>>>>>>> let the framework automatically mark source subtasks as idle if
>> >>>>>>>> there are no splits. That leads to confusion for Kafka users with a
>> >>>>>>>> static topic subscription, where #splits < #parallelism stalls
>> >>>>>>>> pipelines because the watermark is not advancing. Here, your
>> >>>>>>>> sketched solution can be transferred to KafkaSource to let Flink
>> >>>>>>>> know that the min global watermark on a static assignment is
>> >>>>>>>> determined by the slowest partition. Hence, all idle readers emit
>> >>>>>>>> that min global watermark and the user sees progress.
>> >>>>>>>> This whole idea is related to FLIP-182 watermark alignment, but I'd
>> >>>>>>>> go with another FLIP, as the goal is quite different even though the
>> >>>>>>>> implementation overlaps.
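The KafkaSource case can be sketched as follows, assuming the enumerator knows a current watermark for every partition of a static subscription. Since the slowest partition determines the global min, an idle reader emitting it does not move the watermark past data any partition still has to deliver (given correct per-partition watermarks). `GlobalMinWatermark` is a hypothetical helper, not a Flink API.

```java
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical helper, not a Flink API.
final class GlobalMinWatermark {
    private GlobalMinWatermark() {}

    // Global min over all partitions of a static subscription: the slowest
    // partition bounds what every subtask may safely emit.
    static OptionalLong of(Map<String, Long> watermarkByPartition) {
        return watermarkByPartition.values().stream().mapToLong(Long::longValue).min();
    }

    // Watermark a reader emits: its own if it has splits; if it is idle, the
    // global min, so idle subtasks no longer stall downstream progress.
    static long emitted(OptionalLong localWatermark, OptionalLong globalMin) {
        if (localWatermark.isPresent()) {
            return localWatermark.getAsLong();
        }
        return globalMin.orElse(Long.MIN_VALUE);
    }
}
```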
>> >>>>>>>>
>> >>>>>>>> Now Iceberg seems to use the same information to actually pause the
>> >>>>>>>> consumption of files and create some kind of ordering guarantee, as
>> >>>>>>>> far as I understood. This can probably be applied to any source with
>> >>>>>>>> dynamic split discovery. However, I wouldn't mix up the concepts, and
>> >>>>>>>> hence I appreciate you not chiming into the FLIP-182 and ff. threads.
>> >>>>>>>> The goal of FLIP-182 is to pause readers while consuming a split,
>> >>>>>>>> while your approach pauses readers before processing another split.
>> >>>>>>>> So it feels more closely related to the global min watermark; it
>> >>>>>>>> could either be part of that FLIP or a FLIP of its own. AFAIK, API
>> >>>>>>>> changes should actually happen only on the enumerator side, both for
>> >>>>>>>> your ideas and for the global min watermark.
>> >>>>>>>>
>> >>>>>>>> Best,
>> >>>>>>>>
>> >>>>>>>> Arvid
>> >>>>>>>>
>> >>>>>>>> On Wed, Apr 27, 2022 at 7:31 PM Thomas Weise <t...@apache.org> wrote:
>> >>>>>>>>
>> >>>>>>>>> Hi Steven,
>> >>>>>>>>>
>> >>>>>>>>> Would it be better to bring this to the dev@ list as a separate
>> >>>>>>>>> thread related to the Iceberg source? I think this could benefit
>> >>>>>>>>> from broader input.
>> >>>>>>>>>
>> >>>>>>>>> Thanks
>> >>>>>>>>>
>> >>>>>>>>> On Wed, Apr 27, 2022 at 9:36 AM Steven Wu <stevenz...@gmail.com> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> + Becket and Sebastian
>> >>>>>>>>>>
>> >>>>>>>>>> It is also related to the split-level watermark alignment
>> >>>>>>>>>> discussion thread. Because that thread is already very long, I
>> >>>>>>>>>> don't want to further complicate the ongoing discussion there. But
>> >>>>>>>>>> I can move the discussion to that existing thread if that is
>> >>>>>>>>>> preferred.
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> On Tue, Apr 26, 2022 at 10:03 PM Steven Wu <stevenz...@gmail.com> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>>> Hi all,
>> >>>>>>>>>>>
>> >>>>>>>>>>> We are thinking about how to align with the Flink community and
>> >>>>>>>>>>> leverage the FLIP-182 watermark alignment in the Iceberg source.
>> >>>>>>>>>>> I put some context in this Google doc. Would love to hear your
>> >>>>>>>>>>> thoughts on this.
>> >>>>>>>>>>>
>> >>>>>>>>>>> https://docs.google.com/document/d/1zfwF8e5LszazcOzmUAOeOtpM9v8dKEPlY_BRFSmI3us/edit#
>> >>>>>>>>>>>
>> >>>>>>>>>>> Thanks,
>> >>>>>>>>>>> Steven
>> >>>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>
>> >>>>>
>> >>
>>
>
