Hi Hongshun, Thanks for updating the FLIP. A quick question: why do we need to change the behavior of addSplitsBack()? Should it remain the same?
Regarding the case of a restart with a changed subscription, I think the only correct behavior is removing obsolete splits without any warning / exception. It is OK to add info-level logging if we want to. The intention is clear if the user has explicitly changed the subscription and restarted the job. There is no need to add a config to double-confirm. Regards, Jiangjie (Becket) Qin On Thu, Oct 9, 2025 at 7:28 PM Hongshun Wang <[email protected]> wrote: > Hi Leonard, > > If the SplitEnumerator receives all splits after a restart, it becomes > straightforward to clear and un-assign the unmatched splits (checking > whether they match the source options). However, a key question arises: *should > we automatically discard obsolete splits, or explicitly notify the user via an > exception?* > > We provided an option `scan.partition-unsubscribe.strategy`: > 1. If Strict, it throws an exception when encountering removed splits. > 2. If Lenient, it automatically removes obsolete splits silently. > > What do you think? > > Best, > Hongshun > > On Thu, Oct 9, 2025 at 9:37 PM Leonard Xu <[email protected]> wrote: > >> Thanks Hongshun for the update and the pretty detailed analysis of edge >> cases, the updated FLIP looks good to me now. >> >> Only one last implementation detail about a scenario in the motivation section: >> >> *Restart with Changed subscription: During restart, if the source options >> remove a topic or table, the splits which have already been assigned cannot be >> removed.* >> >> Could you clarify how we resolve this in the Kafka connector? >> >> Best, >> Leonard >> >> >> >> On Oct 9, 2025, at 19:48, Hongshun Wang <[email protected]> wrote: >> >> Hi devs, >> If there are no further suggestions, I will start the voting tomorrow. >> >> Best, >> Hongshun >> >> On Fri, Sep 26, 2025 at 7:48 PM Hongshun Wang <[email protected]> >> wrote: >> >>> Hi Becket and Leonard, >>> >>> I have updated the content of this FLIP. 
The key point is that: >>> >>> When the split enumerator receives a split, *these splits must have >>> already existed in pendingSplitAssignments or assignedSplitAssignments*. >>> >>> - If the split is in pendingSplitAssignments, ignore it. >>> - If the split is in assignedSplitAssignments but has a different >>> taskId, ignore it (this indicates it was already assigned to another >>> task). >>> - If the split is in assignedSplitAssignments and shares the same >>> taskId, move the assignment from assignedSplitAssignments to >>> pendingSplitAssignments >>> to re-assign it. >>> >>> >>> To better explain why these strategies are used, I added some examples >>> and pictures. >>> >>> Could you help me check whether there are still any problems? >>> >>> Best >>> Hongshun >>> >>> >>> >>> On Fri, Sep 26, 2025 at 5:08 PM Leonard Xu <[email protected]> wrote: >>> >>>> Thanks Becket and Hongshun for the insightful discussion. >>>> >>>> The underlying implementation and communication mechanisms of Flink >>>> Source indeed involve many intricate details. We discussed the issue of >>>> split re-assignment in specific scenarios, but fortunately, the final >>>> decision turned out to be pretty clear. >>>> >>>> +1 to Becket’s proposal to keep the framework cleaner and more >>>> flexible. >>>> +1 to Hongshun’s point to provide comprehensive guidance for connector >>>> developers. >>>> >>>> >>>> Best, >>>> Leonard >>>> >>>> >>>> >>>> On Sep 26, 2025, at 16:30, Hongshun Wang <[email protected]> wrote: >>>> >>>> Hi Becket, >>>> >>>> I got it. You’re suggesting we should not handle this in the source >>>> framework but instead let the split enumerator manage these three >>>> scenarios. 
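The rules above could be sketched roughly like this. This is a hypothetical illustration only: the class name `SplitAssignmentTracker` and the method `onSplitReported` are made up, while the two map names follow the `pendingSplitAssignments` / `assignedSplitAssignments` terminology used in the thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the three rules above; not an actual Flink or
// Kafka connector class.
class SplitAssignmentTracker {
    // splitId -> taskId for splits waiting to be (re-)assigned
    final Map<String, Integer> pendingSplitAssignments = new HashMap<>();
    // splitId -> taskId for splits already handed to a reader
    final Map<String, Integer> assignedSplitAssignments = new HashMap<>();

    /** Handle a split reported back by the reader of the given task. */
    void onSplitReported(String splitId, int taskId) {
        if (pendingSplitAssignments.containsKey(splitId)) {
            return; // rule 1: already pending, ignore
        }
        Integer owner = assignedSplitAssignments.get(splitId);
        if (owner == null || owner != taskId) {
            return; // rule 2: owned by another task (or unknown), ignore
        }
        // rule 3: same task -> move it back to pending for re-assignment
        assignedSplitAssignments.remove(splitId);
        pendingSplitAssignments.put(splitId, taskId);
    }
}
```

The point of the second rule is that a stale report from a restarted reader must not steal a split that the enumerator has since given to another task.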
>>>> >>>> Let me explain why I originally favored handling it in the framework: >>>> I'm concerned that connector developers might overlook certain edge cases >>>> (after all, it even took us extensive discussions to fully clarify the logic). >>>> >>>> However, your point keeps the framework cleaner and more flexible. >>>> Thus, I will take it. >>>> >>>> Perhaps, in this FLIP, we should focus on providing comprehensive >>>> guidance for connector developers: explain how to implement a split >>>> enumerator, including the underlying challenges and their solutions. >>>> >>>> >>>> Additionally, we can use the Kafka connector as a reference >>>> implementation to demonstrate the practical steps. This way, developers who >>>> want to implement similar connectors can directly reference this example. >>>> >>>> >>>> Best, >>>> Hongshun >>>> >>>> >>>> >>>> On Fri, Sep 26, 2025 at 1:27 PM Becket Qin <[email protected]> >>>> wrote: >>>> >>>>> It would be good to not expose runtime details to the source >>>>> implementation if possible. >>>>> >>>>> Today, the split enumerator implementations are expected to track the >>>>> split assignment. >>>>> >>>>> Assuming the split enumerator implementation keeps a split assignment >>>>> map, that means the enumerator should already know whether a split is >>>>> assigned or unassigned. So it can handle the three scenarios you >>>>> mentioned. >>>>> >>>>> The split is reported by a reader during a global restoration. >>>>>> >>>>> The split enumerator should have just been restored / created. If the >>>>> enumerator expects a full reassignment of splits upon global recovery, >>>>> there should be no assigned splits to that reader in the split assignment >>>>> mapping. >>>>> >>>>> The split is reported by a reader during a partial failure recovery. 
>>>>>> >>>>> In this case, when SplitEnumerator.addReader() is invoked, the split >>>>> assignment map in the enumerator implementation should already have some >>>>> split assignments for the reader. Therefore it is a partial failover. If >>>>> the source supports split reassignment on recovery, the enumerator can >>>>> assign splits that are different from the reported assignment of that >>>>> reader in the SplitEnumeratorContext, or it can also assign the same >>>>> splits. In any case, the enumerator knows that this is a partial recovery >>>>> because the assignment map is non-empty. >>>>> >>>>> The split is not reported by a reader, but is assigned after the last >>>>>> successful checkpoint and was never acknowledged. >>>>> >>>>> This is actually one of the steps in the partial failure recovery. >>>>> SplitEnumerator.addSplitsBack() will be called first before >>>>> SplitEnumerator.addReader() is called for the recovered reader. When >>>>> SplitEnumerator.addSplitsBack() is invoked, it is for sure a partial >>>>> recovery. And the enumerator should remove these splits from the split >>>>> assignment map as if they were never assigned. >>>>> >>>>> I think this should work, right? >>>>> >>>>> Thanks, >>>>> >>>>> Jiangjie (Becket) Qin >>>>> >>>>> On Thu, Sep 25, 2025 at 8:34 PM Hongshun Wang <[email protected]> >>>>> wrote: >>>>> >>>>>> Hi Becket and Leonard, >>>>>> >>>>>> Thanks for your advice. >>>>>> >>>>>> > put all the reader information in the SplitEnumerator context >>>>>> I have a concern: the current registeredReaders in* >>>>>> SourceCoordinatorContext will be removed after subtaskReset execution >>>>>> on >>>>>> failure*. However, this approach has merit. >>>>>> >>>>>> One more situation I found that my previous design does not cover: >>>>>> >>>>>> 1. Initial state: Reader A reports splits (1, 2). >>>>>> 2. Enumerator action: Assigns split 1 to Reader A, and split 2 to >>>>>> Reader B. >>>>>> 3. Failure scenario: Reader A fails before checkpointing. 
Since >>>>>> this is a partial failure, only Reader A restarts. >>>>>> 4. Recovery issue: Upon recovery, Reader A re-reports split (1). >>>>>> >>>>>> In my previous design, the enumerator will ignore Reader A's >>>>>> re-registration, which will cause data loss. >>>>>> >>>>>> Thus, when the enumerator receives a split, the split may originate >>>>>> from three scenarios: >>>>>> >>>>>> 1. The split is reported by a reader during a global restoration. >>>>>> 2. The split is reported by a reader during a partial failure >>>>>> recovery. >>>>>> 3. The split is not reported by a reader, but is assigned after >>>>>> the last successful checkpoint and was never acknowledged. >>>>>> >>>>>> In the first scenario (global restore), the split should >>>>>> be re-distributed. For the latter two scenarios (partial failover and >>>>>> post-checkpoint assignment), we need to reassign the split to >>>>>> its originally assigned subtask. >>>>>> >>>>>> By implementing a method in the SplitEnumerator context to track each >>>>>> assigned split's status, the system can correctly identify and resolve >>>>>> split ownership in all three scenarios. *What about adding a >>>>>> `SplitRecoveryType splitRecoveryType(Split split)` method in >>>>>> SplitEnumeratorContext?* SplitRecoveryType is an enum including >>>>>> `UNASSIGNED`, `GLOBAL_RESTORE`, `PARTIAL_FAILOVER` and >>>>>> `POST_CHECKPOINT_ASSIGNMENT`. >>>>>> >>>>>> What do you think? Are there any details or scenarios I haven't >>>>>> considered? Looking forward to your advice. >>>>>> >>>>>> Best, >>>>>> Hongshun >>>>>> >>>>>> On Thu, Sep 11, 2025 at 12:41 AM Becket Qin <[email protected]> >>>>>> wrote: >>>>>> >>>>>>> Thanks for the explanation, Hongshun. >>>>>>> >>>>>>> The current pattern of handling new reader registration is the following: >>>>>>> 1. Put all the reader information in the SplitEnumerator context. >>>>>>> 2. Notify the enumerator about the new reader registration. >>>>>>> 3. 
Let the split enumerator get whatever information it wants from >>>>>>> the >>>>>>> context and do its job. >>>>>>> This pattern decouples the information passing and the reader >>>>>>> registration >>>>>>> notification. This makes the API extensible - we can add more >>>>>>> information >>>>>>> (e.g. reported assigned splits in our case) about the reader to the >>>>>>> context >>>>>>> without introducing new methods. >>>>>>> >>>>>>> Introducing a new method addSplitsBackOnRecovery() is redundant given >>>>>>> the >>>>>>> above pattern. Do we really need it? >>>>>>> >>>>>>> Thanks, >>>>>>> >>>>>>> Jiangjie (Becket) Qin >>>>>>> >>>>>>> On Mon, Sep 8, 2025 at 8:18 PM Hongshun Wang < >>>>>>> [email protected]> >>>>>>> wrote: >>>>>>> >>>>>>> > Hi Becket, >>>>>>> > >>>>>>> > > I am curious what would the enumerator do differently for the >>>>>>> splits >>>>>>> > added via addSplitsBackOnRecovery() V.S. addSplitsBack()? >>>>>>> > >>>>>>> > In this FLIP, there are two distinct scenarios in which the >>>>>>> enumerator >>>>>>> > receives splits being added back: >>>>>>> > 1. Job-level restore: The job is restored, and splits from the readers' >>>>>>> state are >>>>>>> > reported via ReaderRegistrationEvent. >>>>>>> > 2. Reader-level restart: only a reader is restarted, not the whole >>>>>>> job; >>>>>>> > the splits assigned to it after the last successful checkpoint are added back. This >>>>>>> is what >>>>>>> > addSplitsBack used to do. >>>>>>> > >>>>>>> > >>>>>>> > In these two situations, the enumerator will choose different >>>>>>> strategies. >>>>>>> > 1. Job-level restore: the splits should be redistributed across >>>>>>> readers >>>>>>> > according to the current partitioner strategy. >>>>>>> > 2. 
Reader-level restart: the splits should be reassigned directly >>>>>>> back to >>>>>>> > the same reader they were originally assigned to, preserving >>>>>>> locality and >>>>>>> > avoiding unnecessary redistribution. >>>>>>> > >>>>>>> > Therefore, the enumerator must clearly distinguish between these >>>>>>> two >>>>>>> > scenarios. I originally proposed deprecating the former >>>>>>> addSplitsBack(List<SplitT> >>>>>>> > splits, int subtaskId) and adding a new addSplitsBack(List<SplitT> >>>>>>> > splits, int subtaskId, >>>>>>> > boolean reportedByReader). >>>>>>> > Leonard suggested using another method, addSplitsBackOnRecovery, which does >>>>>>> not >>>>>>> > influence the current addSplitsBack. >>>>>>> > >>>>>>> > Best >>>>>>> > Hongshun >>>>>>> > >>>>>>> > >>>>>>> > >>>>>>> > On 2025/09/08 17:20:31 Becket Qin wrote: >>>>>>> > > Hi Leonard, >>>>>>> > > >>>>>>> > > >>>>>>> > > > Could we introduce a new method like addSplitsBackOnRecovery >>>>>>> with >>>>>>> > default >>>>>>> > > > implementation. In this way, we can provide better backward >>>>>>> > compatibility >>>>>>> > > > and also makes it easier for developers to understand. >>>>>>> > > >>>>>>> > > >>>>>>> > > I am curious what would the enumerator do differently for the >>>>>>> splits >>>>>>> > added >>>>>>> > > via addSplitsBackOnRecovery() V.S. addSplitsBack()? Today, >>>>>>> > addSplitsBack() >>>>>>> > > is also only called upon recovery. So the new method seems >>>>>>> confusing. One >>>>>>> > > thing worth clarifying is if the Source implements >>>>>>> > > SupportSplitReassignmentOnRecovery, upon recovery, should the >>>>>>> splits >>>>>>> > > reported by the readers also be added back to the >>>>>>> SplitEnumerator via the >>>>>>> > > addSplitsBack() call? Or should the SplitEnumerator explicitly >>>>>>> query the >>>>>>> > > registered reader information via the SplitEnumeratorContext to >>>>>>> get the >>>>>>> > > originally assigned splits when addReader() is invoked? 
I was >>>>>>> assuming >>>>>>> > the >>>>>>> > > latter in the beginning, so the behavior of addSplitsBack() >>>>>>> remains >>>>>>> > > unchanged, but I am not opposed to doing the former. >>>>>>> > > >>>>>>> > > Also, can you elaborate on the backwards compatibility issue you >>>>>>> see if >>>>>>> > we >>>>>>> > > do not have a separate addSplitsBackOnRecovery() method? Even >>>>>>> without >>>>>>> > this >>>>>>> > > new method, the behavior remains exactly the same unless the end >>>>>>> users >>>>>>> > > implement the mix-in interface of >>>>>>> "SupportSplitReassignmentOnRecovery", >>>>>>> > > right? >>>>>>> > > >>>>>>> > > Thanks, >>>>>>> > > >>>>>>> > > Jiangjie (Becket) Qin >>>>>>> > > >>>>>>> > > On Mon, Sep 8, 2025 at 1:48 AM Hongshun Wang <[email protected]> >>>>>>> > > wrote: >>>>>>> > > >>>>>>> > > > Hi devs, >>>>>>> > > > >>>>>>> > > > It has been quite some time since this FLIP[1] was first >>>>>>> proposed. >>>>>>> > Thank >>>>>>> > > > you for your valuable feedback. Based on your suggestions, the >>>>>>> FLIP has >>>>>>> > > > undergone several rounds of revisions. >>>>>>> > > > >>>>>>> > > > Any more advice is welcome and appreciated. If there are no >>>>>>> further >>>>>>> > > > concerns, I plan to start the vote tomorrow. >>>>>>> > > > >>>>>>> > > > Best >>>>>>> > > > Hongshun >>>>>>> > > > >>>>>>> > > > [1] >>>>>>> > > > >>>>>>> > >>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480 >>>>>>> > > > >>>>>>> > > > On Mon, Sep 8, 2025 at 4:42 PM Hongshun Wang <[email protected]> >>>>>>> > > > wrote: >>>>>>> > > > >>>>>>> > > > > Hi Leonard, >>>>>>> > > > > Thanks for your advice. It makes sense and I have modified >>>>>>> it. >>>>>>> > > > > >>>>>>> > > > > Best, >>>>>>> > > > > Hongshun >>>>>>> > > > > >>>>>>> > > > > On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu <[email protected]> >>>>>>> wrote: >>>>>>> > > > > >>>>>>> > > > >> Thanks Hongshun and Becket for the deep discussion. 
>>>>>>> > > > >> >>>>>>> > > > >> I only have one comment on one API design: >>>>>>> > > > >> >>>>>>> > > > >> > Deprecate the old addSplitsBack method, use an >>>>>>> addSplitsBack with >>>>>>> > > > >> param isReportedByReader instead, because the enumerator >>>>>>> can apply >>>>>>> > > > >> different reassignment policies based on the context. >>>>>>> > > > >> >>>>>>> > > > >> Could we introduce a new method like >>>>>>> *addSplitsBackOnRecovery* with a >>>>>>> > > > default >>>>>>> > > > >> implementation? In this way, we can provide better backward >>>>>>> > > > >> compatibility and also make it easier for developers to >>>>>>> understand. >>>>>>> > > > >> >>>>>>> > > > >> Best, >>>>>>> > > > >> Leonard >>>>>>> > > > >> >>>>>>> > > > >> >>>>>>> > > > >> >>>>>>> > > > >> On Sep 3, 2025, at 20:26, Hongshun Wang <[email protected]> wrote: >>>>>>> > > > >> >>>>>>> > > > >> Hi Becket, >>>>>>> > > > >> >>>>>>> > > > >> I think that's a great idea! I have added the >>>>>>> > > > >> SupportSplitReassignmentOnRecovery interface in this FLIP. >>>>>>> A >>>>>>> > Source >>>>>>> > > > >> implementing this interface indicates that the source >>>>>>> operator needs >>>>>>> > to >>>>>>> > > > >> report splits to the enumerator and receive reassignment. [1] >>>>>>> > > > >> >>>>>>> > > > >> Best, >>>>>>> > > > >> Hongshun >>>>>>> > > > >> >>>>>>> > > > >> [1] >>>>>>> > > > >> >>>>>>> > > > >>>>>>> > >>>>>>> > >>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment >>>>>>> > > > >> >>>>>>> > > > >> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin < >>>>>>> [email protected]> >>>>>>> > > > wrote: >>>>>>> > > > >> >>>>>>> > > > >>> Hi Hongshun, >>>>>>> > > > >>> >>>>>>> > > > >>> I think the convention for such optional features in >>>>>>> Source is via >>>>>>> > > > >>> mix-in interfaces. 
So instead of adding a method to the >>>>>>> SourceReader, >>>>>>> maybe >>>>>>> we should introduce an interface SupportSplitReassignmentOnRecovery >>>>>>> with >>>>>>> this method. If a Source implementation implements that >>>>>>> interface, >>>>>>> then the >>>>>>> SourceOperator will check the desired behavior and act >>>>>>> accordingly. >>>>>>> >>>>>>> Thanks, >>>>>>> >>>>>>> Jiangjie (Becket) Qin >>>>>>> >>>>>>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang < >>>>>>> [email protected] >>>>>>> > >>>>>>> wrote: >>>>>>> >>>>>>> Hi devs, >>>>>>> >>>>>>> Would anyone like to discuss this FLIP? I'd appreciate >>>>>>> your >>>>>>> feedback >>>>>>> and suggestions. >>>>>>> >>>>>>> Best, >>>>>>> Hongshun >>>>>>> >>>>>>> >>>>>>> On Aug 13, 2025, at 14:23, Hongshun Wang <[email protected]> wrote: >>>>>>> >>>>>>> Hi Becket, >>>>>>> >>>>>>> Thank you for your detailed feedback. The new contract >>>>>>> makes good >>>>>>> sense >>>>>>> to me and effectively addresses the issues I encountered >>>>>>> at the >>>>>>> beginning >>>>>>> of the design. >>>>>>> >>>>>>> That said, I recommend not reporting splits by default, >>>>>>> primarily >>>>>>> for >>>>>>> compatibility and practical reasons: >>>>>>> >>>>>>> > For these reasons, we do not expect the Split objects >>>>>>> to be >>>>>>> huge, >>>>>>> and we are not trying to design for huge Split objects >>>>>>> either as >>>>>>> they >>>>>>> will >>>>>>> have problems even today. >>>>>>> >>>>>>> 1. 
>>>>>>> >>>>>>> Not all existing connectors match this rule. >>>>>>> For example, in the MySQL CDC connector, a binlog split >>>>>>> may contain >>>>>>> hundreds (or even more) snapshot split completion >>>>>>> records. This >>>>>>> state is >>>>>>> large and is currently transmitted incrementally >>>>>>> through >>>>>>> multiple >>>>>>> BinlogSplitMetaEvent messages. Since the binlog reader >>>>>>> operates >>>>>>> with single parallelism, reporting the full split >>>>>>> state on >>>>>>> recovery >>>>>>> could be inefficient or even infeasible. >>>>>>> For such sources, it would be better to provide a >>>>>>> mechanism to >>>>>>> skip >>>>>>> split reporting during restart until they redesign and >>>>>>> reduce >>>>>>> the >>>>>>> split size. >>>>>>> 2. >>>>>>> >>>>>>> Not all enumerators maintain unassigned splits in >>>>>>> state. >>>>>>> Some SplitEnumerator implementations (such as the Kafka connector's) >>>>>>> do >>>>>>> not track or persistently manage unassigned splits. >>>>>>> Requiring >>>>>>> them >>>>>>> to >>>>>>> handle re-registration would add unnecessary >>>>>>> complexity. Even >>>>>>> though we >>>>>>> may implement this in the Kafka connector, the Kafka >>>>>>> connector >>>>>>> is >>>>>>> currently decoupled >>>>>>> from the Flink version, so we also need to make sure >>>>>>> older versions >>>>>>> remain >>>>>>> compatible. >>>>>>> >>>>>>> ------------------------------ >>>>>>> >>>>>>> To address these concerns, I propose introducing a new >>>>>>> method: >>>>>>> boolean >>>>>>> SourceReader#shouldReassignSplitsOnRecovery() with a >>>>>>> default >>>>>>> implementation returning false. 
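The opt-in method proposed here could look roughly like the sketch below (the thread later converges on a `SupportSplitReassignmentOnRecovery` mix-in interface instead). The interface shown is a stripped-down stand-in, not the real Flink `SourceReader`, which has many more methods; `OptInReader` is a hypothetical example reader.

```java
// Stripped-down stand-in for the proposal above; the real Flink
// SourceReader interface has more methods (start, pollNext,
// snapshotState, ...), elided here.
interface SourceReader<T, SplitT> {
    // Proposed opt-in: only readers overriding this to return true
    // report their assigned splits back to the enumerator on recovery.
    default boolean shouldReassignSplitsOnRecovery() {
        return false;
    }
}

// A hypothetical reader that opts in to split reassignment on recovery.
class OptInReader implements SourceReader<String, String> {
    @Override
    public boolean shouldReassignSplitsOnRecovery() {
        return true;
    }
}
```

The default of false keeps existing connectors (such as the MySQL CDC case described above) on their current behavior without any code change.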
This allows source >>>>>>> readers to opt >>>>>>> in >>>>>>> to split reassignment only when necessary. Since the new >>>>>>> contract >>>>>>> already >>>>>>> places the responsibility for split assignment on the >>>>>>> enumerator, not >>>>>>> reporting splits by default is a safe and clean default >>>>>>> behavior. >>>>>>> >>>>>>> ------------------------------ >>>>>>> >>>>>>> I’ve updated the implementation and the FLIP >>>>>>> accordingly [1]. It is quite a >>>>>>> big change. In particular, for the Kafka connector, we >>>>>>> can now use a >>>>>>> pluggable SplitPartitioner to support different split >>>>>>> assignment >>>>>>> strategies (e.g., default, round-robin). >>>>>>> >>>>>>> Could you please review it when you have a chance? >>>>>>> >>>>>>> Best, >>>>>>> >>>>>>> Hongshun >>>>>>> >>>>>>> [1] >>>>>>> >>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment >>>>>>> >>>>>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin < >>>>>>> [email protected]> >>>>>>> wrote: >>>>>>> >>>>>>> Hi Hongshun, >>>>>>> >>>>>>> I am not too concerned about the transmission cost, because the >>>>>>> full >>>>>>> split transmission has to happen in the initial >>>>>>> assignment phase >>>>>>> already. >>>>>>> And in the future, we probably want to also introduce >>>>>>> some kind >>>>>>> of >>>>>>> workload >>>>>>> balance across source readers, e.g. 
based on the >>>>>>> per-split >>>>>>> throughput or >>>>>>> the per-source-reader workload in heterogeneous >>>>>>> clusters. For >>>>>>> these >>>>>>> reasons, we do not expect the Split objects to be huge, >>>>>>> and we >>>>>>> are >>>>>>> not >>>>>>> trying to design for huge Split objects either as they >>>>>>> will have >>>>>>> problems >>>>>>> even today. >>>>>>> >>>>>>> Good point on the potential split loss, please see the >>>>>>> reply >>>>>>> below: >>>>>>> >>>>>>> Scenario 2: >>>>>>> >>>>>>> >>>>>>> 1. Reader A reports splits (1 and 2), and Reader B >>>>>>> reports (3 >>>>>>> and 4) >>>>>>> upon restart. >>>>>>> 2. Before the enumerator receives all reports and >>>>>>> performs >>>>>>> reassignment, a checkpoint is triggered. >>>>>>> 3. Since no splits have been reassigned yet, both >>>>>>> readers have >>>>>>> empty >>>>>>> states. >>>>>>> 4. When restarting from this checkpoint, all four >>>>>>> splits are >>>>>>> lost. >>>>>>> >>>>>>> The reader registration happens in the >>>>>>> SourceOperator.open(), >>>>>>> which >>>>>>> means the task is still in the initializing state, >>>>>>> therefore the >>>>>>> checkpoint >>>>>>> should not be triggered until the enumerator receives >>>>>>> all the >>>>>>> split >>>>>>> reports. >>>>>>> >>>>>>> There is a nuance here. Today, the RPC call from the TM >>>>>>> to the JM >>>>>>> is >>>>>>> async. So it is possible that SourceOperator.open() >>>>>>> has >>>>>>> returned, >>>>>>> but >>>>>>> the enumerator has not received the split reports. 
>>>>>>> However, >>>>>>> because >>>>>>> the >>>>>>> task status update RPC call goes to the same channel as >>>>>>> the split >>>>>>> reports >>>>>>> call, the task status RPC call will happen after the >>>>>>> split >>>>>>> reports call >>>>>>> on the JM side. Therefore, on the JM side, the >>>>>>> SourceCoordinator >>>>>>> will >>>>>>> always first receive the split reports, then receive the >>>>>>> checkpoint >>>>>>> request. >>>>>>> This "happens-before" relationship is kind of important to >>>>>>> guarantee >>>>>>> the consistent state between enumerator and readers. >>>>>>> >>>>>>> Scenario 1: >>>>>>> >>>>>>> >>>>>>> 1. Upon restart, Reader A reports assigned splits (1 >>>>>>> and 2), and >>>>>>> Reader B reports (3 and 4). >>>>>>> 2. The enumerator receives these reports but only >>>>>>> reassigns >>>>>>> splits 1 >>>>>>> and 2 — not 3 and 4. >>>>>>> 3. A checkpoint or savepoint is then triggered. Only >>>>>>> splits 1 >>>>>>> and 2 >>>>>>> are recorded in the reader states; splits 3 and 4 are >>>>>>> not >>>>>>> persisted. >>>>>>> 4. If the job is later restarted from this checkpoint, >>>>>>> splits 3 >>>>>>> and >>>>>>> 4 >>>>>>> will be permanently lost. >>>>>>> >>>>>>> This scenario is possible. One solution is to let the >>>>>>> enumerator >>>>>>> implementation handle this. That means if the enumerator >>>>>>> relies >>>>>>> on >>>>>>> the >>>>>>> initial split reports from the source readers, it should >>>>>>> maintain >>>>>>> these >>>>>>> reports by itself. 
In the above example, the enumerator >>>>>>> will need >>>>>>> to >>>>>>> remember that 3 and 4 are not assigned and put them into >>>>>>> its own >>>>>>> state. >>>>>>> The current contract is that anything assigned to the >>>>>>> SourceReaders >>>>>>> is completely owned by the SourceReaders. Enumerators >>>>>>> can >>>>>>> remember >>>>>>> the >>>>>>> assignments but cannot change them, even when the source >>>>>>> reader >>>>>>> recovers / >>>>>>> restarts. >>>>>>> With this FLIP, the contract becomes that the source >>>>>>> readers will >>>>>>> return the ownership of the splits to the enumerator. So >>>>>>> the >>>>>>> enumerator is >>>>>>> responsible for maintaining these splits, until they are >>>>>>> assigned >>>>>>> to >>>>>>> a >>>>>>> source reader again. >>>>>>> >>>>>>> There are other cases where there may be conflicting >>>>>>> information >>>>>>> between >>>>>>> reader and enumerator. For example, consider the >>>>>>> following >>>>>>> sequence: >>>>>>> 1. reader A reports splits (1 and 2) upon restart. >>>>>>> 2. enumerator receives the report and assigns both 1 and >>>>>>> 2 to >>>>>>> reader >>>>>>> B. >>>>>>> 3. reader A failed before checkpointing. And this is a >>>>>>> partial >>>>>>> failure, so only reader A restarts. >>>>>>> 4. When reader A recovers, it will again report splits >>>>>>> (1 and 2) >>>>>>> to >>>>>>> the enumerator. >>>>>>> 5. The enumerator should ignore this report because it >>>>>>> has >>>>>>> assigned splits (1 and 2) to reader B. >>>>>>> >>>>>>> So with the new contract, the enumerator should be the >>>>>>> source of >>>>>>> truth >>>>>>> for split ownership. 
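The "source of truth" contract in the sequence above could be illustrated by a tiny ownership check. This is a hypothetical sketch only: `OwnershipTracker`, `assign`, and `acceptReport` are made-up names, not Flink classes; the point is solely that a re-reported split is ignored when the enumerator has since assigned it elsewhere.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of "the enumerator is the source of truth
// for split ownership"; not an actual Flink class.
class OwnershipTracker {
    private final Map<String, Integer> ownerBySplit = new HashMap<>();

    /** The enumerator records every assignment it makes. */
    void assign(String splitId, int readerId) {
        ownerBySplit.put(splitId, readerId);
    }

    /** A recovered reader re-reports a split it thinks it still owns. */
    boolean acceptReport(String splitId, int reportingReader) {
        Integer owner = ownerBySplit.get(splitId);
        // Ignore the report if the enumerator has since assigned the
        // split to a different reader (step 5 in the sequence above).
        return owner == null || owner == reportingReader;
    }
}
```

Walking the sequence: after step 2 the tracker maps splits 1 and 2 to reader B, so reader A's re-report in step 4 fails the ownership check and is dropped, exactly as step 5 requires.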
>>>>>>> >>>>>>> Thanks, >>>>>>> >>>>>>> Jiangjie (Becket) Qin >>>>>>> >>>>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang < >>>>>>> [email protected]> >>>>>>> wrote: >>>>>>> >>>>>>> Hi Becket, >>>>>>> >>>>>>> >>>>>>> I did consider this approach at the beginning (and it >>>>>>> was also >>>>>>> mentioned in this FLIP), since it would allow more >>>>>>> flexibility >>>>>>> in >>>>>>> reassigning all splits. However, there are a few >>>>>>> potential >>>>>>> issues. >>>>>>> >>>>>>> 1. High Transmission Cost >>>>>>> If we pass the full split objects (rather than just >>>>>>> split IDs), >>>>>>> the >>>>>>> data size could be significant, leading to high >>>>>>> overhead during >>>>>>> transmission — especially when many splits are involved. >>>>>>> >>>>>>> 2. Risk of Split Loss >>>>>>> >>>>>>> Risk of split loss exists unless we have a mechanism to >>>>>>> make >>>>>>> sure >>>>>>> we can only checkpoint after all the splits are reassigned. >>>>>>> There are scenarios where splits could be lost due to >>>>>>> inconsistent >>>>>>> state handling during recovery: >>>>>>> >>>>>>> >>>>>>> Scenario 1: >>>>>>> >>>>>>> >>>>>>> 1. Upon restart, Reader A reports assigned splits (1 >>>>>>> and 2), >>>>>>> and >>>>>>> Reader B reports (3 and 4). >>>>>>> 2. The enumerator receives these reports but only >>>>>>> reassigns >>>>>>> splits 1 and 2 — not 3 and 4. >>>>>>> 3. A checkpoint or savepoint is then triggered. 
Only >>>>>>> splits 1 >>>>>>> and >>>>>>> 2 are recorded in the reader states; splits 3 and 4 >>>>>>> are not >>>>>>> persisted. >>>>>>> 4. If the job is later restarted from this >>>>>>> checkpoint, splits >>>>>>> 3 >>>>>>> and 4 will be permanently lost. >>>>>>> >>>>>>> >>>>>>> Scenario 2: >>>>>>> >>>>>>> 1. Reader A reports splits (1 and 2), and Reader B >>>>>>> reports (3 >>>>>>> and >>>>>>> 4) upon restart. >>>>>>> 2. Before the enumerator receives all reports and >>>>>>> performs >>>>>>> reassignment, a checkpoint is triggered. >>>>>>> 3. Since no splits have been reassigned yet, both >>>>>>> readers >>>>>>> have >>>>>>> empty states. >>>>>>> 4. When restarting from this checkpoint, all four >>>>>>> splits are >>>>>>> lost. >>>>>>> >>>>>>> >>>>>>> Let me know if you have thoughts on how we might >>>>>>> mitigate these >>>>>>> risks! >>>>>>> >>>>>>> Best >>>>>>> Hongshun >>>>>>> >>>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin < >>>>>>> [email protected]> >>>>>>> wrote: >>>>>>> >>>>>>> Hi Hongshun, >>>>>>> >>>>>>> The steps sound reasonable to me in general. In terms >>>>>>> of the >>>>>>> updated >>>>>>> FLIP wiki, it would be good to see if we can keep the >>>>>>> protocol >>>>>>> simple. One >>>>>>> alternative way to achieve this behavior is the following: >>>>>>> >>>>>>> 1. Upon SourceOperator startup, the SourceOperator >>>>>>> sends >>>>>>> ReaderRegistrationEvent with the currently assigned >>>>>>> splits to >>>>>>> the >>>>>>> enumerator. 
It does not add these splits to the >>>>>>> SourceReader. >>>>>>> 2. The enumerator will always use the >>>>>>> SourceEnumeratorContext.assignSplits() to assign the >>>>>>> splits. >>>>>>> (not >>>>>>> via the >>>>>>> response of the ReaderRegistrationEvent, this allows >>>>>>> async >>>>>>> split >>>>>>> assignment >>>>>>> in case the enumerator wants to wait until all the >>>>>>> readers are >>>>>>> registered) >>>>>>> 3. The SourceOperator will only call >>>>>>> SourceReader.addSplits() >>>>>>> when >>>>>>> it receives the AddSplitEvent from the enumerator. >>>>>>> >>>>>>> This protocol has a few benefits: >>>>>>> 1. it basically allows arbitrary split reassignment >>>>>>> upon >>>>>>> restart >>>>>>> 2. simplicity: there is only one way to assign splits. >>>>>>> >>>>>>> So we only need one interface change: >>>>>>> - add the initially assigned splits to ReaderInfo so >>>>>>> the >>>>>>> Enumerator >>>>>>> can access them. >>>>>>> and one behavior change: >>>>>>> - The SourceOperator should stop assigning splits >>>>>>> from state >>>>>>> restoration, but >>>>>>> [message truncated...] >>>>>>> >>>>>>> >>>>>> >>>> >>
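The three-step protocol above could be simulated roughly as follows. Everything here is a made-up simplification: `RecoveryProtocolSim`, `registerReader`, `reassignAll`, and the round-robin policy are illustrative stand-ins, not the real SourceOperator / SourceCoordinator machinery; the sketch only shows the message flow (report first, assign asynchronously once all readers are registered, deliver only via AddSplitEvent).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of the three-step recovery protocol above.
class RecoveryProtocolSim {
    final int parallelism;
    final Map<Integer, List<String>> reported = new HashMap<>();
    // What each reader actually receives via (simulated) AddSplitEvents.
    final Map<Integer, List<String>> delivered = new HashMap<>();

    RecoveryProtocolSim(int parallelism) {
        this.parallelism = parallelism;
    }

    // Step 1: the SourceOperator reports restored splits via a
    // ReaderRegistrationEvent; the splits are NOT added to the reader here.
    void registerReader(int subtask, List<String> restoredSplits) {
        reported.put(subtask, restoredSplits);
        if (reported.size() == parallelism) {
            reassignAll(); // Step 2: assign asynchronously, once all are in
        }
    }

    // Step 2: one possible policy — round-robin over all reported splits.
    private void reassignAll() {
        List<String> all = new ArrayList<>();
        reported.values().forEach(all::addAll);
        Collections.sort(all); // deterministic order for the sketch
        for (int i = 0; i < all.size(); i++) {
            int target = i % parallelism;
            // Step 3: the reader adds splits only upon the AddSplitEvent.
            delivered.computeIfAbsent(target, k -> new ArrayList<>())
                     .add(all.get(i));
        }
    }
}
```

Note how `delivered` stays empty until the last reader registers, mirroring the benefit Becket mentions: the enumerator can defer assignment until all readers are registered and then reassign arbitrarily.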
