might be the same as => might NOT be the same as

On Fri, May 6, 2022 at 8:13 PM Steven Wu <stevenz...@gmail.com> wrote:
> The conclusion of this discussion could be that we don't see much value in
> leveraging FLIP-182 with Iceberg source. That would totally be fine.
>
> For me, one big sticking point is that the alignment timestamp for the
> (Iceberg) source might be the same as the Flink application watermark.
>
> On Thu, May 5, 2022 at 9:53 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
> wrote:
>
>> Option 1 sounds reasonable but I would be tempted to wait for a second
>> motivational use case before generalizing the framework. However I wouldn’t
>> oppose this extension if others feel it’s a useful and good thing to do
>>
>> Piotrek
>>
>> > Message written by Becket Qin <becket....@gmail.com> on 06.05.2022,
>> > at 03:50:
>> >
>> > I think the key point here is essentially what information Flink should
>> > expose to the user pluggables. Apparently split / local task watermark is
>> > something many user pluggables would be interested in. Right now it is
>> > calculated by the Flink framework but not exposed to the user space, i.e.
>> > SourceReader / SplitEnumerator. So it looks like we can at least offer this
>> > information in some way so users can leverage that information to do
>> > things.
>> >
>> > That said, I am not sure if this would help in the Iceberg alignment case,
>> > because at this point FLIP-182 reports source reader watermarks
>> > periodically, which may not align with the RequestSplitEvent. So if we
>> > really want to leverage the FLIP-182 mechanism here, I see a few ways, just
>> > to name two of them:
>> > 1. We can expose the source reader watermark in the SourceReaderContext, so
>> > the source readers can put the local watermark in a custom operator event.
>> > This will effectively bypass the existing RequestSplitEvent. Or we can also
>> > extend the RequestSplitEvent to add an additional info field of byte[]
>> > type, so users can piggy-back additional information there, be it watermark
>> > or other stuff.
>> > 2. Simply piggy-back the local watermark in the RequestSplitEvent and pass
>> > that info to the SplitEnumerator as well.
>> >
>> > If we are going to do this, personally I'd prefer the first way, as it
>> > provides a mechanism to allow future extension. So it would be easier to
>> > expose other framework information to the user space in the future.
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> >> On Fri, May 6, 2022 at 6:15 AM Thomas Weise <t...@apache.org> wrote:
>> >>
>> >>> On Wed, May 4, 2022 at 11:03 AM Steven Wu <stevenz...@gmail.com> wrote:
>> >>> Any opinion on a different timestamp for source alignment (vs Flink
>> >>> application watermark)? For Iceberg source, we might want to enforce
>> >>> alignment on Kafka timestamp but Flink application watermark may use
>> >>> event time field from payload.
>> >>
>> >> I imagine that more generally the question is alignment based on the
>> >> iceberg partition/file metadata vs. individual rows? I think that
>> >> should work as long as there is a guarantee on out-of-orderness
>> >> within the split?
>> >>
>> >> Thomas
>> >>
>> >>>
>> >>> Thanks,
>> >>> Steven
>> >>>
>> >>> On Wed, May 4, 2022 at 7:02 AM Becket Qin <becket....@gmail.com> wrote:
>> >>>>
>> >>>> Hey Piotr,
>> >>>>
>> >>>> I think the mechanism FLIP-182 provided is a reasonable default one,
>> >>>> which ensures the watermarks are only drifted by an upper bound.
>> >>>> However, admittedly there are also other strategies for different
>> >>>> purposes.
>> >>>>
>> >>>> In the Iceberg case, I am not sure if a static strictly allowed
>> >>>> watermark drift is desired. The source might just want to finish
>> >>>> reading the assigned splits as fast as possible. And it is OK to have
>> >>>> a drift of "one split", instead of a fixed time period.
>> >>>>
>> >>>> As another example, if there are some fast readers whose splits are
>> >>>> always throttled, while the other slow readers are struggling to keep
>> >>>> up with the rest of the splits, the split enumerator may decide to
>> >>>> reassign the slow splits so all the readers have something to read.
>> >>>> This would need the SplitEnumerator to be aware of the watermark
>> >>>> progress on each reader. So it seems useful to expose the
>> >>>> WatermarkAlignmentEvent information to the SplitEnumerator as well.
>> >>>>
>> >>>> Thanks,
>> >>>>
>> >>>> Jiangjie (Becket) Qin
>> >>>>
>> >>>> On Tue, May 3, 2022 at 7:58 PM Piotr Nowojski <pnowoj...@apache.org>
>> >>>> wrote:
>> >>>>
>> >>>>> Hi Steven,
>> >>>>>
>> >>>>> Isn't this redundant with FLIP-182 and FLIP-217? Can't Iceberg just
>> >>>>> emit all splits and let FLIP-182/FLIP-217 handle the watermark
>> >>>>> alignment and block the splits that are too far into the future? I
>> >>>>> can see this being an issue if the existence of too many blocked
>> >>>>> splits is occupying too many resources.
>> >>>>>
>> >>>>> If that's the case, indeed SourceCoordinator/SplitEnumerator would
>> >>>>> have to decide on some basis how many and which splits to assign in
>> >>>>> what order. But in that case I'm not sure how much you could use from
>> >>>>> FLIP-182 and FLIP-217. They seem somehow orthogonal to me, operating
>> >>>>> on different levels. FLIP-182 and FLIP-217 are working with whatever
>> >>>>> splits have already been generated and assigned. You could leverage
>> >>>>> FLIP-182 and FLIP-217 and take care of only the problem of limiting
>> >>>>> the number of parallel active splits. And here I'm not sure if it
>> >>>>> would be worth generalising a solution across different connectors.
>> >>>>>
>> >>>>> Regarding the global watermark, I made a related comment some time
>> >>>>> ago about it [1].
>> >>>>> It sounds to me like you also need to solve this problem,
>> >>>>> otherwise Iceberg users will encounter late records in case of some
>> >>>>> race conditions between assigning new splits and completion of older
>> >>>>> ones.
>> >>>>>
>> >>>>> Best,
>> >>>>> Piotrek
>> >>>>>
>> >>>>> [1]
>> >>>>> https://issues.apache.org/jira/browse/FLINK-21871?focusedCommentId=17495545&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17495545
>> >>>>>
>> >>>>> On Mon, May 2, 2022 at 04:26, Steven Wu <stevenz...@gmail.com> wrote:
>> >>>>>
>> >>>>>> add dev@ group to the thread as Thomas suggested
>> >>>>>>
>> >>>>>> Arvid,
>> >>>>>>
>> >>>>>> The scenario 3 (Dynamic assignment + temporary no split) in the
>> >>>>>> FLIP-180 (idleness) can happen to Iceberg source alignment, as
>> >>>>>> readers can be temporarily starved due to the holdback by the
>> >>>>>> enumerator when assigning new splits upon request.
>> >>>>>>
>> >>>>>> Totally agree that we should decouple this discussion from the
>> >>>>>> FLIP-217, which addresses the split level watermark alignment
>> >>>>>> problem as a follow-up of FLIP-182
>> >>>>>>
>> >>>>>> Becket,
>> >>>>>>
>> >>>>>> Yes, currently Iceberg source implemented the alignment leveraging
>> >>>>>> the dynamic split assignment from FLIP-27 design. Basically, the
>> >>>>>> enumerator can hold back split assignments to readers when
>> >>>>>> necessary. Everything is centralized in the enumerator: (1)
>> >>>>>> watermark extraction and aggregation (2) alignment decision and
>> >>>>>> execution
>> >>>>>>
>> >>>>>> The motivation of this discussion is to see if Iceberg source can
>> >>>>>> leverage some of the watermark alignment solutions (like FLIP-182)
>> >>>>>> from Flink framework.
>> >>>>>> E.g., as mentioned in the doc, Iceberg source can potentially
>> >>>>>> leverage the FLIP-182 framework to do the watermark extraction and
>> >>>>>> aggregation. For the alignment decision and execution, we can keep
>> >>>>>> them in the centralized enumerator.
>> >>>>>>
>> >>>>>> Thanks,
>> >>>>>> Steven
>> >>>>>>
>> >>>>>> On Thu, Apr 28, 2022 at 2:05 AM Becket Qin <becket....@gmail.com>
>> >>>>>> wrote:
>> >>>>>>
>> >>>>>>> Hi Steven,
>> >>>>>>>
>> >>>>>>> Thanks for pulling me into this thread. I think the timestamp
>> >>>>>>> alignment use case here is a good example of what FLIP-27 was
>> >>>>>>> designed for.
>> >>>>>>>
>> >>>>>>> Technically speaking, Iceberg source can already implement the
>> >>>>>>> timestamp alignment in the Flink new source even without FLIP-182.
>> >>>>>>> However, I understand the rationale here because timestamp
>> >>>>>>> alignment is also trying to orchestrate the consumption of splits.
>> >>>>>>> However, it looks like FLIP-182 was not designed in a way that it
>> >>>>>>> can be easily extended for other use cases. It is probably worth
>> >>>>>>> thinking of a more general mechanism to answer the following
>> >>>>>>> questions:
>> >>>>>>>
>> >>>>>>> 1. What information whose source of truth is the Flink framework
>> >>>>>>> should be exposed to the SplitEnumerator and SourceReader? And how?
>> >>>>>>> 2. What control actions in the Flink framework are worth exposing
>> >>>>>>> to the SplitEnumerators and SourceReaders? And how?
>> >>>>>>>
>> >>>>>>> In the context of timestamp alignment, the first question is more
>> >>>>>>> relevant. For example, instead of hardcoding the
>> >>>>>>> ReportWatermarkEvent handling logic in the SourceCoordinator,
>> >>>>>>> should we expose this to the SplitEnumerator?
>> >>>>>>> So basically there will be some information, such as subtask
>> >>>>>>> local watermark, whose source of truth is Flink runtime, but which
>> >>>>>>> is useful to the user provided pluggables.
>> >>>>>>>
>> >>>>>>> I think there are a few control flow patterns to make a complete
>> >>>>>>> design:
>> >>>>>>>
>> >>>>>>> a. Framework space information (e.g. watermark) --> User space
>> >>>>>>> pluggables (e.g. SplitEnumerator) --> User space actions (e.g.
>> >>>>>>> pause reading a split).
>> >>>>>>> b. Framework space information (e.g. task failure) --> User space
>> >>>>>>> pluggables (e.g. SplitEnumerator) --> Framework space actions
>> >>>>>>> (e.g. exit the job).
>> >>>>>>> c. User space information (e.g. a custom workload metric) --> User
>> >>>>>>> space pluggables (e.g. SplitEnumerator) --> User space actions
>> >>>>>>> (e.g. rebalance the workload across the source readers).
>> >>>>>>> d. User space information (e.g. a custom stopping event in the
>> >>>>>>> stream) --> User space pluggables (e.g. SplitEnumerator) -->
>> >>>>>>> Framework space actions (e.g. stop the job).
>> >>>>>>>
>> >>>>>>> So basically for any user provided pluggables, the input
>> >>>>>>> information may either come from another user provided logic or
>> >>>>>>> from the framework, and after receiving that information, the
>> >>>>>>> pluggable may either want the framework or another pluggable to
>> >>>>>>> take an action. So this gives the above 4 combinations.
>> >>>>>>>
>> >>>>>>> In our case, when the pluggables are SplitEnumerator and
>> >>>>>>> SourceReader, the control flows that only involve user space
>> >>>>>>> actions are fully supported.
>> >>>>>>> But it seems that when it comes to control flows involving
>> >>>>>>> framework space information, some of the information has not been
>> >>>>>>> exposed to the pluggable, and some framework actions might also be
>> >>>>>>> missing.
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>>
>> >>>>>>> Jiangjie (Becket) Qin
>> >>>>>>>
>> >>>>>>> On Thu, Apr 28, 2022 at 3:44 PM Arvid Heise <ar...@apache.org>
>> >>>>>>> wrote:
>> >>>>>>>
>> >>>>>>>> Hi folks,
>> >>>>>>>>
>> >>>>>>>> quick input from my side. I think this is from the implementation
>> >>>>>>>> perspective what Piotr and I had in mind for a global min
>> >>>>>>>> watermark that helps in idleness cases. See also point 3 in
>> >>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
>> >>>>>>>>
>> >>>>>>>> Basically, we would like to empower source enumerators to
>> >>>>>>>> determine the global min watermark for all source readers
>> >>>>>>>> factoring in even future splits. Not all sources can supply that
>> >>>>>>>> information (think of a general file source) but most should be
>> >>>>>>>> able to. Basically, Flink should know for a given source at a
>> >>>>>>>> given point in time what the min watermark across all source
>> >>>>>>>> subtasks is.
>> >>>>>>>>
>> >>>>>>>> Here is some background:
>> >>>>>>>> In the context of idleness, we can deterministically advance the
>> >>>>>>>> watermark. In the pre-FLIP-27 era, we had heuristic approaches in
>> >>>>>>>> sources to switch to idleness and thus allow watermarks to
>> >>>>>>>> increase in cases where fewer splits than source tasks are
>> >>>>>>>> available. However, for sources with dynamic split discovery that
>> >>>>>>>> actually yields incorrect results. Think of a Kinesis consumer
>> >>>>>>>> where a shard is split.
>> >>>>>>>> Then a previously idle source subtask may receive a new split
>> >>>>>>>> with time t0 as the lowest timestamp. Since the source subtask did
>> >>>>>>>> not participate in the global watermark generation (because it
>> >>>>>>>> was idle), the previously emitted watermark may be past t0 and
>> >>>>>>>> thus result in late records potentially being discarded. A rerun
>> >>>>>>>> of the same pipeline on historic data would not render the source
>> >>>>>>>> subtask idle and not result in late records. The solution was to
>> >>>>>>>> not render source subtasks automatically idle by the framework if
>> >>>>>>>> there are no splits. That leads to confusion for Kafka users with
>> >>>>>>>> static topic subscription where #splits < #parallelism stalls
>> >>>>>>>> pipelines because the watermark is not advancing. Here, your
>> >>>>>>>> sketched solution can be transferred to KafkaSource to let Flink
>> >>>>>>>> know that min global watermark on a static assignment is
>> >>>>>>>> determined by the slowest partition. Hence, all idle readers emit
>> >>>>>>>> that min global watermark and the user sees progress.
>> >>>>>>>> This whole idea is related to FLIP-182 watermark alignment but
>> >>>>>>>> I'd go with another FLIP as the goal is quite different even
>> >>>>>>>> though the implementation overlaps.
>> >>>>>>>>
>> >>>>>>>> Now Iceberg seems to use the same information to actually pause
>> >>>>>>>> the consumption of files and create some kind of ordering
>> >>>>>>>> guarantees as far as I understood. This probably can be applied to
>> >>>>>>>> any source with dynamic split discovery. However, I wouldn't mix
>> >>>>>>>> up the concepts and hence I appreciate you not chiming into the
>> >>>>>>>> FLIP-182 and ff. threads.
>> >>>>>>>> The goal of FLIP-182 is to pause readers while consuming a
>> >>>>>>>> split, while your approach pauses readers before processing
>> >>>>>>>> another split. So it feels more closely related to the global min
>> >>>>>>>> watermark - so it could either be part of that FLIP or a FLIP of
>> >>>>>>>> its own. Afaik API changes should actually happen only on the
>> >>>>>>>> enumerator side both for your ideas and for global min watermark.
>> >>>>>>>>
>> >>>>>>>> Best,
>> >>>>>>>>
>> >>>>>>>> Arvid
>> >>>>>>>>
>> >>>>>>>> On Wed, Apr 27, 2022 at 7:31 PM Thomas Weise <t...@apache.org>
>> >>>>>>>> wrote:
>> >>>>>>>>
>> >>>>>>>>> Hi Steven,
>> >>>>>>>>>
>> >>>>>>>>> Would it be better to bring this as a separate thread related to
>> >>>>>>>>> Iceberg source to the dev@ list? I think this could benefit from
>> >>>>>>>>> broader input?
>> >>>>>>>>>
>> >>>>>>>>> Thanks
>> >>>>>>>>>
>> >>>>>>>>> On Wed, Apr 27, 2022 at 9:36 AM Steven Wu <stevenz...@gmail.com>
>> >>>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> + Becket and Sebastian
>> >>>>>>>>>>
>> >>>>>>>>>> It is also related to the split level watermark alignment
>> >>>>>>>>>> discussion thread. Because it is already very long, I don't
>> >>>>>>>>>> want to further complicate the ongoing discussion there. But I
>> >>>>>>>>>> can move the discussion to that existing thread if that is
>> >>>>>>>>>> preferred.
>> >>>>>>>>>>
>> >>>>>>>>>> On Tue, Apr 26, 2022 at 10:03 PM Steven Wu
>> >>>>>>>>>> <stevenz...@gmail.com> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>>> Hi all,
>> >>>>>>>>>>>
>> >>>>>>>>>>> We are thinking about how to align with the Flink community
>> >>>>>>>>>>> and leverage the FLIP-182 watermark alignment in the Iceberg
>> >>>>>>>>>>> source. I put some context in this google doc. Would love to
>> >>>>>>>>>>> hear your thoughts on this.
>> >>>>>>>>>>>
>> >>>>>>>>>>> https://docs.google.com/document/d/1zfwF8e5LszazcOzmUAOeOtpM9v8dKEPlY_BRFSmI3us/edit#
>> >>>>>>>>>>>
>> >>>>>>>>>>> Thanks,
>> >>>>>>>>>>> Steven
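
For reference, the enumerator-side hold-back discussed in this thread (watermark aggregation plus the alignment decision, both centralized in the enumerator) can be sketched roughly as below. This is a minimal sketch under stated assumptions, not the Iceberg source implementation and not a Flink API: `AlignedSplitAssigner`, `Split`, `reportWatermark`, `requestSplit` and `maxDriftMs` are all hypothetical names, timestamps are plain epoch milliseconds, and each split is assumed to know its minimum timestamp (for example from file metadata).

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;

/**
 * Hypothetical sketch of enumerator-side alignment. None of these names are
 * Flink or Iceberg APIs; timestamps are plain epoch milliseconds.
 */
final class AlignedSplitAssigner {

    /** A pending split plus the smallest timestamp it contains (assumed known). */
    static final class Split {
        final String id;
        final long minTimestampMs;

        Split(String id, long minTimestampMs) {
            this.id = id;
            this.minTimestampMs = minTimestampMs;
        }
    }

    private final long maxDriftMs;
    // Pending splits ordered by their minimum timestamp, earliest first.
    private final PriorityQueue<Split> pending =
            new PriorityQueue<>(Comparator.comparingLong((Split s) -> s.minTimestampMs));
    // Watermarks of readers that currently hold a split.
    private final Map<Integer, Long> activeWatermarks = new HashMap<>();

    AlignedSplitAssigner(long maxDriftMs) {
        this.maxDriftMs = maxDriftMs;
    }

    void addSplit(Split split) {
        pending.add(split);
    }

    /** Records watermark progress for a reader that is working on a split. */
    void reportWatermark(int readerId, long watermarkMs) {
        activeWatermarks.computeIfPresent(readerId, (id, old) -> Math.max(old, watermarkMs));
    }

    /** Handles a split request; an empty result means the reader is held back for now. */
    Optional<Split> requestSplit(int readerId) {
        // A reader asking for work has finished its split and no longer bounds the minimum.
        activeWatermarks.remove(readerId);
        Split next = pending.peek();
        if (next == null) {
            return Optional.empty();
        }
        if (!activeWatermarks.isEmpty()) {
            long globalMin = Collections.min(activeWatermarks.values());
            if (next.minTimestampMs - globalMin > maxDriftMs) {
                return Optional.empty(); // too far ahead of the slowest active reader
            }
        }
        pending.poll();
        // Until the reader reports progress, the split's minimum timestamp is its watermark.
        activeWatermarks.put(readerId, next.minTimestampMs);
        return Optional.of(next);
    }
}
```

In this sketch a held-back reader simply gets nothing and would have to ask again; a real enumerator would more likely remember the waiting readers and retry the assignment whenever a watermark report arrives. Removing the requesting reader from the minimum is a design choice made here so that a lone reader cannot block itself.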