Thanks Becket and Hongshun for the insightful discussion. The underlying implementation and communication mechanisms of the Flink Source indeed involve many intricate details. We discussed the issue of split re-assignment in some specific scenarios, and fortunately the final decision turned out to be quite clear.
+1 to Becket’s proposal, which keeps the framework cleaner and more flexible. +1 to Hongshun’s point to provide comprehensive guidance for connector developers. Best, Leonard
> On Sep 26, 2025, at 16:30, Hongshun Wang <loserwang1...@gmail.com> wrote: > > Hi Becket, > > I got it. You’re suggesting we should not handle this in the source framework but instead let the split enumerator manage these three scenarios. > > Let me explain why I originally favored handling it in the framework: I'm concerned that connector developers might overlook certain edge cases (after all, we even needed extensive discussions to fully clarify the logic). > > However, your approach keeps the framework cleaner and more flexible. Thus, I will take it. > > Perhaps, in this FLIP, we should focus on providing comprehensive guidance for connector developers: explain how to implement a split enumerator, including the underlying challenges and their solutions. > > Additionally, we can use the Kafka connector as a reference implementation to demonstrate the practical steps. This way, developers who want to implement similar connectors can directly reference this example. > > Best, > Hongshun
> On Fri, Sep 26, 2025 at 1:27 PM Becket Qin <becket....@gmail.com> wrote: >> It would be good not to expose runtime details to the source implementation if possible. >> >> Today, split enumerator implementations are expected to track the split assignment. >> >> Assuming the split enumerator implementation keeps a split assignment map, that means the enumerator should already know whether a split is assigned or unassigned. So it can handle the three scenarios you mentioned. >> >>> The split is reported by a reader during a global restoration. >> The split enumerator should have just been restored / created. If the enumerator expects a full reassignment of splits upon global recovery, there should be no splits assigned to that reader in the split assignment mapping. >> >>> The split is reported by a reader during a partial failure recovery. >> In this case, when SplitEnumerator.addReader() is invoked, the split assignment map in the enumerator implementation should already have some split assignments for the reader. Therefore it is a partial failover. If the source supports split reassignment on recovery, the enumerator can assign splits that are different from the reported assignment of that reader in the SplitEnumeratorContext, or it can also assign the same splits. In any case, the enumerator knows that this is a partial recovery because the assignment map is non-empty. >> >>> The split is not reported by a reader, but is assigned after the last successful checkpoint and was never acknowledged. >> This is actually one of the steps in the partial failure recovery. SplitEnumerator.addSplitsBack() will be called first, before SplitEnumerator.addReader() is called for the recovered reader. When SplitEnumerator.addSplitsBack() is invoked, it is for sure a partial recovery, and the enumerator should remove these splits from the split assignment map as if they were never assigned. >> >> I think this should work, right? >> >> Thanks, >> >> Jiangjie (Becket) Qin
>> On Thu, Sep 25, 2025 at 8:34 PM Hongshun Wang <loserwang1...@gmail.com> wrote: >>> Hi Becket and Leonard, >>> >>> Thanks for your advice.
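For illustration, here is a minimal sketch of the enumerator-side bookkeeping Becket describes above, assuming the enumerator keeps its own assignment map (class and field names are hypothetical, not part of the FLIP's API):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class TrackingEnumerator<SplitT> {
        // Splits currently believed to be owned by each subtask.
        private final Map<Integer, List<SplitT>> assignmentMap = new HashMap<>();
        // Splits owned by the enumerator, waiting to be (re)assigned.
        private final List<SplitT> unassigned = new ArrayList<>();

        // Partial failover: splits assigned after the last successful checkpoint
        // come back first, before addReader() is called for the recovered subtask.
        void addSplitsBack(List<SplitT> splits, int subtaskId) {
            List<SplitT> owned = assignmentMap.get(subtaskId);
            if (owned != null) {
                owned.removeAll(splits);
            }
            unassigned.addAll(splits); // treat them as if they were never assigned
        }

        void addReader(int subtaskId) {
            List<SplitT> known = assignmentMap.getOrDefault(subtaskId, List.of());
            if (known.isEmpty()) {
                // No recorded assignment: a global restore or a brand-new reader.
                // The enumerator is free to distribute splits from `unassigned`.
            } else {
                // Non-empty assignment: a partial failover. Reassign the same
                // splits, or different ones if the source supports reassignment.
            }
        }
    }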
>>> >>> > put all the reader information in the SplitEnumerator context >>> I have one concern: the current registeredReaders in SourceCoordinatorContext will be removed when subtaskReset is executed on failure. However, this approach has merit. >>> >>> One more situation I found my previous design does not cover: >>> Initial state: Reader A reports splits (1, 2). >>> Enumerator action: Assigns split 1 to Reader A, and split 2 to Reader B. >>> Failure scenario: Reader A fails before checkpointing. Since this is a partial failure, only Reader A restarts. >>> Recovery issue: Upon recovery, Reader A re-reports split (1). >>> In my previous design, the enumerator would ignore Reader A's re-registration, which would cause data loss. >>> >>> Thus, when the enumerator receives a split, the split may originate from three scenarios: >>> The split is reported by a reader during a global restoration. >>> The split is reported by a reader during a partial failure recovery. >>> The split is not reported by a reader, but was assigned after the last successful checkpoint and was never acknowledged. >>> In the first scenario (global restore), the split should be re-distributed. For the latter two scenarios (partial failover and post-checkpoint assignment), we need to reassign the split to its originally assigned subtask. >>> >>> By implementing a method in the SplitEnumerator context to track each assigned split's status, the system can correctly identify and resolve split ownership in all three scenarios. What about adding a `SplitRecoveryType splitRecoveryType(Split split)` to SplitEnumeratorContext? SplitRecoveryType is an enum including `UNASSIGNED`, `GLOBAL_RESTORE`, `PARTIAL_FAILOVER` and `POST_CHECKPOINT_ASSIGNMENT`. >>> >>> What do you think? Are there any details or scenarios I haven't considered? Looking forward to your advice. >>> >>> Best, >>> Hongshun
>>> On Thu, Sep 11, 2025 at 12:41 AM Becket Qin <becket....@gmail.com> wrote: >>>> Thanks for the explanation, Hongshun. >>>> >>>> The current pattern of handling new reader registration is the following: >>>> 1. Put all the reader information in the SplitEnumerator context. >>>> 2. Notify the enumerator about the new reader registration. >>>> 3. Let the split enumerator get whatever information it wants from the context and do its job. >>>> This pattern decouples the information passing from the reader registration notification. This makes the API extensible - we can add more information (e.g. reported assigned splits in our case) about the reader to the context without introducing new methods. >>>> >>>> Introducing a new method addSplitsBackOnRecovery() is redundant given the above pattern. Do we really need it? >>>> >>>> Thanks, >>>> >>>> Jiangjie (Becket) Qin
>>>> On Mon, Sep 8, 2025 at 8:18 PM Hongshun Wang <loserwang1...@gmail.com> wrote: >>>> > Hi Becket, >>>> > >>>> > > I am curious what would the enumerator do differently for the splits added via addSplitsBackOnRecovery() vs. addSplitsBack()? >>>> > >>>> > In this FLIP, there are two distinct scenarios in which the enumerator receives splits being added back: >>>> > 1. Job-level restore: The job is restored; splits from the readers' state are reported via ReaderRegistrationEvent. >>>> > 2.
Reader-level restart: a reader is restarted but not the whole job; the splits assigned to it after the last successful checkpoint are added back. This is what addSplitsBack used to do. >>>> > >>>> > In these two situations, the enumerator will choose different strategies: >>>> > 1. Job-level restore: the splits should be redistributed across readers according to the current partitioner strategy. >>>> > 2. Reader-level restart: the splits should be reassigned directly back to the same reader they were originally assigned to, preserving locality and avoiding unnecessary redistribution. >>>> > >>>> > Therefore, the enumerator must clearly distinguish between these two scenarios. I originally wanted to deprecate the former addSplitsBack(List<SplitT> splits, int subtaskId) and add a new addSplitsBack(List<SplitT> splits, int subtaskId, boolean reportedByReader). Leonard suggested using another method, addSplitsBackOnRecovery, without affecting the current addSplitsBack. >>>> > >>>> > Best >>>> > Hongshun
>>>> > On 2025/09/08 17:20:31 Becket Qin wrote: >>>> > > Hi Leonard, >>>> > > >>>> > > > Could we introduce a new method like addSplitsBackOnRecovery with a default implementation? In this way, we can provide better backward compatibility and also make it easier for developers to understand. >>>> > > >>>> > > I am curious what would the enumerator do differently for the splits added via addSplitsBackOnRecovery() vs. addSplitsBack()? Today, addSplitsBack() is also only called upon recovery. So the new method seems confusing. One thing worth clarifying: if the Source implements SupportSplitReassignmentOnRecovery, upon recovery, should the splits reported by the readers also be added back to the SplitEnumerator via the addSplitsBack() call? Or should the SplitEnumerator explicitly query the registered reader information via the SplitEnumeratorContext to get the originally assigned splits when addReader() is invoked? I was assuming the latter in the beginning, so the behavior of addSplitsBack() remains unchanged, but I am not opposed to doing the former. >>>> > > >>>> > > Also, can you elaborate on the backwards compatibility issue you see if we do not have a separate addSplitsBackOnRecovery() method? Even without this new method, the behavior remains exactly the same unless the end users implement the mix-in interface "SupportSplitReassignmentOnRecovery", right? >>>> > > >>>> > > Thanks, >>>> > > >>>> > > Jiangjie (Becket) Qin
>>>> > > On Mon, Sep 8, 2025 at 1:48 AM Hongshun Wang <lo...@gmail.com> wrote: >>>> > > > Hi devs, >>>> > > > >>>> > > > It has been quite some time since this FLIP [1] was first proposed. Thank you for your valuable feedback; based on your suggestions, the FLIP has undergone several rounds of revisions. >>>> > > > >>>> > > > Any more advice is welcome and appreciated. If there are no further concerns, I plan to start the vote tomorrow.
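To make the two API shapes debated above concrete, here is a minimal sketch (generics are simplified, and the delegating default body is only an illustration of the backward-compatible behavior, not the agreed design):

    import java.util.List;

    interface SplitEnumerator<SplitT> {
        // Existing method: splits assigned after the last successful
        // checkpoint are returned here on a partial failover.
        void addSplitsBack(List<SplitT> splits, int subtaskId);

        // Leonard's alternative: a separate hook for splits reported by
        // readers on recovery, with a default implementation so existing
        // enumerators keep compiling and behave as before.
        default void addSplitsBackOnRecovery(List<SplitT> splits, int subtaskId) {
            addSplitsBack(splits, subtaskId);
        }
    }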
>>>> > > > >>>> > > > Best >>>> > > > Hongshun >>>> > > > >>>> > > > [1] >>>> > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480
>>>> > > > On Mon, Sep 8, 2025 at 4:42 PM Hongshun Wang <lo...@gmail.com> wrote: >>>> > > > > Hi Leonard, >>>> > > > > Thanks for your advice. It makes sense and I have modified it. >>>> > > > > >>>> > > > > Best, >>>> > > > > Hongshun
>>>> > > > > On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu <xb...@gmail.com> wrote: >>>> > > > >> Thanks Hongshun and Becket for the deep discussion. >>>> > > > >> >>>> > > > >> I only have one comment for one API design: >>>> > > > >> >>>> > > > >> > Deprecate the old addSplitsBack method, use an addSplitsBack with param isReportedByReader instead, because the enumerator can apply different reassignment policies based on the context. >>>> > > > >> >>>> > > > >> Could we introduce a new method like *addSplitsBackOnRecovery* with a default implementation? In this way, we can provide better backward compatibility and also make it easier for developers to understand. >>>> > > > >> >>>> > > > >> Best, >>>> > > > >> Leonard
>>>> > > > >> On Sep 3, 2025, at 20:26, Hongshun Wang <lo...@gmail.com> wrote: >>>> > > > >> >>>> > > > >> Hi Becket, >>>> > > > >> >>>> > > > >> I think that's a great idea! I have added the SupportSplitReassignmentOnRecovery interface in this FLIP. A Source implementing this interface indicates that the source operator needs to report splits to the enumerator and receive reassignment. [1] >>>> > > > >> >>>> > > > >> Best, >>>> > > > >> Hongshun >>>> > > > >> >>>> > > > >> [1] >>>> > > > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>> > > > >> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <be...@gmail.com> wrote: >>>> > > > >>> Hi Hongshun, >>>> > > > >>> >>>> > > > >>> I think the convention for such optional features in Source is via mix-in interfaces. So instead of adding a method to the SourceReader, maybe we should introduce an interface SupportSplitReassignmentOnRecovery with this method. If a Source implementation implements that interface, then the SourceOperator will check the desired behavior and act accordingly. >>>> > > > >>> >>>> > > > >>> Thanks, >>>> > > > >>> >>>> > > > >>> Jiangjie (Becket) Qin
>>>> > > > >>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <loserwang1...@gmail.com> wrote: >>>> > > > >>>> Hi devs, >>>> > > > >>>> >>>> > > > >>>> Would anyone like to discuss this FLIP? I'd appreciate your feedback and suggestions.
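As a rough illustration of the mix-in convention discussed above (the interface name comes from this thread; treating it as a pure marker interface is an assumption):

    // Mix-in marker: a Source implementing this opts in to reporting its
    // restored splits to the enumerator and receiving a fresh assignment.
    public interface SupportSplitReassignmentOnRecovery {
    }

    // The SourceOperator can then check the desired behavior, e.g.:
    //   if (source instanceof SupportSplitReassignmentOnRecovery) {
    //       // report restored splits and wait for the enumerator's assignment
    //   } else {
    //       // legacy behavior: hand restored splits straight to the reader
    //   }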
>>>> > > > >>>> >>>> > > > >>>> Best, >>>> > > > >>>> Hongshun
>>>> > > > >>>> On Aug 13, 2025, at 14:23, Hongshun Wang <lo...@gmail.com> wrote: >>>> > > > >>>> >>>> > > > >>>> Hi Becket, >>>> > > > >>>> >>>> > > > >>>> Thank you for your detailed feedback. The new contract makes good sense to me and effectively addresses the issues I encountered at the beginning of the design. >>>> > > > >>>> >>>> > > > >>>> That said, I recommend not reporting splits by default, primarily for compatibility and practical reasons: >>>> > > > >>>> >>>> > > > >>>> > For these reasons, we do not expect the Split objects to be huge, and we are not trying to design for huge Split objects either as they will have problems even today. >>>> > > > >>>> >>>> > > > >>>> 1. Not all existing connectors match this rule. For example, in the MySQL CDC connector, a binlog split may contain hundreds of snapshot split completion records (or even more). This state is large and is currently transmitted incrementally through multiple BinlogSplitMetaEvent messages. Since the binlog reader operates with single parallelism, reporting the full split state on recovery could be inefficient or even infeasible. For such sources, it would be better to provide a mechanism to skip split reporting during restart until they redesign and reduce the split size. >>>> > > > >>>> 2. Not all enumerators maintain unassigned splits in state. Some SplitEnumerator implementations (such as the Kafka connector's) do not track or persistently manage unassigned splits. Requiring them to handle re-registration would add unnecessary complexity. Even if we implement this in the Kafka connector, the Kafka connector is currently decoupled from the Flink version, so we also need to make sure older versions remain compatible. >>>> > > > >>>> >>>> > > > >>>> ------------------------------ >>>> > > > >>>> >>>> > > > >>>> To address these concerns, I propose introducing a new method: boolean SourceReader#shouldReassignSplitsOnRecovery() with a default implementation returning false. This allows source readers to opt in to split reassignment only when necessary. Since the new contract already places the responsibility for split assignment on the enumerator, not reporting splits by default is a safe and clean default behavior. >>>> > > > >>>> >>>> > > > >>>> ------------------------------ >>>> > > > >>>> >>>> > > > >>>> I’ve updated the implementation and the FLIP accordingly [1]. It is quite a big change.
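For clarity, the proposed opt-in would look roughly like the following (a shape-only sketch; the real SourceReader interface carries more type bounds and methods):

    interface SourceReader<T, SplitT> {
        // Opt-in proposed above: when true, the reader reports its restored
        // splits to the enumerator on recovery and waits for a reassignment
        // instead of resuming them directly. False keeps the legacy behavior.
        default boolean shouldReassignSplitsOnRecovery() {
            return false;
        }
    }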
In particular, for the Kafka connector, we can now use a pluggable SplitPartitioner to support different split assignment strategies (e.g., default, round-robin). >>>> > > > >>>> >>>> > > > >>>> Could you please review it when you have a chance? >>>> > > > >>>> >>>> > > > >>>> Best, >>>> > > > >>>> Hongshun >>>> > > > >>>> >>>> > > > >>>> [1] >>>> > > > >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>> > > > >>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <be...@gmail.com> wrote: >>>> > > > >>>>> Hi Hongshun, >>>> > > > >>>>> >>>> > > > >>>>> I am not too concerned about the transmission cost, because the full split transmission has to happen in the initial assignment phase already. And in the future, we probably want to also introduce some kind of workload balance across source readers, e.g. based on the per-split throughput or the per-source-reader workload in heterogeneous clusters. For these reasons, we do not expect the Split objects to be huge, and we are not trying to design for huge Split objects either, as they will have problems even today. >>>> > > > >>>>> >>>> > > > >>>>> Good point on the potential split loss; please see the reply below: >>>> > > > >>>>> >>>> > > > >>>>> Scenario 2: >>>> > > > >>>>> >>>> > > > >>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon restart. >>>> > > > >>>>>> 2. Before the enumerator receives all reports and performs reassignment, a checkpoint is triggered. >>>> > > > >>>>>> 3. Since no splits have been reassigned yet, both readers have empty states. >>>> > > > >>>>>> 4. When restarting from this checkpoint, all four splits are lost. >>>> > > > >>>>> >>>> > > > >>>>> The reader registration happens in SourceOperator.open(), which means the task is still in the initializing state; therefore the checkpoint should not be triggered until the enumerator receives all the split reports. >>>> > > > >>>>> >>>> > > > >>>>> There is a nuance here. Today, the RPC call from the TM to the JM is async. So it is possible that SourceOperator.open() has returned, but the enumerator has not received the split reports. However, because the task status update RPC call goes through the same channel as the split report call, the task status RPC call will happen after the split report call on the JM side. Therefore, on the JM side, the SourceCoordinator will always first receive the split reports, then receive the checkpoint request.
This "happens-before" relationship is important to guarantee consistent state between the enumerator and the readers. >>>> > > > >>>>> >>>> > > > >>>>> Scenario 1: >>>> > > > >>>>> >>>> > > > >>>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader B reports (3 and 4). >>>> > > > >>>>>> 2. The enumerator receives these reports but only reassigns splits 1 and 2, not 3 and 4. >>>> > > > >>>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are recorded in the reader states; splits 3 and 4 are not persisted. >>>> > > > >>>>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4 will be permanently lost. >>>> > > > >>>>> >>>> > > > >>>>> This scenario is possible. One solution is to let the enumerator implementation handle this. That means if the enumerator relies on the initial split reports from the source readers, it should maintain these reports by itself. In the above example, the enumerator will need to remember that 3 and 4 are not assigned and put them into its own state. >>>> > > > >>>>> The current contract is that anything assigned to the SourceReaders is completely owned by the SourceReaders. Enumerators can remember the assignments but cannot change them, even when the source reader recovers / restarts. >>>> > > > >>>>> With this FLIP, the contract becomes that the source readers will return the ownership of the splits to the enumerator. So the enumerator is responsible for maintaining these splits until they are assigned to a source reader again. >>>> > > > >>>>> >>>> > > > >>>>> There are other cases where there may be conflicting information between the reader and the enumerator. For example, consider the following sequence: >>>> > > > >>>>> 1. Reader A reports splits (1 and 2) upon restart. >>>> > > > >>>>> 2. The enumerator receives the report and assigns both 1 and 2 to reader B. >>>> > > > >>>>> 3. Reader A fails before checkpointing. This is a partial failure, so only reader A restarts. >>>> > > > >>>>> 4. When reader A recovers, it will again report splits (1 and 2) to the enumerator. >>>> > > > >>>>> 5. The enumerator should ignore this report because it has assigned splits (1 and 2) to reader B. >>>> > > > >>>>> >>>> > > > >>>>> So with the new contract, the enumerator should be the source of truth for split ownership.
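A tiny sketch of what "the enumerator is the source of truth" could look like in code, following the five-step sequence above (all names here are hypothetical):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;

    class OwnershipTracker<SplitT> {
        private final Function<SplitT, String> splitId;
        // splitId -> subtask that currently owns the split.
        private final Map<String, Integer> ownerBySplitId = new HashMap<>();
        // Splits the enumerator owns until it reassigns them.
        private final List<SplitT> pendingReassignment = new ArrayList<>();

        OwnershipTracker(Function<SplitT, String> splitId) {
            this.splitId = splitId;
        }

        // Called when a (recovered) reader reports its splits.
        void onSplitsReported(int subtask, List<SplitT> reported) {
            for (SplitT split : reported) {
                Integer owner = ownerBySplitId.get(splitId.apply(split));
                if (owner != null && owner != subtask) {
                    // Steps 3-5: the split was already reassigned elsewhere
                    // (e.g. to reader B), so the stale report is ignored.
                    continue;
                }
                pendingReassignment.add(split);
            }
        }

        // Called whenever the enumerator hands a split to a subtask.
        void recordAssignment(SplitT split, int subtask) {
            ownerBySplitId.put(splitId.apply(split), subtask);
        }
    }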
>>>> > > > >>>>> >>>> > > > >>>>> Thanks, >>>> > > > >>>>> >>>> > > > >>>>> Jiangjie (Becket) Qin
>>>> > > > >>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <loserwang1...@gmail.com> wrote: >>>> > > > >>>>>> Hi Becket, >>>> > > > >>>>>> >>>> > > > >>>>>> I did consider this approach at the beginning (and it was also mentioned in this FLIP), since it would allow more flexibility in reassigning all splits. However, there are a few potential issues. >>>> > > > >>>>>> >>>> > > > >>>>>> 1. High Transmission Cost: If we pass the full split objects (rather than just split IDs), the data size could be significant, leading to high overhead during transmission, especially when many splits are involved. >>>> > > > >>>>>> >>>> > > > >>>>>> 2. Risk of Split Loss: A risk of split loss exists unless we have a mechanism to make sure a checkpoint can only be taken after all the splits are reassigned. There are scenarios where splits could be lost due to inconsistent state handling during recovery: >>>> > > > >>>>>> >>>> > > > >>>>>> Scenario 1: >>>> > > > >>>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader B reports (3 and 4). >>>> > > > >>>>>> 2. The enumerator receives these reports but only reassigns splits 1 and 2, not 3 and 4. >>>> > > > >>>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are recorded in the reader states; splits 3 and 4 are not persisted. >>>> > > > >>>>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4 will be permanently lost. >>>> > > > >>>>>> >>>> > > > >>>>>> Scenario 2: >>>> > > > >>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon restart. >>>> > > > >>>>>> 2. Before the enumerator receives all reports and performs reassignment, a checkpoint is triggered. >>>> > > > >>>>>> 3. Since no splits have been reassigned yet, both readers have empty states. >>>> > > > >>>>>> 4. When restarting from this checkpoint, all four splits are lost. >>>> > > > >>>>>> >>>> > > > >>>>>> Let me know if you have thoughts on how we might mitigate these risks! >>>> > > > >>>>>> >>>> > > > >>>>>> Best >>>> > > > >>>>>> Hongshun
>>>> > > > >>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <be...@gmail.com> wrote: >>>> > > > >>>>>>> Hi Hongshun, >>>> > > > >>>>>>> >>>> > > > >>>>>>> The steps sound reasonable to me in general. In terms of the updated FLIP wiki, it would be good to see if we can keep the protocol simple. One alternative way to achieve this behavior is the following: >>>> > > > >>>>>>> >>>> > > > >>>>>>> 1.
Upon SourceOperator startup, the SourceOperator sends a ReaderRegistrationEvent with the currently assigned splits to the enumerator. It does not add these splits to the SourceReader. >>>> > > > >>>>>>> 2. The enumerator will always use SplitEnumeratorContext.assignSplits() to assign the splits (not via the response to the ReaderRegistrationEvent; this allows async split assignment in case the enumerator wants to wait until all the readers are registered). >>>> > > > >>>>>>> 3. The SourceOperator will only call SourceReader.addSplits() when it receives the AddSplitEvent from the enumerator. >>>> > > > >>>>>>> >>>> > > > >>>>>>> This protocol has a few benefits: >>>> > > > >>>>>>> 1. It basically allows arbitrary split reassignment upon restart. >>>> > > > >>>>>>> 2. Simplicity: there is only one way to assign splits. >>>> > > > >>>>>>> >>>> > > > >>>>>>> So we only need one interface change: >>>> > > > >>>>>>> - add the initially assigned splits to ReaderInfo so the Enumerator can access it. >>>> > > > >>>>>>> and one behavior change: >>>> > > > >>>>>>> - The SourceOperator should stop assigning the splits from state restoration to the reader, but >>>> [message truncated...]