Hi Becket and Leonard,

It seems adding `splitsOnRecovery` to `ReaderInfo` makes the split enumerator simpler and cleaner.
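Roughly, the shape would be the following (a minimal sketch only; the field and accessor names are illustrative, not the final API):

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.List;

// Sketch only: ReaderInfo extended with the splits a reader reported on
// recovery, so the enumerator can read them from the SplitEnumeratorContext.
public final class ReaderInfo implements Serializable {
    private final int subtaskId;
    private final String location;
    // Splits reported in the ReaderRegistrationEvent after a restore;
    // empty for a fresh reader without restored state.
    private final List<byte[]> splitsOnRecovery;

    public ReaderInfo(int subtaskId, String location, List<byte[]> splitsOnRecovery) {
        this.subtaskId = subtaskId;
        this.location = location;
        this.splitsOnRecovery = Collections.unmodifiableList(splitsOnRecovery);
    }

    public int getSubtaskId() { return subtaskId; }
    public String getLocation() { return location; }
    public List<byte[]> getSplitsOnRecovery() { return splitsOnRecovery; }
}
```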
I have modified this FLIP again. Please have a look and let me know what you think.

Best,
Hongshun

On Mon, Oct 13, 2025 at 10:48 AM Hongshun Wang <[email protected]> wrote:

Hi Becket,

Thanks for your explanation.

> For the same three inputs above, the assignment should be consistently the same.

That is exactly what troubles me. For assignment algorithms such as hash, it does behave the same. But what if we use round-robin? Given the same reader information, the same split can be assigned to different readers. This is the example I listed before (a sketch follows the list):

1. Initial state: 2 parallelism, 2 splits.
2. Enumerator action: Split 1 → Task 1, Split 2 → Task 2.
3. Failure scenario: After Split 2 is assigned to Task 2 but before the next checkpoint succeeds, Task 2 restarts.
4. Recovery issue: Split 2 is re-added to the enumerator, and the round-robin strategy assigns Split 2 to Task 1. Task 1 now has 2 splits, Task 2 has 0 → imbalanced distribution.
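A minimal sketch of the difference (illustrative only, not the actual enumerator code):

```java
// Sketch only: contrasts a deterministic hash assignment with a
// history-dependent round-robin assignment. With hash, re-adding a split
// always yields the same owner; with round-robin, the owner depends on how
// many assignments happened before, so a re-added split can land on a
// different reader and skew the distribution.
public class AssignmentSketch {
    static int hashOwner(String splitId, int parallelism) {
        // Same input -> same owner, regardless of assignment history.
        return Math.abs(splitId.hashCode() % parallelism);
    }

    private int nextReader = 0;

    int roundRobinOwner(int parallelism) {
        // Owner depends on a mutable counter, i.e. on history.
        int owner = nextReader;
        nextReader = (nextReader + 1) % parallelism;
        return owner;
    }

    public static void main(String[] args) {
        AssignmentSketch rr = new AssignmentSketch();
        int parallelism = 2;
        // Initial assignment: split-1 -> task 0, split-2 -> task 1.
        System.out.println("split-1 -> task " + rr.roundRobinOwner(parallelism));
        System.out.println("split-2 -> task " + rr.roundRobinOwner(parallelism));
        // Task 1 fails before the checkpoint; split-2 is added back and
        // re-assigned: round-robin now hands it to task 0 -> imbalance.
        System.out.println("split-2 again -> task " + rr.roundRobinOwner(parallelism));
        // Hash, by contrast, is stable across re-assignments.
        System.out.println("hash owner of split-2 is always task "
                + hashOwner("split-2", parallelism));
    }
}
```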
> Please let me know if you think a meeting would be more efficient.

Yes, I'd like to reach an agreement as soon as possible. If you're available, we could schedule a meeting with Leonard as well.

Best,
Hongshun

On Sat, Oct 11, 2025 at 3:59 PM Becket Qin <[email protected]> wrote:

Hi Hongshun,

I am confused. First of all, regardless of what the assignment algorithm is, using the SplitEnumeratorContext to return the splits only gives more information than using addSplitsBack(). So there should be no regression.

Secondly, at this point the SplitEnumerator should only take the following three inputs to generate the global split assignment:
1. the reader information (num readers, locations, etc.)
2. all the splits to assign
3. the configured assignment algorithm
Preferably, for the same three inputs, the assignment should be consistently the same. I don't see why it should care about why a new reader was added, whether due to partial failover, global failover, or job restart.

If you want to do global redistribution on global failover and restart, but honor the existing assignment for partial failover, the enumerator will just do the following:
1. Generate a new global assignment (global redistribution) in start(), because start() will only be invoked on global failover or restart. That means all the readers are also new, with empty assignments.
2. After the global assignment is generated, honor it for the whole life cycle. There may be many reader registrations, again for different reasons, but the reason does not matter:
- reader registration after a job restart
- reader registration after a global failover
- reader registration due to a partial failover, which may or may not come with an addSplitsBack() call
Regardless of the reason, the split enumerator will just enforce the global assignment it has already generated, i.e. without split redistribution.
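In code, the recipe is roughly the following (a sketch with illustrative types; assignSplits() below is a stand-in for SplitEnumeratorContext.assignSplits()):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: compute one global assignment in start() and enforce it on
// every reader registration, regardless of why the reader registered.
class GlobalAssignmentEnumerator {
    // subtask id -> splits owned by that subtask (the source of truth).
    private final Map<Integer, List<String>> globalAssignment = new HashMap<>();

    // start() is only invoked on a fresh start, a restart, or a global
    // failover, so this is the single place where redistribution happens.
    void start(List<String> allSplits, int parallelism) {
        for (int i = 0; i < allSplits.size(); i++) {
            globalAssignment
                    .computeIfAbsent(i % parallelism, k -> new ArrayList<>())
                    .add(allSplits.get(i));
        }
    }

    // Called on every reader registration: job restart, global failover,
    // or partial failover. The reaction is always the same: hand the
    // reader the splits it owns in the already-generated assignment.
    void addReader(int subtaskId) {
        assignSplits(subtaskId, globalAssignment.getOrDefault(subtaskId, List.of()));
    }

    private void assignSplits(int subtaskId, List<String> splits) {
        // Stand-in for SplitEnumeratorContext.assignSplits(...).
        System.out.println("assign " + splits + " to subtask " + subtaskId);
    }
}
```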
Wouldn't that give the behavior you want? I feel the discussion somehow goes in circles. Please let me know if you think a meeting would be more efficient.

Thanks,

Jiangjie (Becket) Qin

On Fri, Oct 10, 2025 at 7:58 PM Hongshun Wang <[email protected]> wrote:

Hi Becket,

> Ignore a returned split if it has been assigned to a different reader, otherwise put it back to unassigned splits / pending splits. Then the enumerator assigns new splits to the newly added reader, which may use the previous assignment as a reference. This should work regardless of whether it is a global failover, partial failover, restart, etc. There is no need for the SplitEnumerator to distinguish what failover scenario it is.

In this case, it seems that global failover and partial failover share the same distribution strategy if a split has not been assigned to a different reader. However, on global failover the splits need to be redistributed (this is why we need this FLIP), while on partial failover they do not. I have no idea how we distinguish them.

What do you think?

Best,
Hongshun

On Sat, Oct 11, 2025 at 12:54 AM Becket Qin <[email protected]> wrote:

Hi Hongshun,

The problem we are trying to solve here is to give the splits back to the SplitEnumerator. There are only two types of splits to give back:
1) Splits whose assignment has been checkpointed. In this case, we rely on addReader() + SplitEnumeratorContext to give the splits back; this provides more information associated with those splits.
2) Splits whose assignment has not been checkpointed. In this case, we use addSplitsBack(); there is no reader info to give because the previous assignment did not take effect to begin with.

From the SplitEnumerator implementation perspective, the contract is straightforward:
1. The SplitEnumerator is the source of truth for assignment.
2. When the enumerator receives the addSplitsBack() call, it always adds these splits back to unassigned splits / pending splits.
3. When the enumerator receives the addReader() call, that means the reader has no current assignment and has returned its previous assignment based on the reader-side info. The SplitEnumerator checks the SplitEnumeratorContext to retrieve the returned splits from that reader (i.e. the previous assignment) and handles them according to its own source-of-truth knowledge of the assignment: ignore a returned split if it has been assigned to a different reader, otherwise put it back to unassigned splits / pending splits. Then the enumerator assigns new splits to the newly added reader, which may use the previous assignment as a reference. This should work regardless of whether it is a global failover, partial failover, restart, etc. There is no need for the SplitEnumerator to distinguish what failover scenario it is.
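Expressed as code, the addReader() handling is roughly this (a sketch; it assumes the context exposes the reported splits, e.g. via the splitsOnRecovery idea above, and all names are illustrative):

```java
import java.util.List;
import java.util.Map;

// Sketch only: handling a reader registration under the proposed contract.
// `assigned` maps split id -> owning subtask (the enumerator's source of
// truth); `pending` holds splits waiting to be (re)assigned.
class AddReaderHandling {
    void onAddReader(int subtaskId,
                     List<String> reportedSplits, // e.g. from ReaderInfo
                     Map<String, Integer> assigned,
                     List<String> pending) {
        for (String split : reportedSplits) {
            Integer owner = assigned.get(split);
            if (owner != null && owner != subtaskId) {
                // Already re-assigned to another reader: ignore the report.
                continue;
            }
            // Otherwise the enumerator owns the split again.
            assigned.remove(split);
            if (!pending.contains(split)) {
                pending.add(split);
            }
        }
        // ... then assign pending splits to the newly added reader,
        // possibly using the reported (previous) assignment as a hint.
    }
}
```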
Would this work?

Thanks,

Jiangjie (Becket) Qin

On Fri, Oct 10, 2025 at 1:28 AM Hongshun Wang <[email protected]> wrote:

Hi Becket,

> why do we need to change the behavior of addSplitsBack()? Should it remain the same?

How does the enumerator get the splits from the ReaderRegistrationEvent and then reassign them?

You gave this advice before:

> 1. Put all the reader information in the SplitEnumerator context. 2. Notify the enumerator about the new reader registration. 3. Let the split enumerator get whatever information it wants from the context and do its job.

However, each time a source task fails over, the ConcurrentMap<Integer, ConcurrentMap<Integer, ReaderInfo>> registeredReaders will remove that reader's info. When the source task is registered again, the info will be added again. Thus, registeredReaders cannot tell whether a reader was registered before, and enumerator#addReader cannot distinguish the following three situations:
1. The reader is registered on a global restart. In this case, redistribute the splits from the reader infos (take all the splits off the ReaderInfo).
2. The reader is registered on a partial failover before the first successful checkpoint. In this case, ignore the splits from the infos (leave the splits on the ReaderInfo alone).
3. The reader is registered on a partial failover after the first successful checkpoint. In this case, we need to assign the splits to the same reader again (take the splits off the ReaderInfo but assign them back to that reader).

We still need the enumerator to distinguish them (using pendingSplitAssignments & assignedSplitAssignments). However, it is redundant to maintain split-assignment information both in the enumerator and in the enumerator context.

I think if we change the behavior of addSplitsBack, it will be simpler: just let the enumerator handle these splits based on pendingSplitAssignments & assignedSplitAssignments.

What do you think?

Best,
Hongshun

On Fri, Oct 10, 2025 at 12:55 PM Becket Qin <[email protected]> wrote:

Hi Hongshun,

Thanks for updating the FLIP. A quick question: why do we need to change the behavior of addSplitsBack()? Should it remain the same?

Regarding the case of restart with a changed subscription: I think the only correct behavior is removing obsolete splits without any warning / exception. It is OK to add info-level logging if we want to. The intention is clear if the user has explicitly changed the subscription and restarted the job. There is no need to add a config to double-confirm.

Regards,

Jiangjie (Becket) Qin

On Thu, Oct 9, 2025 at 7:28 PM Hongshun Wang <[email protected]> wrote:

Hi Leonard,

If the SplitEnumerator receives all splits after a restart, it becomes straightforward to clear and un-assign the unmatched splits (checking whether each split still matches the source options). However, a key question arises: should we automatically discard obsolete splits, or explicitly notify the user via an exception?

We provide an option `scan.partition-unsubscribe.strategy`:
1. If Strict, throw an exception when encountering removed splits.
2. If Lenient, automatically remove obsolete splits silently.
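A minimal sketch of applying the option (the option name is the one proposed above; the enum and method names are illustrative):

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Sketch only: dropping splits whose topic/table is no longer subscribed,
// controlled by scan.partition-unsubscribe.strategy.
class UnsubscribeHandling {
    enum UnsubscribeStrategy { STRICT, LENIENT }

    void pruneObsoleteSplits(List<String> splits,
                             Predicate<String> stillSubscribed,
                             UnsubscribeStrategy strategy) {
        for (Iterator<String> it = splits.iterator(); it.hasNext(); ) {
            String split = it.next();
            if (stillSubscribed.test(split)) {
                continue;
            }
            if (strategy == UnsubscribeStrategy.STRICT) {
                throw new IllegalStateException(
                        "Split " + split + " no longer matches the source options");
            }
            // LENIENT: silently drop the obsolete split (info log in practice).
            it.remove();
        }
    }
}
```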
What do you think?

Best,
Hongshun

On Thu, Oct 9, 2025 at 9:37 PM Leonard Xu <[email protected]> wrote:

Thanks Hongshun for the update and the pretty detailed analysis of the edge cases; the updated FLIP looks good to me now.

Only one last implementation detail about this scenario in the motivation section:

> Restart with changed subscription: During restart, if the source options remove a topic or table, the splits which have already been assigned cannot be removed.

Could you clarify how we resolve this in the Kafka connector?

Best,
Leonard

On Oct 9, 2025 at 19:48, Hongshun Wang <[email protected]> wrote:

Hi devs,

If there are no further suggestions, I will start the voting tomorrow.

Best,
Hongshun

On Fri, Sep 26, 2025 at 7:48 PM Hongshun Wang <[email protected]> wrote:

Hi Becket and Leonard,

I have updated the content of this FLIP. The key point is that when the split enumerator receives a split, the split must already exist in pendingSplitAssignments or assignedSplitAssignments (a sketch follows the list):

- If the split is in pendingSplitAssignments, ignore it.
- If the split is in assignedSplitAssignments but has a different taskId, ignore it (this indicates it was already assigned to another task).
- If the split is in assignedSplitAssignments and shares the same taskId, move the assignment from assignedSplitAssignments to pendingSplitAssignments to re-assign it again.
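A compact sketch of this rule (illustrative data structures, not the FLIP's exact code):

```java
import java.util.Map;

// Sketch only: deciding what to do with a split reported back to the
// enumerator, based on the enumerator's own bookkeeping.
class ReportedSplitRule {
    void onReportedSplit(String splitId,
                         int reportingTaskId,
                         Map<String, Integer> pendingSplitAssignments,
                         Map<String, Integer> assignedSplitAssignments) {
        if (pendingSplitAssignments.containsKey(splitId)) {
            return; // already waiting to be assigned: ignore the report
        }
        Integer owner = assignedSplitAssignments.get(splitId);
        if (owner == null || owner != reportingTaskId) {
            return; // assigned to another task (or unknown): ignore
        }
        // Same task: take the assignment back so it can be re-assigned.
        assignedSplitAssignments.remove(splitId);
        pendingSplitAssignments.put(splitId, reportingTaskId);
    }
}
```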
For a better understanding of why these strategies are used, I added some examples and pictures to the FLIP.

Would you like to help me check whether there are still some problems?

Best,
Hongshun

On Fri, Sep 26, 2025 at 5:08 PM Leonard Xu <[email protected]> wrote:

Thanks Becket and Hongshun for the insightful discussion.

The underlying implementation and communication mechanisms of the Flink Source indeed involve many intricate details. We discussed the issue of split re-assignment in specific scenarios, but fortunately, the final decision turned out to be pretty clear.

+1 to Becket's proposal, which keeps the framework cleaner and more flexible.
+1 to Hongshun's point to provide comprehensive guidance for connector developers.

Best,
Leonard

On Sep 26, 2025 at 16:30, Hongshun Wang <[email protected]> wrote:

Hi Becket,

I got it. You're suggesting we should not handle this in the source framework but instead let the split enumerator manage these three scenarios.

Let me explain why I originally favored handling it in the framework: I'm concerned that connector developers might overlook certain edge cases (after all, even we needed extensive discussions to fully clarify the logic).

However, your point keeps the framework cleaner and more flexible. Thus, I will take it.

Perhaps, in this FLIP, we should focus on providing comprehensive guidance for connector developers: explain how to implement a split enumerator, including the underlying challenges and their solutions.

Additionally, we can use the Kafka connector as a reference implementation to demonstrate the practical steps. This way, developers who want to implement similar connectors can directly reference this example.

Best,
Hongshun

On Fri, Sep 26, 2025 at 1:27 PM Becket Qin <[email protected]> wrote:

It would be good to not expose runtime details to the source implementation if possible.

Today, the split enumerator implementations are expected to track the split assignment.

Assuming the split enumerator implementation keeps a split assignment map, the enumerator should already know whether a split is assigned or unassigned. So it can handle the three scenarios you mentioned.

> The split is reported by a reader during a global restoration.

The split enumerator has just been restored / created. If the enumerator expects a full reassignment of splits upon global recovery, there should be no assigned splits for that reader in the split assignment mapping.

> The split is reported by a reader during a partial failure recovery.

In this case, when SplitEnumerator.addReader() is invoked, the split assignment map in the enumerator implementation should already contain some split assignments for the reader. Therefore it is a partial failover. If the source supports split reassignment on recovery, the enumerator can assign splits that differ from the reported assignment of that reader in the SplitEnumeratorContext, or it can assign the same splits. In any case, the enumerator knows that this is a partial recovery because the assignment map is non-empty.

> The split is not reported by a reader, but was assigned after the last successful checkpoint and was never acknowledged.

This is actually one of the steps in a partial failure recovery. SplitEnumerator.addSplitsBack() will be called before SplitEnumerator.addReader() is called for the recovered reader. When SplitEnumerator.addSplitsBack() is invoked, it is for sure a partial recovery, and the enumerator should remove these splits from the split assignment map as if they were never assigned.
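Putting the three cases together, the enumerator-side logic would roughly be (a sketch with illustrative names; my reading of the proposal, not committed code):

```java
import java.util.List;
import java.util.Map;

// Sketch only: telling the scenarios apart purely from the enumerator's
// own split-assignment map, without any runtime failover flag.
class ScenarioHandling {
    // split id -> owning subtask; the enumerator's source of truth.
    private final Map<String, Integer> assignment;

    ScenarioHandling(Map<String, Integer> assignment) {
        this.assignment = assignment;
    }

    // Partial failover, step 1: uncheckpointed splits come back first.
    void addSplitsBack(List<String> splits, int subtaskId) {
        splits.forEach(assignment::remove); // as if they were never assigned
    }

    // Reader registration: the map tells us which scenario this is.
    void addReader(int subtaskId, List<String> reportedSplits) {
        boolean partialFailover = assignment.containsValue(subtaskId);
        if (partialFailover) {
            // The map already records what this reader owned: re-assign
            // those splits (or deliberately assign different ones).
        } else {
            // Global restore / fresh start: no prior assignment recorded,
            // so reportedSplits can be redistributed freely.
        }
    }
}
```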
I think this should work, right?

Thanks,

Jiangjie (Becket) Qin

On Thu, Sep 25, 2025 at 8:34 PM Hongshun Wang <[email protected]> wrote:

Hi Becket and Leonard,

Thanks for your advice.

> put all the reader information in the SplitEnumerator context

I have one concern: the current registeredReaders in SourceCoordinatorContext will be removed after subtaskReset executes on failure. However, this approach has merit.

One more situation I found that my previous design does not cover:

1. Initial state: Reader A reports splits (1, 2).
2. Enumerator action: Assigns split 1 to Reader A, and split 2 to Reader B.
3. Failure scenario: Reader A fails before checkpointing. Since this is a partial failure, only Reader A restarts.
4. Recovery issue: Upon recovery, Reader A re-reports split (1).

In my previous design, the enumerator would ignore Reader A's re-registration, which would cause data loss.

Thus, when the enumerator receives a split, the split may originate from three scenarios:

1. The split is reported by a reader during a global restoration.
2. The split is reported by a reader during a partial failure recovery.
3. The split is not reported by a reader, but was assigned after the last successful checkpoint and was never acknowledged.

In the first scenario (global restore), the split should be re-distributed. For the latter two scenarios (partial failover and post-checkpoint assignment), we need to reassign the split to its originally assigned subtask.

By implementing a method in the SplitEnumerator context to track each assigned split's status, the system can correctly identify and resolve split ownership in all three scenarios. What about adding a `SplitRecoveryType splitRecoveryType(Split split)` method to SplitEnumeratorContext? SplitRecoveryType is an enum including `UNASSIGNED`, `GLOBAL_RESTORE`, `PARTIAL_FAILOVER`, and `POST_CHECKPOINT_ASSIGNMENT`.
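In API form (a sketch of the proposed addition only):

```java
// Sketch only: the proposed addition to SplitEnumeratorContext.
public interface SplitEnumeratorContextAddition<SplitT> {

    /** Why a split is (re)entering the enumerator's hands. */
    enum SplitRecoveryType {
        UNASSIGNED,                 // never assigned so far
        GLOBAL_RESTORE,             // reported by a reader on a global restore
        PARTIAL_FAILOVER,           // reported by a reader on a partial failover
        POST_CHECKPOINT_ASSIGNMENT  // assigned after the last successful
                                    // checkpoint and never acknowledged
    }

    /** Returns the recovery status of the given split. */
    SplitRecoveryType splitRecoveryType(SplitT split);
}
```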
What do you think? Are there any details or scenarios I haven't considered? Looking forward to your advice.

Best,
Hongshun

On Thu, Sep 11, 2025 at 12:41 AM Becket Qin <[email protected]> wrote:

Thanks for the explanation, Hongshun.

The current pattern of handling a new reader registration is the following:
1. Put all the reader information in the SplitEnumerator context.
2. Notify the enumerator about the new reader registration.
3. Let the split enumerator get whatever information it wants from the context and do its job.
This pattern decouples the information passing from the reader registration notification. That makes the API extensible: we can add more information about the reader (e.g. the reported assigned splits in our case) to the context without introducing new methods.

Introducing a new method addSplitsBackOnRecovery() is redundant to the above pattern. Do we really need it?

Thanks,

Jiangjie (Becket) Qin

On Mon, Sep 8, 2025 at 8:18 PM Hongshun Wang <[email protected]> wrote:

Hi Becket,

> I am curious what would the enumerator do differently for the splits added via addSplitsBackOnRecovery() vs. addSplitsBack()?

In this FLIP, there are two distinct scenarios in which the enumerator receives splits being added back:
1. Job-level restore: the job is restored, and the splits from the readers' state are reported via ReaderRegistrationEvent.
2. Reader-level restart: a reader is restarted but not the whole job, with the splits assigned to it after the last successful checkpoint. This is what addSplitsBack used to do.

In these two situations, the enumerator will choose different strategies:
1. Job-level restore: the splits should be redistributed across readers according to the current partitioner strategy.
2. Reader-level restart: the splits should be reassigned directly back to the same reader they were originally assigned to, preserving locality and avoiding unnecessary redistribution.

Therefore, the enumerator must clearly distinguish between these two scenarios. I originally proposed deprecating the old addSplitsBack(List<SplitT> splits, int subtaskId) and adding a new addSplitsBack(List<SplitT> splits, int subtaskId, boolean reportedByReader). Leonard suggested using another method, addSplitsBackOnRecovery, without affecting the current addSplitsBack.
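For reference, the two shapes being compared are roughly (sketch; the signatures as discussed, the default body illustrative):

```java
import java.util.List;

// Sketch only: the two alternatives discussed for handing splits back.
interface SplitEnumeratorSketch<SplitT> {

    // Existing method: called when uncheckpointed splits come back.
    void addSplitsBack(List<SplitT> splits, int subtaskId);

    // Alternative A (earlier proposal): one method with a flag.
    // void addSplitsBack(List<SplitT> splits, int subtaskId, boolean reportedByReader);

    // Alternative B (Leonard's suggestion): a separate method with a default
    // implementation, so existing enumerators keep compiling and behaving as today.
    default void addSplitsBackOnRecovery(List<SplitT> splits, int subtaskId) {
        addSplitsBack(splits, subtaskId);
    }
}
```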
Best,
Hongshun

On 2025/09/08 17:20:31 Becket Qin wrote:

Hi Leonard,

> Could we introduce a new method like addSplitsBackOnRecovery with a default implementation? In this way, we can provide better backward compatibility and it also makes it easier for developers to understand.

I am curious what the enumerator would do differently for the splits added via addSplitsBackOnRecovery() vs. addSplitsBack()? Today, addSplitsBack() is also only called upon recovery, so the new method seems confusing. One thing worth clarifying: if the Source implements SupportSplitReassignmentOnRecovery, upon recovery, should the splits reported by the readers also be added back to the SplitEnumerator via the addSplitsBack() call? Or should the SplitEnumerator explicitly query the registered reader information via the SplitEnumeratorContext to get the originally assigned splits when addReader() is invoked? I was assuming the latter in the beginning, so the behavior of addSplitsBack() remains unchanged, but I am not opposed to doing the former.

Also, can you elaborate on the backwards-compatibility issue you see if we do not have a separate addSplitsBackOnRecovery() method? Even without this new method, the behavior remains exactly the same unless the end users implement the mix-in interface SupportSplitReassignmentOnRecovery, right?

Thanks,

Jiangjie (Becket) Qin

On Mon, Sep 8, 2025 at 1:48 AM Hongshun Wang <[email protected]> wrote:

Hi devs,

It has been quite some time since this FLIP [1] was first proposed. Thank you for your valuable feedback; based on your suggestions, the FLIP has undergone several rounds of revisions.

Any more advice is welcome and appreciated. If there are no further concerns, I plan to start the vote tomorrow.

Best
Hongshun

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480

On Mon, Sep 8, 2025 at 4:42 PM Hongshun Wang <[email protected]> wrote:

Hi Leonard,

Thanks for your advice. It makes sense and I have modified it.

Best,
Hongshun

On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu <[email protected]> wrote:

Thanks Hongshun and Becket for the deep discussion.

I only have one comment on one API design:

> Deprecate the old addSplitsBack method, and use an addSplitsBack with the param isReportedByReader instead, because the enumerator can apply different reassignment policies based on the context.

Could we introduce a new method like addSplitsBackOnRecovery with a default implementation? In this way, we can provide better backward compatibility, and it also makes it easier for developers to understand.

Best,
Leonard

On Sep 3, 2025 at 20:26, Hongshun Wang <[email protected]> wrote:

Hi Becket,

I think that's a great idea! I have added the SupportSplitReassignmentOnRecovery interface in this FLIP. A Source implementing this interface indicates that the source operator needs to report splits to the enumerator and receive reassignment. [1]
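For context, one plausible shape of the mix-in is simply the following (sketch; the FLIP may attach a method to it, per the earlier suggestion):

```java
// Sketch only: a mix-in a Source can implement to opt in. Implementing it
// tells the SourceOperator to report restored splits to the enumerator and
// wait for (re)assignment instead of consuming them directly.
public interface SupportSplitReassignmentOnRecovery {
    // Possibly a pure marker interface; the FLIP may add methods here.
}
```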
Best,
Hongshun

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment

On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <[email protected]> wrote:

Hi Hongshun,

I think the convention for such optional features in Source is via mix-in interfaces. So instead of adding a method to the SourceReader, maybe we should introduce an interface SupportSplitReassignmentOnRecovery with this method. If a Source implementation implements that interface, then the SourceOperator will check the desired behavior and act accordingly.

Thanks,

Jiangjie (Becket) Qin

On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <[email protected]> wrote:

Hi devs,

Would anyone like to discuss this FLIP? I'd appreciate your feedback and suggestions.

Best,
Hongshun

On Aug 13, 2025 at 14:23, Hongshun Wang <[email protected]> wrote:

Hi Becket,

Thank you for your detailed feedback. The new contract makes good sense to me and effectively addresses the issues I encountered at the beginning of the design.

That said, I recommend not reporting splits by default, primarily for compatibility and practical reasons:

> For these reasons, we do not expect the Split objects to be huge, and we are not trying to design for huge Split objects either as they will have problems even today.

1. Not all existing connectors match this rule. For example, in the MySQL CDC connector, a binlog split may contain hundreds (or even more) snapshot-split completion records. This state is large and is currently transmitted incrementally through multiple BinlogSplitMetaEvent messages. Since the binlog reader operates with single parallelism, reporting the full split state on recovery could be inefficient or even infeasible. For such sources, it would be better to provide a mechanism to skip split reporting during restart until they redesign and reduce the split size.

2. Not all enumerators maintain unassigned splits in state. Some SplitEnumerator implementations (such as the Kafka connector's) do not track or persistently manage unassigned splits. Requiring them to handle re-registration would add unnecessary complexity. Even if we implement this in the Kafka connector, the Kafka connector is currently decoupled from the Flink version, so we also need to make sure the older versions stay compatible.

To address these concerns, I propose introducing a new method: boolean SourceReader#shouldReassignSplitsOnRecovery() with a default implementation returning false. This allows source readers to opt in to split reassignment only when necessary. Since the new contract already places the responsibility for split assignment on the enumerator, not reporting splits by default is a safe and clean default behavior.
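Concretely, the proposed opt-in would look like this (sketch of the described method only):

```java
// Sketch only: the proposed opt-in on the SourceReader side. Returning
// false (the default) keeps today's behavior, where a reader keeps its
// restored splits instead of reporting them for reassignment.
interface SourceReaderReassignmentOptIn {
    default boolean shouldReassignSplitsOnRecovery() {
        return false;
    }
}
```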
I've updated the implementation and the FLIP accordingly [1]. It is quite a big change. In particular, for the Kafka connector, we can now use a pluggable SplitPartitioner to support different split assignment strategies (e.g., default, round-robin).
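For illustration, the pluggable partitioner can be as small as this (sketch; the interface name is from this discussion, the exact shape is not final):

```java
import java.util.List;
import java.util.Map;

// Sketch only: a pluggable strategy that decides which subtask owns each
// split; implementations could be hash-based, round-robin, etc.
interface SplitPartitioner<SplitT> {
    /** Returns subtask id -> splits for the given splits and parallelism. */
    Map<Integer, List<SplitT>> assign(List<SplitT> splits, int parallelism);
}
```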
Could you please review it when you have a chance?

Best,
Hongshun

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment

On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <[email protected]> wrote:

Hi Hongshun,

I am not too concerned about the transmission cost, because the full split transmission has to happen in the initial assignment phase already. And in the future, we probably want to also introduce some kind of workload balance across source readers, e.g. based on the per-split throughput or the per-source-reader workload in heterogeneous clusters. For these reasons, we do not expect the Split objects to be huge, and we are not trying to design for huge Split objects either, as they will have problems even today.

Good point on the potential split loss; please see the reply below.

Scenario 2:

> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon restart.
> 2. Before the enumerator receives all reports and performs reassignment, a checkpoint is triggered.
> 3. Since no splits have been reassigned yet, both readers have empty states.
> 4. When restarting from this checkpoint, all four splits are lost.

The reader registration happens in SourceOperator.open(), which means the task is still in the initializing state; therefore the checkpoint should not be triggered until the enumerator receives all the split reports.

There is a nuance here. Today, the RPC call from the TM to the JM is async, so it is possible that SourceOperator.open() has returned but the enumerator has not received the split reports. However, because the task status update RPC call goes to the same channel as the split reports call, the task status RPC call will happen after the split reports call on the JM side. Therefore, on the JM side, the SourceCoordinator will always first receive the split reports and then receive the checkpoint request. This "happens before" relationship is important to guarantee consistent state between the enumerator and the readers.
Scenario 1:

> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader B reports (3 and 4).
> 2. The enumerator receives these reports but only reassigns splits 1 and 2, not 3 and 4.
> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are recorded in the reader states; splits 3 and 4 are not persisted.
> 4. If the job is later restarted from this checkpoint, splits 3 and 4 will be permanently lost.

This scenario is possible. One solution is to let the enumerator implementation handle it: if the enumerator relies on the initial split reports from the source readers, it should maintain these reports by itself. In the above example, the enumerator needs to remember that 3 and 4 are not assigned and put them into its own state.

The current contract is that anything assigned to the SourceReaders is completely owned by the SourceReaders. Enumerators can remember the assignments but cannot change them, even when the source reader recovers / restarts. With this FLIP, the contract becomes that the source readers return the ownership of the splits to the enumerator, so the enumerator is responsible for maintaining these splits until they are assigned to a source reader again.

There are other cases where there may be conflicting information between reader and enumerator. For example, consider the following sequence:
1. Reader A reports splits (1 and 2) upon restart.
2. The enumerator receives the report and assigns both 1 and 2 to reader B.
3. Reader A fails before checkpointing. This is a partial failure, so only reader A restarts.
4. When reader A recovers, it will again report splits (1 and 2) to the enumerator.
5. The enumerator should ignore this report because it has assigned splits (1 and 2) to reader B.

So with the new contract, the enumerator should be the source of truth for split ownership.

Thanks,

Jiangjie (Becket) Qin

On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <[email protected]> wrote:
Hi Becket,

I did consider this approach at the beginning (and it was also mentioned in this FLIP), since it would allow more flexibility in reassigning all splits. However, there are a few potential issues.

1. High transmission cost. If we pass the full split objects (rather than just split IDs), the data size could be significant, leading to high overhead during transmission, especially when many splits are involved.

2. Risk of split loss. This risk exists unless we have a mechanism to ensure a checkpoint can only be taken after all the splits are reassigned. There are scenarios where splits could be lost due to inconsistent state handling during recovery:

Scenario 1:

1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader B reports (3 and 4).
2. The enumerator receives these reports but only reassigns splits 1 and 2, not 3 and 4.
3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are recorded in the reader states; splits 3 and 4 are not persisted.
4. If the job is later restarted from this checkpoint, splits 3 and 4 will be permanently lost.

Scenario 2:

1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon restart.
2. Before the enumerator receives all reports and performs reassignment, a checkpoint is triggered.
3. Since no splits have been reassigned yet, both readers have empty states.
4. When restarting from this checkpoint, all four splits are lost.

Let me know if you have thoughts on how we might mitigate these risks!

Best
Hongshun
On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <[email protected]> wrote:

Hi Hongshun,

The steps sound reasonable to me in general. In terms of the updated FLIP wiki, it would be good to see if we can keep the protocol simple. One alternative way to achieve this behavior is the following:

1. Upon SourceOperator startup, the SourceOperator sends a ReaderRegistrationEvent with the currently assigned splits to the enumerator. It does not add these splits to the SourceReader.
2. The enumerator will always use SplitEnumeratorContext.assignSplits() to assign the splits (not via the response to the ReaderRegistrationEvent; this allows async split assignment in case the enumerator wants to wait until all the readers are registered).
3. The SourceOperator will only call SourceReader.addSplits() when it receives the AddSplitEvent from the enumerator.

This protocol has a few benefits:
1. It basically allows arbitrary split reassignment upon restart.
2. Simplicity: there is only one way to assign splits.

So we only need one interface change:
- add the initially assigned splits to ReaderInfo so the Enumerator can access it.
and one behavior change:
- The SourceOperator should stop assigning the splits from state restoration, but [message truncated...]
