Thanks Hongshun for the update and the pretty detailed analysis of the edge cases; the updated FLIP looks good to me now.
One last implementation detail about a scenario in the motivation section, "Restart with Changed subscription": during a restart, if the source options remove a topic or table, the splits that have already been assigned cannot be removed. Could you clarify how we resolve this in the Kafka connector?

Best,
Leonard

> On Oct 9, 2025, at 19:48, Hongshun Wang <[email protected]> wrote:
>
> Hi devs,
> If there are no further suggestions, I will start the voting tomorrow.
>
> Best,
> Hongshun
>
> On Fri, Sep 26, 2025 at 7:48 PM Hongshun Wang <[email protected]> wrote:
>> Hi Becket and Leonard,
>> I have updated the content of this FLIP. The key point is that when the split enumerator receives a split, the split must already exist in pendingSplitAssignments or assignedSplitAssignments:
>> If the split is in pendingSplitAssignments, ignore it.
>> If the split is in assignedSplitAssignments but has a different taskId, ignore it (this indicates it was already assigned to another task).
>> If the split is in assignedSplitAssignments and shares the same taskId, move the assignment from assignedSplitAssignments back to pendingSplitAssignments so it can be re-assigned.
>>
>> To better explain why these strategies are used, I added some examples and pictures.
>>
>> Would you like to help me check whether there are still some problems?
>>
>> Best,
>> Hongshun
>>
>> On Fri, Sep 26, 2025 at 5:08 PM Leonard Xu <[email protected]> wrote:
>>> Thanks Becket and Hongshun for the insightful discussion.
>>>
>>> The underlying implementation and communication mechanisms of the Flink Source indeed involve many intricate details. We discussed the issue of split re-assignment in specific scenarios, but fortunately, the final decision turned out to be pretty clear.
>>>
>>> +1 to Becket’s proposal, which keeps the framework cleaner and more flexible.
>>> +1 to Hongshun’s point to provide comprehensive guidance for connector developers.
>>>
>>> Best,
>>> Leonard
>>>
>>>> On Sep 26, 2025, at 16:30, Hongshun Wang <[email protected]> wrote:
>>>>
>>>> Hi Becket,
>>>>
>>>> I got it. You’re suggesting we should not handle this in the source framework but instead let the split enumerator manage these three scenarios.
>>>>
>>>> Let me explain why I originally favored handling it in the framework: I'm concerned that connector developers might overlook certain edge cases (after all, it even took us extensive discussions to fully clarify the logic).
>>>>
>>>> However, your approach keeps the framework cleaner and more flexible, so I will adopt it.
>>>>
>>>> Perhaps, in this FLIP, we should focus on providing comprehensive guidance for connector developers: explain how to implement a split enumerator, including the underlying challenges and their solutions.
>>>>
>>>> Additionally, we can use the Kafka connector as a reference implementation to demonstrate the practical steps. This way, developers who want to implement similar connectors can directly reference this example.
>>>>
>>>> Best,
>>>> Hongshun
>>>>
>>>> On Fri, Sep 26, 2025 at 1:27 PM Becket Qin <[email protected]> wrote:
>>>>> It would be good to not expose runtime details to the source implementation if possible.
>>>>>
>>>>> Today, the split enumerator implementations are expected to track the split assignment.
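>>>>>
>>>>> For instance, a minimal sketch of such assignment tracking, folding in the pending/assigned handling rules discussed above (all names here are hypothetical, not the FLIP's final API):
>>>>>
>>>>> import java.util.HashMap;
>>>>> import java.util.HashSet;
>>>>> import java.util.Map;
>>>>> import java.util.Set;
>>>>>
>>>>> class AssignmentTracker {
>>>>>     // Splits waiting to be (re-)assigned, keyed by split id.
>>>>>     private final Set<String> pendingSplitAssignments = new HashSet<>();
>>>>>     // Splits already handed out: split id -> owning subtask id.
>>>>>     private final Map<String, Integer> assignedSplitAssignments = new HashMap<>();
>>>>>
>>>>>     void handleReportedSplit(String splitId, int taskId) {
>>>>>         if (pendingSplitAssignments.contains(splitId)) {
>>>>>             return; // already pending: ignore the report
>>>>>         }
>>>>>         Integer owner = assignedSplitAssignments.get(splitId);
>>>>>         if (owner == null || owner != taskId) {
>>>>>             return; // owned by another task (or unknown): ignore
>>>>>         }
>>>>>         // The same task re-reports its own split: move it back to
>>>>>         // pending so it can be re-assigned.
>>>>>         assignedSplitAssignments.remove(splitId);
>>>>>         pendingSplitAssignments.add(splitId);
>>>>>     }
>>>>> }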
>>>>>
>>>>> Assuming the split enumerator implementation keeps a split assignment map, that means the enumerator should already know whether a split is assigned or unassigned. So it can handle the three scenarios you mentioned.
>>>>>
>>>>>> The split is reported by a reader during a global restoration.
>>>>>
>>>>> The split enumerator should have just been restored / created. If the enumerator expects a full reassignment of splits upon global recovery, there should be no assigned splits for that reader in the split assignment mapping.
>>>>>
>>>>>> The split is reported by a reader during a partial failure recovery.
>>>>>
>>>>> In this case, when SplitEnumerator.addReader() is invoked, the split assignment map in the enumerator implementation should already have some split assignments for the reader. Therefore it is a partial failover. If the source supports split reassignment on recovery, the enumerator can assign splits that are different from the reported assignment of that reader in the SplitEnumeratorContext, or it can also assign the same splits. In any case, the enumerator knows that this is a partial recovery because the assignment map is non-empty.
>>>>>
>>>>>> The split is not reported by a reader, but is assigned after the last successful checkpoint and was never acknowledged.
>>>>>
>>>>> This is actually one of the steps in the partial failure recovery. SplitEnumerator.addSplitsBack() will be called before SplitEnumerator.addReader() is called for the recovered reader. When SplitEnumerator.addSplitsBack() is invoked, it is for sure a partial recovery, and the enumerator should remove these splits from the split assignment map as if they were never assigned.
>>>>>
>>>>> I think this should work, right?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> On Thu, Sep 25, 2025 at 8:34 PM Hongshun Wang <[email protected]> wrote:
>>>>>> Hi Becket and Leonard,
>>>>>>
>>>>>> Thanks for your advice.
>>>>>>
>>>>>> > put all the reader information in the SplitEnumerator context
>>>>>> I have a concern: the current registeredReaders in SourceCoordinatorContext will be removed after subtaskReset is executed on failure. However, this approach has merit.
>>>>>>
>>>>>> One more situation I found my previous design does not cover:
>>>>>> Initial state: Reader A reports splits (1, 2).
>>>>>> Enumerator action: Assigns split 1 to Reader A, and split 2 to Reader B.
>>>>>> Failure scenario: Reader A fails before checkpointing. Since this is a partial failure, only Reader A restarts.
>>>>>> Recovery issue: Upon recovery, Reader A re-reports split (1).
>>>>>> In my previous design, the enumerator would ignore Reader A's re-registration, which would cause data loss.
>>>>>>
>>>>>> Thus, when the enumerator receives a split, the split may originate from three scenarios:
>>>>>> 1. The split is reported by a reader during a global restoration.
>>>>>> 2. The split is reported by a reader during a partial failure recovery.
>>>>>> 3. The split is not reported by a reader, but was assigned after the last successful checkpoint and was never acknowledged.
>>>>>> In the first scenario (global restore), the split should be re-distributed. For the latter two scenarios (partial failover and post-checkpoint assignment), we need to reassign the split to its originally assigned subtask.
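>>>>>>
>>>>>> As a rough illustration of these strategies (the enum and method names below are hypothetical, not the proposed API):
>>>>>>
>>>>>> enum SplitOrigin { GLOBAL_RESTORE, PARTIAL_FAILOVER, POST_CHECKPOINT_ASSIGNMENT }
>>>>>>
>>>>>> class ReassignmentPolicy {
>>>>>>     void onSplitReturned(String splitId, int originalSubtask, SplitOrigin origin) {
>>>>>>         if (origin == SplitOrigin.GLOBAL_RESTORE) {
>>>>>>             redistribute(splitId);              // scenario 1: re-distribute across readers
>>>>>>         } else {
>>>>>>             assignTo(splitId, originalSubtask); // scenarios 2 and 3: back to the original subtask
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     private void redistribute(String splitId) { /* partitioner-specific strategy */ }
>>>>>>     private void assignTo(String splitId, int subtask) { /* assignment via the enumerator context */ }
>>>>>> }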
>>>>>>
>>>>>> By implementing a method in the SplitEnumerator context to track each assigned split's status, the system can correctly identify and resolve split ownership in all three scenarios. What about adding a `SplitRecoveryType splitRecoveryType(Split split)` method to SplitEnumeratorContext? SplitRecoveryType is an enum including `UNASSIGNED`, `GLOBAL_RESTORE`, `PARTIAL_FAILOVER` and `POST_CHECKPOINT_ASSIGNMENT`.
>>>>>>
>>>>>> What do you think? Are there any details or scenarios I haven't considered? Looking forward to your advice.
>>>>>>
>>>>>> Best,
>>>>>> Hongshun
>>>>>>
>>>>>> On Thu, Sep 11, 2025 at 12:41 AM Becket Qin <[email protected]> wrote:
>>>>>>> Thanks for the explanation, Hongshun.
>>>>>>>
>>>>>>> The current pattern of handling new reader registration is the following:
>>>>>>> 1. Put all the reader information in the SplitEnumerator context.
>>>>>>> 2. Notify the enumerator about the new reader registration.
>>>>>>> 3. Let the split enumerator get whatever information it wants from the context and do its job.
>>>>>>> This pattern decouples the information passing from the reader registration notification. This makes the API extensible - we can add more information (e.g. the reported assigned splits in our case) about the reader to the context without introducing new methods.
>>>>>>>
>>>>>>> Introducing a new method addSplitsBackOnRecovery() is redundant to the above pattern. Do we really need it?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Jiangjie (Becket) Qin
>>>>>>>
>>>>>>> On Mon, Sep 8, 2025 at 8:18 PM Hongshun Wang <[email protected]> wrote:
>>>>>>> > Hi Becket,
>>>>>>> >
>>>>>>> > > I am curious what would the enumerator do differently for the splits added via addSplitsBackOnRecovery() vs. addSplitsBack()?
>>>>>>> >
>>>>>>> > In this FLIP, there are two distinct scenarios in which the enumerator receives splits being added back:
>>>>>>> > 1. Job-level restore: the job is restored, and the splits from each reader’s state are reported via ReaderRegistrationEvent.
>>>>>>> > 2. Reader-level restart: only a reader is restarted, not the whole job, and the splits assigned to it after the last successful checkpoint are added back. This is what addSplitsBack used to do.
>>>>>>> >
>>>>>>> > In these two situations, the enumerator will choose different strategies:
>>>>>>> > 1. Job-level restore: the splits should be redistributed across readers according to the current partitioner strategy.
>>>>>>> > 2. Reader-level restart: the splits should be reassigned directly back to the same reader they were originally assigned to, preserving locality and avoiding unnecessary redistribution.
>>>>>>> >
>>>>>>> > Therefore, the enumerator must clearly distinguish between these two scenarios. I originally proposed deprecating the former addSplitsBack(List<SplitT> splits, int subtaskId) and adding a new addSplitsBack(List<SplitT> splits, int subtaskId, boolean reportedByReader).
>>>>>>> > Leonard suggested using another method, addSplitsBackOnRecovery, which does not influence the current addSplitsBack.
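>>>>>>> >
>>>>>>> > Sketched side by side (the signatures here are illustrative only, not the final FLIP API):
>>>>>>> >
>>>>>>> > import java.util.List;
>>>>>>> >
>>>>>>> > interface SplitEnumeratorSketch<SplitT> {
>>>>>>> >     // Existing behavior: splits assigned after the last successful
>>>>>>> >     // checkpoint come back on a reader-level restart.
>>>>>>> >     void addSplitsBack(List<SplitT> splits, int subtaskId);
>>>>>>> >
>>>>>>> >     // Alternative A: one method, with a flag distinguishing the two scenarios.
>>>>>>> >     // void addSplitsBack(List<SplitT> splits, int subtaskId, boolean reportedByReader);
>>>>>>> >
>>>>>>> >     // Alternative B (Leonard): a separate method with a default
>>>>>>> >     // implementation, leaving the existing method untouched.
>>>>>>> >     default void addSplitsBackOnRecovery(List<SplitT> splits, int subtaskId) {
>>>>>>> >         addSplitsBack(splits, subtaskId);
>>>>>>> >     }
>>>>>>> > }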
>>>>>>> >
>>>>>>> > Best,
>>>>>>> > Hongshun
>>>>>>> >
>>>>>>> > On 2025/09/08 17:20:31 Becket Qin wrote:
>>>>>>> > > Hi Leonard,
>>>>>>> > >
>>>>>>> > > > Could we introduce a new method like addSplitsBackOnRecovery with a default implementation? In this way, we can provide better backward compatibility and also make it easier for developers to understand.
>>>>>>> > >
>>>>>>> > > I am curious what would the enumerator do differently for the splits added via addSplitsBackOnRecovery() vs. addSplitsBack()? Today, addSplitsBack() is also only called upon recovery, so the new method seems confusing. One thing worth clarifying is: if the Source implements SupportSplitReassignmentOnRecovery, upon recovery, should the splits reported by the readers also be added back to the SplitEnumerator via the addSplitsBack() call? Or should the SplitEnumerator explicitly query the registered reader information via the SplitEnumeratorContext to get the originally assigned splits when addReader() is invoked? I was assuming the latter in the beginning, so that the behavior of addSplitsBack() remains unchanged, but I am not opposed to doing the former.
>>>>>>> > >
>>>>>>> > > Also, can you elaborate on the backward compatibility issue you see if we do not have a separate addSplitsBackOnRecovery() method? Even without this new method, the behavior remains exactly the same unless the end users implement the mix-in interface "SupportSplitReassignmentOnRecovery", right?
>>>>>>> > >
>>>>>>> > > Thanks,
>>>>>>> > >
>>>>>>> > > Jiangjie (Becket) Qin
>>>>>>> > >
>>>>>>> > > On Mon, Sep 8, 2025 at 1:48 AM Hongshun Wang <[email protected]> wrote:
>>>>>>> > > > Hi devs,
>>>>>>> > > >
>>>>>>> > > > It has been quite some time since this FLIP [1] was first proposed. Thank you for your valuable feedback; based on your suggestions, the FLIP has undergone several rounds of revisions.
>>>>>>> > > >
>>>>>>> > > > Any more advice is welcome and appreciated. If there are no further concerns, I plan to start the vote tomorrow.
>>>>>>> > > >
>>>>>>> > > > Best,
>>>>>>> > > > Hongshun
>>>>>>> > > >
>>>>>>> > > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480
>>>>>>> > > >
>>>>>>> > > > On Mon, Sep 8, 2025 at 4:42 PM Hongshun Wang <[email protected]> wrote:
>>>>>>> > > > > Hi Leonard,
>>>>>>> > > > > Thanks for your advice. It makes sense and I have modified it.
>>>>>>> > > > >
>>>>>>> > > > > Best,
>>>>>>> > > > > Hongshun
>>>>>>> > > > >
>>>>>>> > > > > On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu <[email protected]> wrote:
>>>>>>> > > > >> Thanks Hongshun and Becket for the deep discussion.
>>>>>>> > > > >>
>>>>>>> > > > >> I only have one comment, on one API design:
>>>>>>> > > > >>
>>>>>>> > > > >> > Deprecate the old addSplitsBack method, use an addSplitsBack with param isReportedByReader instead, because the enumerator can apply different reassignment policies based on the context.
>>>>>>> > > > >>
>>>>>>> > > > >> Could we introduce a new method like *addSplitsBackOnRecovery* with a default implementation? In this way, we can provide better backward compatibility and also make it easier for developers to understand.
>>>>>>> > > > >>
>>>>>>> > > > >> Best,
>>>>>>> > > > >> Leonard
>>>>>>> > > > >>
>>>>>>> > > > >> On Sep 3, 2025, at 20:26, Hongshun Wang <[email protected]> wrote:
>>>>>>> > > > >>
>>>>>>> > > > >> Hi Becket,
>>>>>>> > > > >>
>>>>>>> > > > >> I think that's a great idea! I have added the SupportSplitReassignmentOnRecovery interface in this FLIP. A Source implementing this interface indicates that the source operator needs to report splits to the enumerator and receive reassignment. [1]
>>>>>>> > > > >>
>>>>>>> > > > >> Best,
>>>>>>> > > > >> Hongshun
>>>>>>> > > > >>
>>>>>>> > > > >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>> > > > >>
>>>>>>> > > > >> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <[email protected]> wrote:
>>>>>>> > > > >>> Hi Hongshun,
>>>>>>> > > > >>>
>>>>>>> > > > >>> I think the convention for such optional features in Source is via mix-in interfaces. So instead of adding a method to the SourceReader, maybe we should introduce an interface SupportSplitReassignmentOnRecovery with this method. If a Source implementation implements that interface, then the SourceOperator will check the desired behavior and act accordingly.
>>>>>>> > > > >>>
>>>>>>> > > > >>> Thanks,
>>>>>>> > > > >>>
>>>>>>> > > > >>> Jiangjie (Becket) Qin
>>>>>>> > > > >>>
>>>>>>> > > > >>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <[email protected]> wrote:
>>>>>>> > > > >>>> Hi devs,
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> Would anyone like to discuss this FLIP? I'd appreciate your feedback and suggestions.
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> Best,
>>>>>>> > > > >>>> Hongshun
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> On Aug 13, 2025, at 14:23, Hongshun Wang <[email protected]> wrote:
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> Hi Becket,
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> Thank you for your detailed feedback. The new contract makes good sense to me and effectively addresses the issues I encountered at the beginning of the design.
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> That said, I recommend not reporting splits by default, primarily for compatibility and practical reasons:
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> > For these reasons, we do not expect the Split objects to be huge, and we are not trying to design for huge Split objects either as they will have problems even today.
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> 1. Not all existing connectors match this rule.
>>>>>>> > > > >>>> For example, in the MySQL CDC connector, a binlog split may contain hundreds of snapshot split completion records (or even more). This state is large and is currently transmitted incrementally through multiple BinlogSplitMetaEvent messages. Since the binlog reader operates with single parallelism, reporting the full split state on recovery could be inefficient or even infeasible. For such sources, it would be better to provide a mechanism to skip split reporting during restart until they redesign and reduce the split size.
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> 2. Not all enumerators maintain unassigned splits in state.
>>>>>>> > > > >>>> Some SplitEnumerator implementations (such as the Kafka connector) do not track or persistently manage unassigned splits. Requiring them to handle re-registration would add unnecessary complexity. Even though we may implement this in the Kafka connector, the Kafka connector is currently decoupled from the Flink version, so we also need to make sure the older versions stay compatible.
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> To address these concerns, I propose introducing a new method, boolean SourceReader#shouldReassignSplitsOnRecovery(), with a default implementation returning false. This allows source readers to opt in to split reassignment only when necessary. Since the new contract already places the responsibility for split assignment on the enumerator, not reporting splits by default is a safe and clean default behavior.
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> I’ve updated the implementation and the FLIP accordingly [1]. It is quite a big change. In particular, for the Kafka connector, we can now use a pluggable SplitPartitioner to support different split assignment strategies (e.g., default, round-robin).
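>>>>>>> > > > >>>>
>>>>>>> > > > >>>> As a sketch of what such a pluggable partitioner could look like (the interface and names are hypothetical, not the final connector API):
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> interface SplitPartitioner<SplitT> {
>>>>>>> > > > >>>>     // Pick the owning subtask for a split, given the current parallelism.
>>>>>>> > > > >>>>     int assign(SplitT split, int parallelism);
>>>>>>> > > > >>>> }
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> // Example strategy: simple round-robin over subtasks.
>>>>>>> > > > >>>> class RoundRobinPartitioner<SplitT> implements SplitPartitioner<SplitT> {
>>>>>>> > > > >>>>     private int next = 0;
>>>>>>> > > > >>>>
>>>>>>> > > > >>>>     @Override
>>>>>>> > > > >>>>     public int assign(SplitT split, int parallelism) {
>>>>>>> > > > >>>>         int subtask = next % parallelism;
>>>>>>> > > > >>>>         next = (subtask + 1) % parallelism;
>>>>>>> > > > >>>>         return subtask;
>>>>>>> > > > >>>>     }
>>>>>>> > > > >>>> }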
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> Could you please review it when you have a chance?
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> Best,
>>>>>>> > > > >>>> Hongshun
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <[email protected]> wrote:
>>>>>>> > > > >>>>> Hi Hongshun,
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> I am not too concerned about the transmission cost, because the full split transmission already has to happen in the initial assignment phase. And in the future, we probably want to also introduce some kind of workload balance across source readers, e.g. based on the per-split throughput or the per-source-reader workload in heterogeneous clusters. For these reasons, we do not expect the Split objects to be huge, and we are not trying to design for huge Split objects either, as they will have problems even today.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> Good point on the potential split loss, please see the reply below.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> Scenario 2:
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon restart.
>>>>>>> > > > >>>>>> 2. Before the enumerator receives all reports and performs reassignment, a checkpoint is triggered.
>>>>>>> > > > >>>>>> 3. Since no splits have been reassigned yet, both readers have empty states.
>>>>>>> > > > >>>>>> 4. When restarting from this checkpoint, all four splits are lost.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> The reader registration happens in SourceOperator.open(), which means the task is still in the initializing state; therefore the checkpoint should not be triggered until the enumerator receives all the split reports.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> There is a nuance here. Today, the RPC call from the TM to the JM is async. So it is possible that SourceOperator.open() has returned, but the enumerator has not received the split reports. However, because the task status update RPC call goes to the same channel as the split reports call, the task status RPC call will happen after the split reports call on the JM side.
>>>>>>> > > > >>>>> Therefore, on the JM side, the SourceCoordinator will always first receive the split reports, and then receive the checkpoint request. This "happens before" relationship is kind of important to guarantee a consistent state between the enumerator and the readers.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> Scenario 1:
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader B reports (3 and 4).
>>>>>>> > > > >>>>>> 2. The enumerator receives these reports but only reassigns splits 1 and 2, not 3 and 4.
>>>>>>> > > > >>>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are recorded in the reader states; splits 3 and 4 are not persisted.
>>>>>>> > > > >>>>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4 will be permanently lost.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> This scenario is possible. One solution is to let the enumerator implementation handle this. That means if the enumerator relies on the initial split reports from the source readers, it should maintain these reports by itself. In the above example, the enumerator will need to remember that 3 and 4 are not assigned and put them into its own state.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> The current contract is that anything assigned to the SourceReaders is completely owned by the SourceReaders. Enumerators can remember the assignments but cannot change them, even when the source reader recovers / restarts.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> With this FLIP, the contract becomes that the source readers return the ownership of the splits to the enumerator. So the enumerator is responsible for maintaining these splits until they are assigned to a source reader again.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> There are other cases where there may be conflicting information between reader and enumerator. For example, consider the following sequence:
>>>>>>> > > > >>>>> 1. Reader A reports splits (1 and 2) upon restart.
>>>>>>> > > > >>>>> 2. The enumerator receives the report and assigns both 1 and 2 to reader B.
>>>>>>> > > > >>>>> 3. Reader A fails before checkpointing. This is a partial failure, so only reader A restarts.
>>>>>>> > > > >>>>> 4. When reader A recovers, it will again report splits (1 and 2) to the enumerator.
>>>>>>> > > > >>>>> 5. The enumerator should ignore this report because it has assigned splits (1 and 2) to reader B.
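>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> The check implied by step 5 could be sketched like this (hypothetical names, not a proposed API):
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> import java.util.Map;
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> class OwnershipCheck {
>>>>>>> > > > >>>>>     // split id -> subtask id currently owning the split
>>>>>>> > > > >>>>>     private final Map<String, Integer> assignedSplits;
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>>     OwnershipCheck(Map<String, Integer> assignedSplits) {
>>>>>>> > > > >>>>>         this.assignedSplits = assignedSplits;
>>>>>>> > > > >>>>>     }
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>>     // True if a re-reported split should be ignored because it is now
>>>>>>> > > > >>>>>     // owned by a different reader (the splits (1, 2) -> reader B case).
>>>>>>> > > > >>>>>     boolean shouldIgnore(String splitId, int reportingSubtask) {
>>>>>>> > > > >>>>>         Integer owner = assignedSplits.get(splitId);
>>>>>>> > > > >>>>>         return owner != null && owner != reportingSubtask;
>>>>>>> > > > >>>>>     }
>>>>>>> > > > >>>>> }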
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> So with the new contract, the enumerator should be the source of truth for split ownership.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> Thanks,
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> Jiangjie (Becket) Qin
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <[email protected]> wrote:
>>>>>>> > > > >>>>>> Hi Becket,
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> I did consider this approach at the beginning (and it was also mentioned in this FLIP), since it would allow more flexibility in reassigning all splits. However, there are a few potential issues.
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> 1. High transmission cost
>>>>>>> > > > >>>>>> If we pass the full split objects (rather than just split IDs), the data size could be significant, leading to high overhead during transmission, especially when many splits are involved.
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> 2. Risk of split loss
>>>>>>> > > > >>>>>> The risk of split loss exists unless we have a mechanism to make sure we can only checkpoint after all the splits are reassigned. There are scenarios where splits could be lost due to inconsistent state handling during recovery:
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> Scenario 1:
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader B reports (3 and 4).
>>>>>>> > > > >>>>>> 2. The enumerator receives these reports but only reassigns splits 1 and 2, not 3 and 4.
>>>>>>> > > > >>>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are recorded in the reader states; splits 3 and 4 are not persisted.
>>>>>>> > > > >>>>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4 will be permanently lost.
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> Scenario 2:
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon restart.
>>>>>>> > > > >>>>>> 2. Before the enumerator receives all reports and performs reassignment, a checkpoint is triggered.
>>>>>>> > > > >>>>>> 3. Since no splits have been reassigned yet, both readers have empty states.
>>>>>>> > > > >>>>>> 4. When restarting from this checkpoint, all four splits are lost.
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> Let me know if you have thoughts on how we might mitigate these risks!
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> Best,
>>>>>>> > > > >>>>>> Hongshun
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <[email protected]> wrote:
>>>>>>> > > > >>>>>>> Hi Hongshun,
>>>>>>> > > > >>>>>>>
>>>>>>> > > > >>>>>>> The steps sound reasonable to me in general. In terms of the updated FLIP wiki, it would be good to see if we can keep the protocol simple. One alternative way to achieve this behavior is the following:
>>>>>>> > > > >>>>>>>
>>>>>>> > > > >>>>>>> 1. Upon SourceOperator startup, the SourceOperator sends a ReaderRegistrationEvent with the currently assigned splits to the enumerator. It does not add these splits to the SourceReader.
>>>>>>> > > > >>>>>>> 2. The enumerator will always use SourceEnumeratorContext.assignSplits() to assign the splits (not via the response to the SourceRegistrationEvent; this allows async split assignment in case the enumerator wants to wait until all the readers are registered).
>>>>>>> > > > >>>>>>> 3. The SourceOperator will only call SourceReader.addSplits() when it receives the AddSplitEvent from the enumerator.
>>>>>>> > > > >>>>>>>
>>>>>>> > > > >>>>>>> This protocol has a few benefits:
>>>>>>> > > > >>>>>>> 1. It basically allows arbitrary split reassignment upon restart.
>>>>>>> > > > >>>>>>> 2. Simplicity: there is only one way to assign splits.
>>>>>>> > > > >>>>>>>
>>>>>>> > > > >>>>>>> So we only need one interface change:
>>>>>>> > > > >>>>>>> - add the initially assigned splits to ReaderInfo so the Enumerator can access them,
>>>>>>> > > > >>>>>>> and one behavior change:
>>>>>>> > > > >>>>>>> - the SourceOperator should stop assigning splits to the SourceReader from state restoration, but [message truncated...]
