Hi Leonard,

If the SplitEnumerator receives all splits after a restart, it becomes
straightforward to clear and unassign the unmatched splits (by checking
whether each split still matches the source options). However, a key
question arises: *should we automatically discard obsolete splits, or
explicitly notify the user via an exception?*

We provide an option `scan.partition-unsubscribe.strategy`:
1. Strict: throw an exception when encountering removed splits.
2. Lenient: silently remove obsolete splits.
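A rough sketch of the enumerator-side handling I have in mind (illustration
only; `filterRestoredSplits`, `matchesSourceOptions`, and the surrounding
bookkeeping are placeholder names, not final APIs):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Predicate;
    import org.apache.flink.api.connector.source.SourceSplit;

    public class UnsubscribeStrategyExample {

        public enum PartitionUnsubscribeStrategy { STRICT, LENIENT }

        // Filters the splits reported by readers on restart; only splits that
        // still match the source options are kept for re-assignment.
        public static <SplitT extends SourceSplit> List<SplitT> filterRestoredSplits(
                List<SplitT> reportedSplits,
                Predicate<SplitT> matchesSourceOptions,
                PartitionUnsubscribeStrategy strategy) {
            List<SplitT> toReassign = new ArrayList<>();
            for (SplitT split : reportedSplits) {
                if (matchesSourceOptions.test(split)) {
                    toReassign.add(split); // still subscribed: re-assign as usual
                } else if (strategy == PartitionUnsubscribeStrategy.STRICT) {
                    // Strict: surface the mismatch instead of dropping data silently.
                    throw new IllegalStateException(
                            "Split " + split.splitId()
                                    + " no longer matches the subscribed topics/tables.");
                }
                // Lenient: the unmatched split is simply not re-assigned, i.e. discarded.
            }
            return toReassign;
        }
    }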
What do you think?

Best,
Hongshun

On Thu, Oct 9, 2025 at 9:37 PM Leonard Xu <[email protected]> wrote:

> Thanks Hongshun for the update and the pretty detailed analysis of edge
> cases, the updated FLIP looks good to me now.
>
> Only one last implementation detail about this scenario in the motivation
> section:
>
> *Restart with changed subscription: During restart, if the source options
> remove a topic or table, the splits which have already been assigned
> cannot be removed.*
>
> Could you clarify how we resolve this in the Kafka connector?
>
> Best,
> Leonard
>
>
>
> On Oct 9, 2025, at 19:48, Hongshun Wang <[email protected]> wrote:
>
> Hi devs,
> If there are no further suggestions, I will start the voting tomorrow.
>
> Best,
> Hongshun
>
> On Fri, Sep 26, 2025 at 7:48 PM Hongshun Wang <[email protected]>
> wrote:
>
>> Hi Becket and Leonard,
>>
>> I have updated the content of this FLIP. The key point is that:
>>
>> When the split enumerator receives a split, *the split must already
>> exist in pendingSplitAssignments or assignedSplitAssignments*.
>>
>> - If the split is in pendingSplitAssignments, ignore it.
>> - If the split is in assignedSplitAssignments but has a different
>> taskId, ignore it (this indicates it was already assigned to another
>> task).
>> - If the split is in assignedSplitAssignments and shares the same
>> taskId, move the assignment from assignedSplitAssignments to
>> pendingSplitAssignments to re-assign it again.
>>
>>
>> To better explain why these strategies are used, I added some examples
>> and pictures to illustrate them.
>>
>> Would you like to help me check whether there are still some problems?
>>
>> Best,
>> Hongshun
>>
>>
>>
>> On Fri, Sep 26, 2025 at 5:08 PM Leonard Xu <[email protected]> wrote:
>>
>>> Thanks Becket and Hongshun for the insightful discussion.
>>>
>>> The underlying implementation and communication mechanisms of the Flink
>>> Source indeed involve many intricate details. We discussed the issue of
>>> split re-assignment in specific scenarios, but fortunately, the final
>>> decision turned out to be pretty clear.
>>>
>>> +1 to Becket's proposal, which keeps the framework cleaner and more
>>> flexible.
>>> +1 to Hongshun's point to provide comprehensive guidance for connector
>>> developers.
>>>
>>>
>>> Best,
>>> Leonard
>>>
>>>
>>>
>>> On Sep 26, 2025, at 16:30, Hongshun Wang <[email protected]> wrote:
>>>
>>> Hi Becket,
>>>
>>> I got it. You're suggesting we should not handle this in the source
>>> framework but instead let the split enumerator manage these three
>>> scenarios.
>>>
>>> Let me explain why I originally favored handling it in the framework:
>>> I'm concerned that connector developers might overlook certain edge
>>> cases (after all, we even needed extensive discussions to fully clarify
>>> the logic).
>>>
>>> However, your point keeps the framework cleaner and more flexible. Thus,
>>> I will take it.
>>>
>>> Perhaps, in this FLIP, we should focus on providing comprehensive
>>> guidance for connector developers: explain how to implement a split
>>> enumerator, including the underlying challenges and their solutions.
>>>
>>>
>>> Additionally, we can use the Kafka connector as a reference
>>> implementation to demonstrate the practical steps. This way, developers
>>> who want to implement similar connectors can directly reference this
>>> example.
>>>
>>>
>>> Best,
>>> Hongshun
>>>
>>>
>>>
>>> On Fri, Sep 26, 2025 at 1:27 PM Becket Qin <[email protected]> wrote:
>>>
>>>> It would be good to not expose runtime details to the source
>>>> implementation if possible.
>>>>
>>>> Today, the split enumerator implementations are expected to track the
>>>> split assignment.
>>>>
>>>> Assuming the split enumerator implementation keeps a split assignment
>>>> map, the enumerator should already know whether a split is assigned or
>>>> unassigned. So it can handle the three scenarios you mentioned.
>>>>
>>>>> The split is reported by a reader during a global restoration.
>>>>
>>>> The split enumerator should have just been restored / created. If the
>>>> enumerator expects a full reassignment of splits upon global recovery,
>>>> there should be no splits assigned to that reader in the split
>>>> assignment mapping.
>>>>
>>>>> The split is reported by a reader during a partial failure recovery.
>>>>
>>>> In this case, when SplitEnumerator.addReader() is invoked, the split
>>>> assignment map in the enumerator implementation should already have some
>>>> split assignments for the reader. Therefore it is a partial failover. If
>>>> the source supports split reassignment on recovery, the enumerator can
>>>> assign splits that are different from the reported assignment of that
>>>> reader in the SplitEnumeratorContext, or it can also assign the same
>>>> splits. In any case, the enumerator knows that this is a partial
>>>> recovery because the assignment map is non-empty.
>>>>
>>>>> The split is not reported by a reader, but is assigned after the last
>>>>> successful checkpoint and was never acknowledged.
>>>>
>>>> This is actually one of the steps in a partial failure recovery.
>>>> SplitEnumerator.addSplitsBack() will be called before
>>>> SplitEnumerator.addReader() is called for the recovered reader. When
>>>> SplitEnumerator.addSplitsBack() is invoked, it is for sure a partial
>>>> recovery, and the enumerator should remove these splits from the split
>>>> assignment map as if they were never assigned.
>>>>
>>>> I think this should work, right?
>>>>
>>>> Thanks,
>>>>
>>>> Jiangjie (Becket) Qin
>>>>
>>>> On Thu, Sep 25, 2025 at 8:34 PM Hongshun Wang <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Becket and Leonard,
>>>>>
>>>>> Thanks for your advice.
>>>>>
>>>>> > put all the reader information in the SplitEnumerator context
>>>>> I have a concern: the current registeredReaders in
>>>>> *SourceCoordinatorContext will be removed after subtaskReset is
>>>>> executed on failure*. However, this approach has merit.
>>>>>
>>>>> One more situation I found my previous design does not cover:
>>>>>
>>>>> 1. Initial state: Reader A reports splits (1, 2).
>>>>> 2. Enumerator action: Assigns split 1 to Reader A, and split 2 to
>>>>> Reader B.
>>>>> 3. Failure scenario: Reader A fails before checkpointing. Since
>>>>> this is a partial failure, only Reader A restarts.
>>>>> 4. Recovery issue: Upon recovery, Reader A re-reports split (1).
>>>>>
>>>>> In my previous design, the enumerator would ignore Reader A's
>>>>> re-registration, which would cause data loss.
>>>>>
>>>>> Thus, when the enumerator receives a split, the split may originate
>>>>> from three scenarios:
>>>>>
>>>>> 1. The split is reported by a reader during a global restoration.
>>>>> 2. The split is reported by a reader during a partial failure
>>>>> recovery.
>>>>> 3.
>>>>> The split is not reported by a reader, but is assigned after the
>>>>> last successful checkpoint and was never acknowledged.
>>>>>
>>>>> In the first scenario (global restore), the split should be
>>>>> re-distributed. For the latter two scenarios (partial failover and
>>>>> post-checkpoint assignment), we need to reassign the split to its
>>>>> originally assigned subtask.
>>>>>
>>>>> By implementing a method in the SplitEnumerator context to track each
>>>>> assigned split's status, the system can correctly identify and resolve
>>>>> split ownership in all three scenarios. *What about adding a
>>>>> `SplitRecoveryType splitRecoveryType(Split split)` method to
>>>>> SplitEnumeratorContext?* SplitRecoveryType is an enum including
>>>>> `UNASSIGNED`, `GLOBAL_RESTORE`, `PARTIAL_FAILOVER` and
>>>>> `POST_CHECKPOINT_ASSIGNMENT`.
>>>>>
>>>>> What do you think? Are there any details or scenarios I haven't
>>>>> considered? Looking forward to your advice.
>>>>>
>>>>> Best,
>>>>> Hongshun
>>>>>
>>>>> On Thu, Sep 11, 2025 at 12:41 AM Becket Qin <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Thanks for the explanation, Hongshun.
>>>>>>
>>>>>> The current pattern of handling new reader registration is the
>>>>>> following:
>>>>>> 1. Put all the reader information in the SplitEnumerator context.
>>>>>> 2. Notify the enumerator about the new reader registration.
>>>>>> 3. Let the split enumerator get whatever information it wants from
>>>>>> the context and do its job.
>>>>>> This pattern decouples the information passing from the reader
>>>>>> registration notification. This makes the API extensible - we can add
>>>>>> more information (e.g. reported assigned splits in our case) about
>>>>>> the reader to the context without introducing new methods.
>>>>>>
>>>>>> Introducing a new method addSplitsBackOnRecovery() is redundant to
>>>>>> the above pattern. Do we really need it?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jiangjie (Becket) Qin
>>>>>>
>>>>>> On Mon, Sep 8, 2025 at 8:18 PM Hongshun Wang <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>> > Hi Becket,
>>>>>> >
>>>>>> > > I am curious what the enumerator would do differently for the
>>>>>> > splits added via addSplitsBackOnRecovery() vs. addSplitsBack()?
>>>>>> >
>>>>>> > In this FLIP, there are two distinct scenarios in which the
>>>>>> > enumerator receives splits being added back:
>>>>>> > 1. Job-level restore: the job is restored, and the splits from the
>>>>>> > readers' state are reported via ReaderRegistrationEvent.
>>>>>> > 2. Reader-level restart: a reader is restarted, but not the whole
>>>>>> > job; the splits assigned to it after the last successful checkpoint
>>>>>> > are added back. This is what addSplitsBack used to do.
>>>>>> >
>>>>>> >
>>>>>> > In these two situations, the enumerator will choose different
>>>>>> > strategies:
>>>>>> > 1. Job-level restore: the splits should be redistributed across
>>>>>> > readers according to the current partitioner strategy.
>>>>>> > 2. Reader-level restart: the splits should be reassigned directly
>>>>>> > back to the same reader they were originally assigned to,
>>>>>> > preserving locality and avoiding unnecessary redistribution.
>>>>>> >
>>>>>> > Therefore, the enumerator must clearly distinguish between these
>>>>>> > two scenarios. I originally proposed deprecating the former
>>>>>> > addSplitsBack(List<SplitT> splits, int subtaskId) and adding a new
>>>>>> > addSplitsBack(List<SplitT> splits, int subtaskId, boolean
>>>>>> > reportedByReader).
>>>>>> > Leonard suggested using another method, addSplitsBackOnRecovery,
>>>>>> > without influencing the current addSplitsBack.
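>>>>>> >
>>>>>> > For illustration, the two shapes under discussion would look
>>>>>> > roughly like this (a sketch against a trimmed-down SplitEnumerator;
>>>>>> > the delegating default body is my assumption, not a settled design):
>>>>>> >
>>>>>> > // Existing callback: splits assigned after the last successful
>>>>>> > // checkpoint come back when a reader fails over.
>>>>>> > void addSplitsBack(List<SplitT> splits, int subtaskId);
>>>>>> >
>>>>>> > // Option A (my earlier proposal): deprecate the method above and
>>>>>> > // add an overload whose flag tells the two scenarios apart.
>>>>>> > void addSplitsBack(List<SplitT> splits, int subtaskId, boolean reportedByReader);
>>>>>> >
>>>>>> > // Option B (Leonard's suggestion): keep addSplitsBack untouched and
>>>>>> > // add a separate default method, only called for reader-reported splits.
>>>>>> > default void addSplitsBackOnRecovery(List<SplitT> splits, int subtaskId) {
>>>>>> >     addSplitsBack(splits, subtaskId); // default: treat both cases alike
>>>>>> > }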
>>>>>> >
>>>>>> > Best,
>>>>>> > Hongshun
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > On 2025/09/08 17:20:31 Becket Qin wrote:
>>>>>> > > Hi Leonard,
>>>>>> > >
>>>>>> > >
>>>>>> > > > Could we introduce a new method like addSplitsBackOnRecovery
>>>>>> > > > with a default implementation? In this way, we can provide
>>>>>> > > > better backward compatibility and also make it easier for
>>>>>> > > > developers to understand.
>>>>>> > >
>>>>>> > >
>>>>>> > > I am curious what the enumerator would do differently for the
>>>>>> > > splits added via addSplitsBackOnRecovery() vs. addSplitsBack()?
>>>>>> > > Today, addSplitsBack() is also only called upon recovery. So the
>>>>>> > > new method seems confusing. One thing worth clarifying is: if the
>>>>>> > > Source implements SupportSplitReassignmentOnRecovery, upon
>>>>>> > > recovery, should the splits reported by the readers also be added
>>>>>> > > back to the SplitEnumerator via the addSplitsBack() call? Or
>>>>>> > > should the SplitEnumerator explicitly query the registered reader
>>>>>> > > information via the SplitEnumeratorContext to get the originally
>>>>>> > > assigned splits when addReader() is invoked? I was assuming the
>>>>>> > > latter in the beginning, so the behavior of addSplitsBack()
>>>>>> > > remains unchanged, but I am not opposed to doing the former.
>>>>>> > >
>>>>>> > > Also, can you elaborate on the backwards compatibility issue you
>>>>>> > > see if we do not have a separate addSplitsBackOnRecovery() method?
>>>>>> > > Even without this new method, the behavior remains exactly the
>>>>>> > > same unless the end users implement the mix-in interface
>>>>>> > > "SupportSplitReassignmentOnRecovery", right?
>>>>>> > >
>>>>>> > > Thanks,
>>>>>> > >
>>>>>> > > Jiangjie (Becket) Qin
>>>>>> > >
>>>>>> > > On Mon, Sep 8, 2025 at 1:48 AM Hongshun Wang <[email protected]>
>>>>>> > > wrote:
>>>>>> > >
>>>>>> > > > Hi devs,
>>>>>> > > >
>>>>>> > > > It has been quite some time since this FLIP [1] was first
>>>>>> > > > proposed. Thank you for your valuable feedback. Based on your
>>>>>> > > > suggestions, the FLIP has undergone several rounds of revisions.
>>>>>> > > >
>>>>>> > > > Any more advice is welcome and appreciated. If there are no
>>>>>> > > > further concerns, I plan to start the vote tomorrow.
>>>>>> > > >
>>>>>> > > > Best,
>>>>>> > > > Hongshun
>>>>>> > > >
>>>>>> > > > [1]
>>>>>> > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480
>>>>>> > > >
>>>>>> > > > On Mon, Sep 8, 2025 at 4:42 PM Hongshun Wang <[email protected]>
>>>>>> > > > wrote:
>>>>>> > > >
>>>>>> > > > > Hi Leonard,
>>>>>> > > > > Thanks for your advice. It makes sense and I have modified it.
>>>>>> > > > >
>>>>>> > > > > Best,
>>>>>> > > > > Hongshun
>>>>>> > > > >
>>>>>> > > > > On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu <[email protected]>
>>>>>> > > > > wrote:
>>>>>> > > > >
>>>>>> > > > >> Thanks Hongshun and Becket for the deep discussion.
>>>>>> > > > >>
>>>>>> > > > >> I only have one comment on one API design:
>>>>>> > > > >>
>>>>>> > > > >> > Deprecate the old addSplitsBack method, use an
>>>>>> > > > >> addSplitsBack with a param isReportedByReader instead.
>>>>>> > > > >> Because the enumerator can apply different reassignment
>>>>>> > > > >> policies based on the context.
>>>>>> > > > >>
>>>>>> > > > >> Could we introduce a new method like *addSplitsBackOnRecovery*
>>>>>> > > > >> with a default implementation? In this way, we can provide
>>>>>> > > > >> better backward compatibility and also make it easier for
>>>>>> > > > >> developers to understand.
>>>>>> > > > >>
>>>>>> > > > >> Best,
>>>>>> > > > >> Leonard
>>>>>> > > > >>
>>>>>> > > > >>
>>>>>> > > > >>
>>>>>> > > > >> On Sep 3, 2025, at 20:26, Hongshun Wang <[email protected]> wrote:
>>>>>> > > > >>
>>>>>> > > > >> Hi Becket,
>>>>>> > > > >>
>>>>>> > > > >> I think that's a great idea! I have added the
>>>>>> > > > >> SupportSplitReassignmentOnRecovery interface to this FLIP. If
>>>>>> > > > >> a Source implements this interface, it indicates that the
>>>>>> > > > >> source operator needs to report splits to the enumerator and
>>>>>> > > > >> receive the reassignment. [1]
>>>>>> > > > >>
>>>>>> > > > >> Best,
>>>>>> > > > >> Hongshun
>>>>>> > > > >>
>>>>>> > > > >> [1]
>>>>>> > > > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>> > > > >>
>>>>>> > > > >> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <[email protected]>
>>>>>> > > > >> wrote:
>>>>>> > > > >>
>>>>>> > > > >>> Hi Hongshun,
>>>>>> > > > >>>
>>>>>> > > > >>> I think the convention for such optional features in Source
>>>>>> > > > >>> is via mix-in interfaces. So instead of adding a method to
>>>>>> > > > >>> the SourceReader, maybe we should introduce an interface
>>>>>> > > > >>> SupportSplitReassignmentOnRecovery with this method. If a
>>>>>> > > > >>> Source implementation implements that interface, then the
>>>>>> > > > >>> SourceOperator will check the desired behavior and act
>>>>>> > > > >>> accordingly.
>>>>>> > > > >>>
>>>>>> > > > >>> Thanks,
>>>>>> > > > >>>
>>>>>> > > > >>> Jiangjie (Becket) Qin
>>>>>> > > > >>>
>>>>>> > > > >>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <[email protected]>
>>>>>> > > > >>> wrote:
>>>>>> > > > >>>
>>>>>> > > > >>>> Hi devs,
>>>>>> > > > >>>>
>>>>>> > > > >>>> Would anyone like to discuss this FLIP? I'd appreciate your
>>>>>> > > > >>>> feedback and suggestions.
>>>>>> > > > >>>>
>>>>>> > > > >>>> Best,
>>>>>> > > > >>>> Hongshun
>>>>>> > > > >>>>
>>>>>> > > > >>>>
>>>>>> > > > >>>> On Aug 13, 2025, at 14:23, Hongshun Wang <[email protected]> wrote:
>>>>>> > > > >>>>
>>>>>> > > > >>>> Hi Becket,
>>>>>> > > > >>>>
>>>>>> > > > >>>> Thank you for your detailed feedback. The new contract makes
>>>>>> > > > >>>> good sense to me and effectively addresses the issues I
>>>>>> > > > >>>> encountered at the beginning of the design.
>>>>>> > > > >>>>
>>>>>> > > > >>>> That said, I recommend not reporting splits by default,
>>>>>> > > > >>>> primarily for compatibility and practical reasons:
>>>>>> > > > >>>>
>>>>>> > > > >>>> > For these reasons, we do not expect the Split objects to
>>>>>> > > > >>>> be huge, and we are not trying to design for huge Split
>>>>>> > > > >>>> objects either as they will have problems even today.
>>>>>> > > > >>>>
>>>>>> > > > >>>> 1.
>>>>>> > > > >>>>
>>>>>> > > > >>>> Not all existing connectors match this rule.
>>>>>> > > > >>>> For example, in the MySQL CDC connector, a binlog split may
>>>>>> > > > >>>> contain hundreds (or even more) of snapshot split completion
>>>>>> > > > >>>> records. This state is large and is currently transmitted
>>>>>> > > > >>>> incrementally through multiple BinlogSplitMetaEvent messages.
>>>>>> > > > >>>> Since the binlog reader operates with single parallelism,
>>>>>> > > > >>>> reporting the full split state on recovery could be
>>>>>> > > > >>>> inefficient or even infeasible.
>>>>>> > > > >>>> For such sources, it would be better to provide a mechanism
>>>>>> > > > >>>> to skip split reporting during restart until they redesign
>>>>>> > > > >>>> and reduce the split size.
>>>>>> > > > >>>> 2.
>>>>>> > > > >>>>
>>>>>> > > > >>>> Not all enumerators maintain unassigned splits in state.
>>>>>> > > > >>>> Some SplitEnumerator implementations (such as the Kafka
>>>>>> > > > >>>> connector's) do not track or persistently manage unassigned
>>>>>> > > > >>>> splits. Requiring them to handle re-registration would add
>>>>>> > > > >>>> unnecessary complexity. Even if we implement this in the
>>>>>> > > > >>>> Kafka connector, the Kafka connector is currently decoupled
>>>>>> > > > >>>> from the Flink version, so we also need to make sure older
>>>>>> > > > >>>> versions stay compatible.
>>>>>> > > > >>>>
>>>>>> > > > >>>> ------------------------------
>>>>>> > > > >>>>
>>>>>> > > > >>>> To address these concerns, I propose introducing a new
>>>>>> > > > >>>> method: boolean SourceReader#shouldReassignSplitsOnRecovery()
>>>>>> > > > >>>> with a default implementation returning false. This allows
>>>>>> > > > >>>> source readers to opt in to split reassignment only when
>>>>>> > > > >>>> necessary. Since the new contract already places the
>>>>>> > > > >>>> responsibility for split assignment on the enumerator, not
>>>>>> > > > >>>> reporting splits by default is a safe and clean default
>>>>>> > > > >>>> behavior.
>>>>>> > > > >>>>
>>>>>> > > > >>>>
>>>>>> > > > >>>> ------------------------------
>>>>>> > > > >>>>
>>>>>> > > > >>>>
>>>>>> > > > >>>> I've updated the implementation and the FLIP accordingly [1].
>>>>>> > > > >>>> It is quite a big change. In particular, for the Kafka
>>>>>> > > > >>>> connector, we can now use a pluggable SplitPartitioner to
>>>>>> > > > >>>> support different split assignment strategies (e.g., default,
>>>>>> > > > >>>> round-robin).
>>>>>> > > > >>>>
>>>>>> > > > >>>>
>>>>>> > > > >>>> Could you please review it when you have a chance?
>>>>>> > > > >>>>
>>>>>> > > > >>>>
>>>>>> > > > >>>> Best,
>>>>>> > > > >>>>
>>>>>> > > > >>>> Hongshun
>>>>>> > > > >>>>
>>>>>> > > > >>>>
>>>>>> > > > >>>> [1]
>>>>>> > > > >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>> > > > >>>>
>>>>>> > > > >>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <[email protected]>
>>>>>> > > > >>>> wrote:
>>>>>> > > > >>>>
>>>>>> > > > >>>>> Hi Hongshun,
>>>>>> > > > >>>>>
>>>>>> > > > >>>>> I am not too concerned about the transmission cost.
>>>>>> > > > >>>>> Because the full split transmission has to happen in the
>>>>>> > > > >>>>> initial assignment phase already. And in the future, we
>>>>>> > > > >>>>> probably want to also introduce some kind of workload
>>>>>> > > > >>>>> balance across source readers, e.g. based on the per-split
>>>>>> > > > >>>>> throughput or the per-source-reader workload in
>>>>>> > > > >>>>> heterogeneous clusters. For these reasons, we do not expect
>>>>>> > > > >>>>> the Split objects to be huge, and we are not trying to
>>>>>> > > > >>>>> design for huge Split objects either, as they will have
>>>>>> > > > >>>>> problems even today.
>>>>>> > > > >>>>>
>>>>>> > > > >>>>> Good point on the potential split loss, please see the
>>>>>> > > > >>>>> reply below:
>>>>>> > > > >>>>>
>>>>>> > > > >>>>> Scenario 2:
>>>>>> > > > >>>>>
>>>>>> > > > >>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports
>>>>>> > > > >>>>>> (3 and 4) upon restart.
>>>>>> > > > >>>>>> 2. Before the enumerator receives all reports and performs
>>>>>> > > > >>>>>> reassignment, a checkpoint is triggered.
>>>>>> > > > >>>>>> 3. Since no splits have been reassigned yet, both readers
>>>>>> > > > >>>>>> have empty states.
>>>>>> > > > >>>>>> 4. When restarting from this checkpoint, all four splits
>>>>>> > > > >>>>>> are lost.
>>>>>> > > > >>>>>
>>>>>> > > > >>>>> The reader registration happens in SourceOperator.open(),
>>>>>> > > > >>>>> which means the task is still in the initializing state;
>>>>>> > > > >>>>> therefore the checkpoint should not be triggered until the
>>>>>> > > > >>>>> enumerator receives all the split reports.
>>>>>> > > > >>>>>
>>>>>> > > > >>>>> There is a nuance here. Today, the RPC call from the TM to
>>>>>> > > > >>>>> the JM is async. So it is possible that
>>>>>> > > > >>>>> SourceOperator.open() has returned, but the enumerator has
>>>>>> > > > >>>>> not received the split reports. However, because the task
>>>>>> > > > >>>>> status update RPC call goes to the same channel as the
>>>>>> > > > >>>>> split reports call, the task status RPC call will happen
>>>>>> > > > >>>>> after the split reports call on the JM side. Therefore, on
>>>>>> > > > >>>>> the JM side, the SourceCoordinator will always first
>>>>>> > > > >>>>> receive the split reports, then receive the checkpoint
>>>>>> > > > >>>>> request. This "happens before" relationship is quite
>>>>>> > > > >>>>> important to guarantee consistent state between the
>>>>>> > > > >>>>> enumerator and the readers.
>>>>>> > > > >>>>>
>>>>>> > > > >>>>> Scenario 1:
>>>>>> > > > >>>>>
>>>>>> > > > >>>>>> 1. Upon restart, Reader A reports assigned splits (1 and
>>>>>> > > > >>>>>> 2), and Reader B reports (3 and 4).
>>>>>> > > > >>>>>> 2. The enumerator receives these reports but only
>>>>>> > > > >>>>>> reassigns splits 1 and 2, not 3 and 4.
>>>>>> > > > >>>>>> 3. A checkpoint or savepoint is then triggered. Only
>>>>>> > > > >>>>>> splits 1 and 2 are recorded in the reader states; splits 3
>>>>>> > > > >>>>>> and 4 are not persisted.
>>>>>> > > > >>>>>> 4. If the job is later restarted from this checkpoint,
>>>>>> > > > >>>>>> splits 3 and 4 will be permanently lost.
>>>>>> > > > >>>>>
>>>>>> > > > >>>>> This scenario is possible. One solution is to let the
>>>>>> > > > >>>>> enumerator implementation handle this. That means if the
>>>>>> > > > >>>>> enumerator relies on the initial split reports from the
>>>>>> > > > >>>>> source readers, it should maintain these reports by itself.
>>>>>> > > > >>>>> In the above example, the enumerator will need to remember
>>>>>> > > > >>>>> that 3 and 4 are not assigned and put them into its own
>>>>>> > > > >>>>> state.
>>>>>> > > > >>>>> The current contract is that anything assigned to the
>>>>>> > > > >>>>> SourceReaders is completely owned by the SourceReaders.
>>>>>> > > > >>>>> Enumerators can remember the assignments but cannot change
>>>>>> > > > >>>>> them, even when the source reader recovers / restarts.
>>>>>> > > > >>>>> With this FLIP, the contract becomes that the source
>>>>>> > > > >>>>> readers will return the ownership of the splits to the
>>>>>> > > > >>>>> enumerator. So the enumerator is responsible for
>>>>>> > > > >>>>> maintaining these splits, until they are assigned to a
>>>>>> > > > >>>>> source reader again.
>>>>>> > > > >>>>>
>>>>>> > > > >>>>> There are other cases where there may be conflicting
>>>>>> > > > >>>>> information between reader and enumerator. For example,
>>>>>> > > > >>>>> consider the following sequence:
>>>>>> > > > >>>>> 1. Reader A reports splits (1 and 2) upon restart.
>>>>>> > > > >>>>> 2. The enumerator receives the report and assigns both 1
>>>>>> > > > >>>>> and 2 to reader B.
>>>>>> > > > >>>>> 3. Reader A fails before checkpointing. This is a partial
>>>>>> > > > >>>>> failure, so only reader A restarts.
>>>>>> > > > >>>>> 4. When reader A recovers, it will again report splits (1
>>>>>> > > > >>>>> and 2) to the enumerator.
>>>>>> > > > >>>>> 5. The enumerator should ignore this report because it has
>>>>>> > > > >>>>> already assigned splits (1 and 2) to reader B.
>>>>>> > > > >>>>>
>>>>>> > > > >>>>> So with the new contract, the enumerator should be the
>>>>>> > > > >>>>> source of truth for split ownership.
>>>>>> > > > >>>>>
>>>>>> > > > >>>>> Thanks,
>>>>>> > > > >>>>>
>>>>>> > > > >>>>> Jiangjie (Becket) Qin
>>>>>> > > > >>>>>
>>>>>> > > > >>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <[email protected]>
>>>>>> > > > >>>>> wrote:
>>>>>> > > > >>>>>
>>>>>> > > > >>>>>> Hi Becket,
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>> I did consider this approach at the beginning (and it was
>>>>>> > > > >>>>>> also mentioned in this FLIP), since it would allow more
>>>>>> > > > >>>>>> flexibility in reassigning all splits. However, there are
>>>>>> > > > >>>>>> a few potential issues.
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>> 1. High Transmission Cost
>>>>>> > > > >>>>>> If we pass the full split objects (rather than just split
>>>>>> > > > >>>>>> IDs), the data size could be significant, leading to high
>>>>>> > > > >>>>>> overhead during transmission, especially when many splits
>>>>>> > > > >>>>>> are involved.
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>> 2. Risk of Split Loss
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>> The risk of split loss exists unless we have a mechanism
>>>>>> > > > >>>>>> to make sure a checkpoint can only be taken after all the
>>>>>> > > > >>>>>> splits are reassigned.
>>>>>> > > > >>>>>> There are scenarios where splits could be lost due to
>>>>>> > > > >>>>>> inconsistent state handling during recovery:
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>> Scenario 1:
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>> 1. Upon restart, Reader A reports assigned splits (1 and
>>>>>> > > > >>>>>> 2), and Reader B reports (3 and 4).
>>>>>> > > > >>>>>> 2. The enumerator receives these reports but only
>>>>>> > > > >>>>>> reassigns splits 1 and 2, not 3 and 4.
>>>>>> > > > >>>>>> 3. A checkpoint or savepoint is then triggered. Only
>>>>>> > > > >>>>>> splits 1 and 2 are recorded in the reader states; splits
>>>>>> > > > >>>>>> 3 and 4 are not persisted.
>>>>>> > > > >>>>>> 4. If the job is later restarted from this checkpoint,
>>>>>> > > > >>>>>> splits 3 and 4 will be permanently lost.
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>> Scenario 2:
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>> 1. Reader A reports splits (1 and 2), and Reader B
>>>>>> > > > >>>>>> reports (3 and 4) upon restart.
>>>>>> > > > >>>>>> 2. Before the enumerator receives all reports and
>>>>>> > > > >>>>>> performs reassignment, a checkpoint is triggered.
>>>>>> > > > >>>>>> 3. Since no splits have been reassigned yet, both
>>>>>> > > > >>>>>> readers have empty states.
>>>>>> > > > >>>>>> 4. When restarting from this checkpoint, all four splits
>>>>>> > > > >>>>>> are lost.
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>> Let me know if you have thoughts on how we might mitigate
>>>>>> > > > >>>>>> these risks!
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>> Best,
>>>>>> > > > >>>>>> Hongshun
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <[email protected]>
>>>>>> > > > >>>>>> wrote:
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>>> Hi Hongshun,
>>>>>> > > > >>>>>>>
>>>>>> > > > >>>>>>> The steps sound reasonable to me in general. In terms of
>>>>>> > > > >>>>>>> the updated FLIP wiki, it would be good to see if we can
>>>>>> > > > >>>>>>> keep the protocol simple. One alternative way to achieve
>>>>>> > > > >>>>>>> this behavior is the following:
>>>>>> > > > >>>>>>>
>>>>>> > > > >>>>>>> 1. Upon SourceOperator startup, the SourceOperator sends
>>>>>> > > > >>>>>>> a ReaderRegistrationEvent with the currently assigned
>>>>>> > > > >>>>>>> splits to the enumerator. It does not add these splits to
>>>>>> > > > >>>>>>> the SourceReader.
>>>>>> > > > >>>>>>> 2. The enumerator will always use
>>>>>> > > > >>>>>>> SourceEnumeratorContext.assignSplits() to assign the
>>>>>> > > > >>>>>>> splits. (Not via the response of the
>>>>>> > > > >>>>>>> ReaderRegistrationEvent; this allows async split
>>>>>> > > > >>>>>>> assignment in case the enumerator wants to wait until all
>>>>>> > > > >>>>>>> the readers are registered.)
>>>>>> > > > >>>>>>> 3. The SourceOperator will only call
>>>>>> > > > >>>>>>> SourceReader.addSplits() when it receives the
>>>>>> > > > >>>>>>> AddSplitEvent from the enumerator.
>>>>>> > > > >>>>>>>
>>>>>> > > > >>>>>>> This protocol has a few benefits:
>>>>>> > > > >>>>>>> 1. It basically allows arbitrary split reassignment upon
>>>>>> > > > >>>>>>> restart.
>>>>>> > > > >>>>>>> 2. Simplicity: there is only one way to assign splits.
>>>>>> > > > >>>>>>>
>>>>>> > > > >>>>>>> So we only need one interface change:
>>>>>> > > > >>>>>>> - add the initially assigned splits to ReaderInfo so the
>>>>>> > > > >>>>>>> Enumerator can access them.
>>>>>> > > > >>>>>>> and one behavior change:
>>>>>> > > > >>>>>>> - The SourceOperator should stop assigning splits to the
>>>>>> > > > >>>>>>> SourceReader from state restoration, but
>>>>>> > [message truncated...]
