Hi Becket, Got it! I have modified it again.
Best, Hongshun On Wed, Oct 15, 2025 at 1:18 PM Becket Qin <[email protected]> wrote: > Hi Hongshun, > > Thanks for updating the FLIP. It looks much cleaner. Some minor comments: > > 1. Rename the `splitsOnRecovery` in ReaderInfo to > `reportedSplitsOnRegistration`. > 2. The SourceOperator should always send the ReaderRegistrationEvent with > the `reportedSplitsOnRegistration` list. But it will not add the splits to > readers if `SupportSplitReassignmentOnRecovery` is implemented. > 3. "*Thus, if a connector wants to use FLIP-537, the enumerator must keep > an unassigned split in state*" - this is up to the > enumerator implementation to decide. It is not a must-have for all > enumerator implementations. > 4. The RoundRobin algorithm in KafkaSource is not deterministic. Why not > just get all the splits and do a round robin based on the numReaders? E.g. > sort all the splits, and assign the splits to reader 0, 1, 2, 3...N, 0, 1, > 2, 3... N... > > Thanks, > > Jiangjie (Becket) Qin > > On Tue, Oct 14, 2025 at 7:28 PM Hongshun Wang <[email protected]> > wrote: > >> Hi devs, >> >> If there is no other problem, I will start a vote later. >> >> Best, >> Hongshun >> >> On Mon, Oct 13, 2025 at 4:17 PM Hongshun Wang <[email protected]> >> wrote: >> >>> Hi Becket and Leonard, >>> >>> It seems adding `splitsOnRecovery` to `ReaderInfo` makes the split >>> enumerator simpler and cleaner. >>> >>> I have modified this FLIP again. Please have a look and let me know what >>> you think. >>> >>> Best, >>> Hongshun >>> >>> On Mon, Oct 13, 2025 at 10:48 AM Hongshun Wang <[email protected]> >>> wrote: >>> >>>> Hi Becket, >>>> Thanks for your explanation. >>>> >>>> > For the same three inputs above, the assignment should be consistently >>>> the same. >>>> >>>> That is exactly what troubles me. For assignment algorithms such as >>>> hash, it does behave the same. What if we use round-robin? Each time the >>>> reader information changes, the same split may be assigned to a different reader. 
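To make the deterministic round-robin in comment 4 concrete, here is a minimal sketch in plain Java (the `String` split ids and the map-based return type are illustrative stand-ins, not the actual KafkaSource types): sort the splits by a stable key, then hand split i to reader i % numReaders, so the same inputs always yield the same assignment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinAssignment {

    /** Deterministically assigns sorted split ids to readers 0..numReaders-1 in round-robin order. */
    public static Map<Integer, List<String>> assign(List<String> splitIds, int numReaders) {
        List<String> sorted = new ArrayList<>(splitIds);
        sorted.sort(String::compareTo); // stable order makes the result independent of input order
        Map<Integer, List<String>> assignment = new HashMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            assignment.computeIfAbsent(i % numReaders, r -> new ArrayList<>()).add(sorted.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Same splits and numReaders always produce the same assignment.
        System.out.println(assign(List.of("topic-2", "topic-0", "topic-1"), 2));
    }
}
```

Because the sort removes any dependence on registration or discovery order, re-running the assignment with the same three inputs produces the same result, which is the property asked for above.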
Here is >>>> also the example I listed before: >>>> >>>> 1. *Initial state:* 2 parallelism, 2 splits. >>>> 2. *Enumerator action:* Split 1 → Task 1, Split 2 → Task 2. >>>> 3. *Failure scenario:* After Split 2 is assigned to Task 2 but >>>> before the next checkpoint succeeds, Task 1 restarts. >>>> 4. *Recovery issue:* Split 2 is re-added to the enumerator. The >>>> round-robin strategy assigns Split 2 to Task 1. Task 1 now has 2 >>>> splits, Task 2 has 0 → imbalanced distribution. >>>> >>>> >>>> > Please let me know if you think a meeting would be more efficient. >>>> Yes, I'd like to reach an agreement as soon as possible. If you're >>>> available, we could schedule a meeting with Leonard as well. >>>> >>>> Best, >>>> Hongshun >>>> >>>> On Sat, Oct 11, 2025 at 3:59 PM Becket Qin <[email protected]> >>>> wrote: >>>> >>>>> Hi Hongshun, >>>>> >>>>> I am confused. First of all, regardless of what the assignment >>>>> algorithm is, using SplitEnumeratorContext to return the splits only gives >>>>> more information than using addSplitsBack(). So there should be no >>>>> regression. >>>>> >>>>> Secondly, at this point the SplitEnumerator should only take the >>>>> following three inputs to generate the global split assignment: >>>>> 1. the *reader information (num readers, locations, etc)* >>>>> 2. *all the splits to assign* >>>>> 3. *the configured assignment algorithm* >>>>> Preferably, for the same three inputs above, the assignment should be >>>>> consistently the same. I don't see why it should care about why a new >>>>> reader is added, whether due to partial failover, global failover or job >>>>> restart. >>>>> >>>>> If you want to do global redistribution on global failover and >>>>> restart, but honor the existing assignment for partial failover, the >>>>> enumerator will just do the following: >>>>> 1. Generate a new global assignment (global redistribution) in start(), >>>>> because start() will only be invoked on global failover or restart. 
That >>>>> means all the readers are also new, with empty assignments. >>>>> 2. After the global assignment is generated, it should be honored for >>>>> the whole life cycle. There might be many reader registrations, again for >>>>> different reasons, but it does not matter: >>>>> - reader registration after this job restart >>>>> - reader registration after this global failover >>>>> - reader registration due to partial failover which may or may not >>>>> have an addSplitsBack() call. >>>>> Regardless of the reason, the split enumerator will just enforce >>>>> the global assignment it has already generated, i.e. without split >>>>> redistribution. >>>>> >>>>> Wouldn't that give the behavior you want? I feel the discussion >>>>> somehow goes in circles. Please let me know if you think a meeting would >>>>> be >>>>> more efficient. >>>>> >>>>> Thanks, >>>>> >>>>> Jiangjie (Becket) Qin >>>>> >>>>> On Fri, Oct 10, 2025 at 7:58 PM Hongshun Wang <[email protected]> >>>>> wrote: >>>>> >>>>>> Hi Becket, >>>>>> >>>>>> > Ignore a returned split if it has been assigned to a different >>>>>> reader, otherwise put it back to unassigned splits / pending splits. Then >>>>>> the enumerator assigns new splits to the newly added reader, which may >>>>>> use >>>>>> the previous assignment as a reference. This should work regardless of >>>>>> whether it is a global failover, partial failover, restart, etc. There is >>>>>> no need for the SplitEnumerator to distinguish what failover scenario it >>>>>> is. >>>>>> >>>>>> In this case, it seems that global failover and partial failover >>>>>> share the same distribution strategy if the split has not been assigned to a >>>>>> different reader. However, global failover needs redistribution (this >>>>>> is why we need this FLIP), while partial failover does not. I have no idea >>>>>> how we can distinguish them. >>>>>> >>>>>> What do you think? 
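Becket's two-step contract above can be sketched as plain Java (simplified stand-ins with `String` split ids, not the real SplitEnumerator API): start() computes the global assignment once, and every later reader registration just enforces it, so the enumerator never needs to know which failover scenario triggered the registration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of the contract: redistribute once in start(), then only enforce. */
public class GlobalAssignmentEnumerator {

    private final Map<Integer, List<String>> globalAssignment = new HashMap<>();

    /** Invoked only on job restart / global failover: compute the full assignment up front. */
    public void start(List<String> allSplits, int numReaders) {
        List<String> sorted = new ArrayList<>(allSplits);
        sorted.sort(String::compareTo);
        for (int i = 0; i < sorted.size(); i++) {
            globalAssignment.computeIfAbsent(i % numReaders, r -> new ArrayList<>()).add(sorted.get(i));
        }
    }

    /** Invoked on any reader registration, for whatever reason: just enforce the plan. */
    public List<String> addReader(int subtaskId) {
        return globalAssignment.getOrDefault(subtaskId, List.of());
    }

    public static void main(String[] args) {
        GlobalAssignmentEnumerator enumerator = new GlobalAssignmentEnumerator();
        enumerator.start(List.of("split-2", "split-1"), 2);
        // Re-registration after a partial failover gets exactly the same splits back.
        System.out.println(enumerator.addReader(0));
        System.out.println(enumerator.addReader(0));
    }
}
```

The point of the sketch is that addReader is a pure lookup: a reader re-registering after a partial failover receives the same splits it had before, while a restart or global failover goes through start() and gets a fresh redistribution.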
>>>>>> >>>>>> Best, >>>>>> Hongshun >>>>>> >>>>>> On Sat, Oct 11, 2025 at 12:54 AM Becket Qin <[email protected]> >>>>>> wrote: >>>>>> >>>>>>> Hi Hongshun, >>>>>>> >>>>>>> The problem we are trying to solve here is to give the splits back >>>>>>> to the SplitEnumerator. There are only two types of splits to give back: >>>>>>> 1) splits whose assignment has been checkpointed. - In this case, we >>>>>>> rely on addReader() + SplitEnumeratorContext to give the splits back; >>>>>>> this >>>>>>> provides more information associated with those splits. >>>>>>> 2) splits whose assignment has not been checkpointed. - In this >>>>>>> case, we use addSplitsBack(); there is no reader info to give because >>>>>>> the >>>>>>> previous assignment did not take effect to begin with. >>>>>>> >>>>>>> From the SplitEnumerator implementation perspective, the contract is >>>>>>> straightforward. >>>>>>> 1. The SplitEnumerator is the source of truth for assignment. >>>>>>> 2. When the enumerator receives the addSplitsBack() call, it always adds >>>>>>> these splits back to unassigned splits / pending splits. >>>>>>> 3. When the enumerator receives the addReader() call, that means the >>>>>>> reader has no current assignment, and has returned its previous >>>>>>> assignment >>>>>>> based on the reader-side info. The SplitEnumerator checks the >>>>>>> SplitEnumeratorContext to retrieve the returned splits from that reader >>>>>>> (i.e. previous assignment) and handles them according to its own >>>>>>> source-of-truth knowledge of the assignment - ignore a returned split if it has been >>>>>>> assigned to a different reader, otherwise put it back to unassigned >>>>>>> splits >>>>>>> / pending splits. Then the enumerator assigns new splits to the newly >>>>>>> added >>>>>>> reader, which may use the previous assignment as a reference. This >>>>>>> should >>>>>>> work regardless of whether it is a global failover, partial failover, >>>>>>> restart, etc. 
There is no need for the SplitEnumerator to distinguish >>>>>>> what >>>>>>> failover scenario it is. >>>>>>> >>>>>>> Would this work? >>>>>>> >>>>>>> Thanks, >>>>>>> >>>>>>> Jiangjie (Becket) Qin >>>>>>> >>>>>>> On Fri, Oct 10, 2025 at 1:28 AM Hongshun Wang < >>>>>>> [email protected]> wrote: >>>>>>> >>>>>>>> Hi Becket, >>>>>>>> > why do we need to change the behavior of addSplitsBack()? Should >>>>>>>> it remain the same? >>>>>>>> >>>>>>>> How does the enumerator get the splits from ReaderRegistrationEvent >>>>>>>> and then reassign them? >>>>>>>> >>>>>>>> You gave this advice before: >>>>>>>> > 1. Put all the reader information in the SplitEnumerator >>>>>>>> context. 2. notify the enumerator about the new reader registration. >>>>>>>> 3. >>>>>>>> let the split enumerator get whatever information it wants from the >>>>>>>> context >>>>>>>> and do its job. >>>>>>>> >>>>>>>> However, each time a source task fails over, the >>>>>>>> ConcurrentMap<Integer, ConcurrentMap<Integer, ReaderInfo>> >>>>>>>> registeredReaders will remove this reader's info. When the source task >>>>>>>> is >>>>>>>> registered again, it will be added again. *Thus, registeredReaders >>>>>>>> cannot know whether the reader was registered before.* >>>>>>>> >>>>>>>> Therefore, enumerator#addReader does not >>>>>>>> distinguish the following three situations: >>>>>>>> 1. The reader is registered during a global restart. In this case, >>>>>>>> redistribute the splits from the infos (take all the splits off the >>>>>>>> ReaderInfo). >>>>>>>> 2. The reader is registered during a partial failover (before the >>>>>>>> first successful checkpoint). In this case, ignore the splits from the >>>>>>>> infos 
(leave all the splits in ReaderInfo alone). >>>>>>>> 3. The reader is registered during a partial failover (after the first >>>>>>>> successful checkpoint). In this case, we need to assign the splits to the same >>>>>>>> reader again (take the splits off the ReaderInfo but assign them back to >>>>>>>> it). >>>>>>>> We still need the enumerator to distinguish them (using >>>>>>>> pendingSplitAssignments & assignedSplitAssignments). However, it is >>>>>>>> redundant >>>>>>>> to maintain split assignment information both in the enumerator and in the >>>>>>>> enumerator context. >>>>>>>> >>>>>>>> I think if we change the behavior of addSplitsBack, it will be >>>>>>>> simpler: just let the enumerator handle these splits based on >>>>>>>> pendingSplitAssignments >>>>>>>> & assignedSplitAssignments. >>>>>>>> >>>>>>>> What do you think? >>>>>>>> >>>>>>>> Best, >>>>>>>> Hongshun >>>>>>>> >>>>>>>> On Fri, Oct 10, 2025 at 12:55 PM Becket Qin <[email protected]> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Hi Hongshun, >>>>>>>>> >>>>>>>>> Thanks for updating the FLIP. A quick question: why do we need to >>>>>>>>> change the behavior of addSplitsBack()? Should it remain the same? >>>>>>>>> >>>>>>>>> Regarding the case of restart with a changed subscription, I think >>>>>>>>> the only correct behavior is removing obsolete splits without any >>>>>>>>> warning / >>>>>>>>> exception. It is OK to add info-level logging if we want to. The >>>>>>>>> intention is >>>>>>>>> clear if the user has explicitly changed the subscription and >>>>>>>>> restarted the job. There is no need to add a config to double confirm. 
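The pendingSplitAssignments / assignedSplitAssignments handling described above could look roughly like this (a sketch with plain collections and `String` split ids, not the actual enumerator code): a returned split already in pending is ignored, one owned by a different task is ignored, and one owned by the reporting task is moved back to pending for reassignment.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class SplitReturnDispatch {

    final Set<String> pendingSplits = new HashSet<>();           // queued, not handed to any reader
    final Map<String, Integer> assignedSplits = new HashMap<>(); // split id -> owning subtask

    /** Handle one split reported back by a recovering reader. */
    public void onSplitReturned(String splitId, int reportingSubtask) {
        if (pendingSplits.contains(splitId)) {
            return; // already pending: ignore the report
        }
        Integer owner = assignedSplits.get(splitId);
        if (owner != null && owner != reportingSubtask) {
            return; // meanwhile owned by another task: ignore the report
        }
        // Owned by the reporting task: move it back to pending so it can be re-assigned.
        assignedSplits.remove(splitId);
        pendingSplits.add(splitId);
    }

    public static void main(String[] args) {
        SplitReturnDispatch dispatch = new SplitReturnDispatch();
        dispatch.assignedSplits.put("split-a", 1);
        dispatch.assignedSplits.put("split-b", 2);
        dispatch.onSplitReturned("split-a", 1); // same owner: moves back to pending
        dispatch.onSplitReturned("split-b", 1); // different owner: ignored
        System.out.println(dispatch.pendingSplits + " " + dispatch.assignedSplits);
    }
}
```

The enumerator's own two maps remain the single source of truth here; the reader's report only triggers a lookup against them, which is the redundancy concern raised above.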
>>>>>>>>> >>>>>>>>> Regards, >>>>>>>>> >>>>>>>>> Jiangjie (Becket) Qin >>>>>>>>> >>>>>>>>> On Thu, Oct 9, 2025 at 7:28 PM Hongshun Wang < >>>>>>>>> [email protected]> wrote: >>>>>>>>> >>>>>>>>>> Hi Leonard, >>>>>>>>>> >>>>>>>>>> If the SplitEnumerator receives all splits after a restart, it >>>>>>>>>> becomes straightforward to clear and unassign the unmatched >>>>>>>>>> splits (checking whether each split matches the source options). However, a key >>>>>>>>>> question arises: *should we automatically discard obsolete splits, >>>>>>>>>> or explicitly notify the user via an exception?* >>>>>>>>>> >>>>>>>>>> We provide an option `scan.partition-unsubscribe.strategy`: >>>>>>>>>> 1. If Strict, it throws an exception when encountering removed >>>>>>>>>> splits. >>>>>>>>>> 2. If Lenient, it automatically removes obsolete splits silently. >>>>>>>>>> >>>>>>>>>> What do you think? >>>>>>>>>> >>>>>>>>>> Best, >>>>>>>>>> Hongshun >>>>>>>>>> >>>>>>>>>> On Thu, Oct 9, 2025 at 9:37 PM Leonard Xu <[email protected]> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Thanks Hongshun for the update and the pretty detailed analysis >>>>>>>>>>> of edge cases; the updated FLIP looks good to me now. >>>>>>>>>>> >>>>>>>>>>> Only one last implementation detail about the scenario in the motivation >>>>>>>>>>> section: >>>>>>>>>>> >>>>>>>>>>> *Restart with changed subscription: during restart, if the source >>>>>>>>>>> options remove a topic or table, the splits which have already been >>>>>>>>>>> assigned >>>>>>>>>>> cannot be removed.* >>>>>>>>>>> >>>>>>>>>>> Could you clarify how we resolve this in the Kafka connector? 
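The Strict/Lenient behavior proposed above might be sketched as follows (the split-id format "topic-partition", the types, and the helper name are illustrative assumptions, not committed Kafka connector code):

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class UnsubscribeStrategyDemo {

    enum UnsubscribeStrategy { STRICT, LENIENT }

    /** Drops restored splits whose topic is no longer subscribed; STRICT fails instead. */
    static List<String> filterObsolete(
            List<String> restoredSplits, Set<String> subscribedTopics, UnsubscribeStrategy strategy) {
        for (String splitId : restoredSplits) {
            String topic = splitId.split("-", 2)[0]; // split id assumed to be "topic-partition"
            if (!subscribedTopics.contains(topic) && strategy == UnsubscribeStrategy.STRICT) {
                throw new IllegalStateException("Restored split for removed topic: " + topic);
            }
        }
        return restoredSplits.stream()
                .filter(s -> subscribedTopics.contains(s.split("-", 2)[0]))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(
                filterObsolete(List.of("orders-0", "legacy-0"), Set.of("orders"), UnsubscribeStrategy.LENIENT));
    }
}
```

Lenient silently prunes the obsolete splits so the job keeps running with the new subscription, while Strict surfaces the mismatch to the user before any split is dropped.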
>>>>>>>>>>> >>>>>>>>>>> Best, >>>>>>>>>>> Leonard >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Oct 9, 2025 at 19:48, Hongshun Wang <[email protected]> wrote: >>>>>>>>>>> >>>>>>>>>>> Hi devs, >>>>>>>>>>> If there are no further suggestions, I will start the voting >>>>>>>>>>> tomorrow. >>>>>>>>>>> >>>>>>>>>>> Best, >>>>>>>>>>> Hongshun >>>>>>>>>>> >>>>>>>>>>> On Fri, Sep 26, 2025 at 7:48 PM Hongshun Wang < >>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi Becket and Leonard, >>>>>>>>>>>> >>>>>>>>>>>> I have updated the content of this FLIP. The key point is that: >>>>>>>>>>>> >>>>>>>>>>>> When the split enumerator receives a split, *it must >>>>>>>>>>>> have already existed in pendingSplitAssignments or >>>>>>>>>>>> assignedSplitAssignments* >>>>>>>>>>>> . >>>>>>>>>>>> >>>>>>>>>>>> - If the split is in pendingSplitAssignments, ignore it. >>>>>>>>>>>> - If the split is in assignedSplitAssignments but has a >>>>>>>>>>>> different taskId, ignore it (this indicates it was already >>>>>>>>>>>> assigned to another task). >>>>>>>>>>>> - If the split is in assignedSplitAssignments and shares >>>>>>>>>>>> the same taskId, move the assignment from assignedSplitAssignments >>>>>>>>>>>> to pendingSplitAssignments to re-assign it. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> To better explain why these strategies are used, I added some >>>>>>>>>>>> examples and pictures. >>>>>>>>>>>> >>>>>>>>>>>> Could you help me check whether there are still some >>>>>>>>>>>> problems? >>>>>>>>>>>> >>>>>>>>>>>> Best >>>>>>>>>>>> Hongshun >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On Fri, Sep 26, 2025 at 5:08 PM Leonard Xu <[email protected]> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Thanks Becket and Hongshun for the insightful discussion. 
>>>>>>>>>>>>> >>>>>>>>>>>>> The underlying implementation and communication mechanisms of >>>>>>>>>>>>> Flink Source indeed involve many intricate details. We discussed >>>>>>>>>>>>> the issue >>>>>>>>>>>>> of split re-assignment in specific scenarios, but fortunately >>>>>>>>>>>>> the final >>>>>>>>>>>>> decision turned out to be pretty clear. >>>>>>>>>>>>> >>>>>>>>>>>>> +1 to Becket's proposal, which keeps the framework cleaner and >>>>>>>>>>>>> more flexible. >>>>>>>>>>>>> +1 to Hongshun's point to provide comprehensive guidance for >>>>>>>>>>>>> connector developers. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> Best, >>>>>>>>>>>>> Leonard >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> On Sep 26, 2025 at 16:30, Hongshun Wang <[email protected]> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>> Hi Becket, >>>>>>>>>>>>> >>>>>>>>>>>>> I got it. You're suggesting we should not handle this in the >>>>>>>>>>>>> source framework but instead let the split enumerator manage >>>>>>>>>>>>> these three >>>>>>>>>>>>> scenarios. >>>>>>>>>>>>> >>>>>>>>>>>>> Let me explain why I originally favored handling it in the >>>>>>>>>>>>> framework: I'm concerned that connector developers might overlook >>>>>>>>>>>>> certain >>>>>>>>>>>>> edge cases (after all, even we needed extensive discussions to >>>>>>>>>>>>> fully clarify >>>>>>>>>>>>> the logic). >>>>>>>>>>>>> >>>>>>>>>>>>> However, your approach keeps the framework cleaner and more >>>>>>>>>>>>> flexible, so I will take it. >>>>>>>>>>>>> >>>>>>>>>>>>> Perhaps, in this FLIP, we should focus on providing >>>>>>>>>>>>> comprehensive guidance for connector developers: explain how >>>>>>>>>>>>> to implement a split enumerator, including the underlying >>>>>>>>>>>>> challenges and >>>>>>>>>>>>> their solutions. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> Additionally, we can use the Kafka connector as a reference >>>>>>>>>>>>> implementation to demonstrate the practical steps. 
This way, >>>>>>>>>>>>> developers who >>>>>>>>>>>>> want to implement similar connectors can directly reference this >>>>>>>>>>>>> example. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> Best, >>>>>>>>>>>>> Hongshun >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> On Fri, Sep 26, 2025 at 1:27 PM Becket Qin < >>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> It would be good to not expose runtime details to the source >>>>>>>>>>>>>> implementation if possible. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Today, the split enumerator implementations are expected to >>>>>>>>>>>>>> track the split assignment. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Assuming the split enumerator implementation keeps a split >>>>>>>>>>>>>> assignment map, that means the enumerator should already know >>>>>>>>>>>>>> whether a >>>>>>>>>>>>>> split is assigned or unassigned. So it can handle the three >>>>>>>>>>>>>> scenarios you >>>>>>>>>>>>>> mentioned. >>>>>>>>>>>>>> >>>>>>>>>>>>>> The split is reported by a reader during a global restoration. >>>>>>>>>>>>>>> >>>>>>>>>>>>>> The split enumerator should have just been restored / >>>>>>>>>>>>>> created. If the enumerator expects a full reassignment of splits >>>>>>>>>>>>>> upon >>>>>>>>>>>>>> global recovery, there should be no splits assigned to that >>>>>>>>>>>>>> reader in the >>>>>>>>>>>>>> split assignment mapping. >>>>>>>>>>>>>> >>>>>>>>>>>>>> The split is reported by a reader during a partial failure >>>>>>>>>>>>>>> recovery. >>>>>>>>>>>>>>> >>>>>>>>>>>>>> In this case, when SplitEnumerator.addReader() is invoked, >>>>>>>>>>>>>> the split assignment map in the enumerator implementation should >>>>>>>>>>>>>> already >>>>>>>>>>>>>> have some split assignments for the reader. Therefore it is a >>>>>>>>>>>>>> partial >>>>>>>>>>>>>> failover. 
If the source supports split reassignment on recovery, >>>>>>>>>>>>>> the >>>>>>>>>>>>>> enumerator can assign splits that are different from the reported >>>>>>>>>>>>>> assignment of that reader in the SplitEnumeratorContext, or it >>>>>>>>>>>>>> can also >>>>>>>>>>>>>> assign the same splits. In any case, the enumerator knows that >>>>>>>>>>>>>> this is a >>>>>>>>>>>>>> partial recovery because the assignment map is non-empty. >>>>>>>>>>>>>> >>>>>>>>>>>>>> The split is not reported by a reader, but is assigned after >>>>>>>>>>>>>>> the last successful checkpoint and was never acknowledged. >>>>>>>>>>>>>> >>>>>>>>>>>>>> This is actually one of the steps in the partial failure >>>>>>>>>>>>>> recovery. SplitEnumerator.addSplitsBack() will be called first, >>>>>>>>>>>>>> before >>>>>>>>>>>>>> SplitEnumerator.addReader() is called for the recovered reader. When >>>>>>>>>>>>>> the >>>>>>>>>>>>>> SplitEnumerator.addSplitsBack() is invoked, it is for sure a >>>>>>>>>>>>>> partial >>>>>>>>>>>>>> recovery. And the enumerator should remove these splits from the >>>>>>>>>>>>>> split >>>>>>>>>>>>>> assignment map as if they were never assigned. >>>>>>>>>>>>>> >>>>>>>>>>>>>> I think this should work, right? >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>> >>>>>>>>>>>>>> Jiangjie (Becket) Qin >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Thu, Sep 25, 2025 at 8:34 PM Hongshun Wang < >>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi Becket and Leonard, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks for your advice. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> > put all the reader information in the SplitEnumerator >>>>>>>>>>>>>>> context >>>>>>>>>>>>>>> I have a concern: the current registeredReaders in >>>>>>>>>>>>>>> *SourceCoordinatorContext will be removed after the subtask reset >>>>>>>>>>>>>>> executes on >>>>>>>>>>>>>>> failure*. However, this approach has merit. 
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> One more situation I found my previous design does not cover: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 1. Initial state: Reader A reports splits (1, 2). >>>>>>>>>>>>>>> 2. Enumerator action: Assigns split 1 to Reader A, and >>>>>>>>>>>>>>> split 2 to Reader B. >>>>>>>>>>>>>>> 3. Failure scenario: Reader A fails before >>>>>>>>>>>>>>> checkpointing. Since this is a partial failure, only Reader >>>>>>>>>>>>>>> A restarts. >>>>>>>>>>>>>>> 4. Recovery issue: Upon recovery, Reader A re-reports >>>>>>>>>>>>>>> split (1). >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> In my previous design, the enumerator would ignore Reader A's >>>>>>>>>>>>>>> re-registration, which would cause data loss. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thus, when the enumerator receives a split, the split may >>>>>>>>>>>>>>> originate from three scenarios: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 1. The split is reported by a reader during a global >>>>>>>>>>>>>>> restoration. >>>>>>>>>>>>>>> 2. The split is reported by a reader during a partial >>>>>>>>>>>>>>> failure recovery. >>>>>>>>>>>>>>> 3. The split is not reported by a reader, but is >>>>>>>>>>>>>>> assigned after the last successful checkpoint and was never >>>>>>>>>>>>>>> acknowledged. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> In the first scenario (global restore), the split should >>>>>>>>>>>>>>> be re-distributed. For the latter two scenarios (partial >>>>>>>>>>>>>>> failover and >>>>>>>>>>>>>>> post-checkpoint assignment), we need to reassign the split to >>>>>>>>>>>>>>> its originally assigned subtask. 
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> By implementing a method in the SplitEnumerator context to >>>>>>>>>>>>>>> track each assigned split's status, the system can correctly >>>>>>>>>>>>>>> identify and >>>>>>>>>>>>>>> resolve split ownership in all three scenarios. *What about >>>>>>>>>>>>>>> adding a `SplitRecoveryType splitRecoveryType(Split split)` method to >>>>>>>>>>>>>>> SplitEnumeratorContext?* SplitRecoveryType is an enum >>>>>>>>>>>>>>> including `UNASSIGNED`, `GLOBAL_RESTORE`, `PARTIAL_FAILOVER` and >>>>>>>>>>>>>>> `POST_CHECKPOINT_ASSIGNMENT`. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> What do you think? Are there any details or scenarios I >>>>>>>>>>>>>>> haven't considered? Looking forward to your advice. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>> Hongshun >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Thu, Sep 11, 2025 at 12:41 AM Becket Qin < >>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thanks for the explanation, Hongshun. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> The current pattern of handling new reader registration is the >>>>>>>>>>>>>>>> following: >>>>>>>>>>>>>>>> 1. put all the reader information in the SplitEnumerator >>>>>>>>>>>>>>>> context >>>>>>>>>>>>>>>> 2. notify the enumerator about the new reader registration. >>>>>>>>>>>>>>>> 3. Let the split enumerator get whatever information it >>>>>>>>>>>>>>>> wants from the >>>>>>>>>>>>>>>> context and do its job. >>>>>>>>>>>>>>>> This pattern decouples the information passing and the >>>>>>>>>>>>>>>> reader registration >>>>>>>>>>>>>>>> notification. This makes the API extensible - we can add >>>>>>>>>>>>>>>> more information >>>>>>>>>>>>>>>> (e.g. reported assigned splits in our case) about the >>>>>>>>>>>>>>>> reader to the context >>>>>>>>>>>>>>>> without introducing new methods. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Introducing a new method of addSplitsBackOnRecovery() is >>>>>>>>>>>>>>>> redundant to the >>>>>>>>>>>>>>>> above pattern. Do we really need it? 
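The SplitRecoveryType proposal above, as a hypothetical fragment (neither this enum nor such a method exists on Flink's SplitEnumeratorContext today; it is only the idea floated in this thread):

```java
/** Hypothetical classification of where a recovered split came from (proposed, not in Flink). */
public class SplitRecoveryTypes {

    enum SplitRecoveryType {
        UNASSIGNED,                // never handed to a reader
        GLOBAL_RESTORE,            // reported by a reader during a job-level restore
        PARTIAL_FAILOVER,          // reported by a reader during a partial failover
        POST_CHECKPOINT_ASSIGNMENT // assigned after the last checkpoint, never acknowledged
    }

    /** Example policy from the thread: only a global restore triggers redistribution. */
    static boolean shouldRedistribute(SplitRecoveryType type) {
        return type == SplitRecoveryType.GLOBAL_RESTORE;
    }

    public static void main(String[] args) {
        System.out.println(shouldRedistribute(SplitRecoveryType.GLOBAL_RESTORE));
    }
}
```

A query method like `SplitRecoveryType splitRecoveryType(Split split)` would let the enumerator branch on this enum instead of reverse-engineering the scenario from its own assignment maps.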
>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Jiangjie (Becket) Qin >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Mon, Sep 8, 2025 at 8:18 PM Hongshun Wang < >>>>>>>>>>>>>>>> [email protected]> >>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> > Hi Becket, >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > > I am curious what would the enumerator do differently >>>>>>>>>>>>>>>> for the splits >>>>>>>>>>>>>>>> > added via addSplitsBackOnRecovery() V.S. addSplitsBack()? >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > In this FLIP, there are two distinct scenarios in which >>>>>>>>>>>>>>>> the enumerator >>>>>>>>>>>>>>>> > receives splits being added back: >>>>>>>>>>>>>>>> > 1. Job-level restore: The job is restored; the splits from >>>>>>>>>>>>>>>> each reader's state are >>>>>>>>>>>>>>>> > reported via ReaderRegistrationEvent. >>>>>>>>>>>>>>>> > 2. Reader-level restart: a reader is restarted but not the >>>>>>>>>>>>>>>> whole job; >>>>>>>>>>>>>>>> > the splits assigned to it after the last successful >>>>>>>>>>>>>>>> checkpoint are added back. This is what >>>>>>>>>>>>>>>> > addSplitsBack used to do. >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > In these two situations, the enumerator will choose >>>>>>>>>>>>>>>> different strategies. >>>>>>>>>>>>>>>> > 1. Job-level restore: the splits should be redistributed >>>>>>>>>>>>>>>> across readers >>>>>>>>>>>>>>>> > according to the current partitioner strategy. >>>>>>>>>>>>>>>> > 2. 
Reader-level restart: the splits should be reassigned >>>>>>>>>>>>>>>> directly back to >>>>>>>>>>>>>>>> > the same reader they were originally assigned to, >>>>>>>>>>>>>>>> preserving locality and >>>>>>>>>>>>>>>> > avoiding unnecessary redistribution. >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > Therefore, the enumerator must clearly distinguish >>>>>>>>>>>>>>>> between these two >>>>>>>>>>>>>>>> > scenarios. I previously proposed deprecating the former >>>>>>>>>>>>>>>> addSplitsBack(List<SplitT> >>>>>>>>>>>>>>>> > splits, int subtaskId) and adding a new >>>>>>>>>>>>>>>> addSplitsBack(List<SplitT> >>>>>>>>>>>>>>>> > splits, int subtaskId, >>>>>>>>>>>>>>>> > boolean reportedByReader). >>>>>>>>>>>>>>>> > Leonard suggested using another method, >>>>>>>>>>>>>>>> addSplitsBackOnRecovery, which does >>>>>>>>>>>>>>>> > not affect the current addSplitsBack. >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > Best >>>>>>>>>>>>>>>> > Hongshun >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > On 2025/09/08 17:20:31 Becket Qin wrote: >>>>>>>>>>>>>>>> > > Hi Leonard, >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> > > > Could we introduce a new method like >>>>>>>>>>>>>>>> addSplitsBackOnRecovery with a >>>>>>>>>>>>>>>> > default >>>>>>>>>>>>>>>> > > > implementation? In this way, we can provide better >>>>>>>>>>>>>>>> backward >>>>>>>>>>>>>>>> > compatibility >>>>>>>>>>>>>>>> > > > and also make it easier for developers to understand. >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> > > I am curious what would the enumerator do differently >>>>>>>>>>>>>>>> for the splits >>>>>>>>>>>>>>>> > added >>>>>>>>>>>>>>>> > > via addSplitsBackOnRecovery() V.S. addSplitsBack()? >>>>>>>>>>>>>>>> Today, >>>>>>>>>>>>>>>> > addSplitsBack() >>>>>>>>>>>>>>>> > > is also only called upon recovery. So the new method >>>>>>>>>>>>>>>> seems confusing. 
One >>>>>>>>>>>>>>>> > > thing worth clarifying is: if the Source implements >>>>>>>>>>>>>>>> > > SupportSplitReassignmentOnRecovery, upon recovery, >>>>>>>>>>>>>>>> should the splits >>>>>>>>>>>>>>>> > > reported by the readers also be added back to the >>>>>>>>>>>>>>>> SplitEnumerator via the >>>>>>>>>>>>>>>> > > addSplitsBack() call? Or should the SplitEnumerator >>>>>>>>>>>>>>>> explicitly query the >>>>>>>>>>>>>>>> > > registered reader information via the >>>>>>>>>>>>>>>> SplitEnumeratorContext to get the >>>>>>>>>>>>>>>> > > originally assigned splits when addReader() is invoked? >>>>>>>>>>>>>>>> I was assuming >>>>>>>>>>>>>>>> > the >>>>>>>>>>>>>>>> > > latter in the beginning, so the behavior of >>>>>>>>>>>>>>>> addSplitsBack() remains >>>>>>>>>>>>>>>> > > unchanged, but I am not opposed to doing the former. >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> > > Also, can you elaborate on the backwards compatibility >>>>>>>>>>>>>>>> issue you see if >>>>>>>>>>>>>>>> > we >>>>>>>>>>>>>>>> > > do not have a separate addSplitsBackOnRecovery() >>>>>>>>>>>>>>>> method? Even without >>>>>>>>>>>>>>>> > this >>>>>>>>>>>>>>>> > > new method, the behavior remains exactly the same >>>>>>>>>>>>>>>> unless the end users >>>>>>>>>>>>>>>> > > implement the mix-in interface of >>>>>>>>>>>>>>>> "SupportSplitReassignmentOnRecovery", >>>>>>>>>>>>>>>> > > right? >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> > > Thanks, >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> > > Jiangjie (Becket) Qin >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> > > On Mon, Sep 8, 2025 at 1:48 AM Hongshun Wang < >>>>>>>>>>>>>>>> [email protected]> >>>>>>>>>>>>>>>> > > wrote: >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> > > > Hi devs, >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> > > > It has been quite some time since this FLIP[1] was >>>>>>>>>>>>>>>> first proposed. 
>>>>>>>>>>>>>>>> > Thank >>>>>>>>>>>>>>>> > > > you for your valuable feedback—based on your >>>>>>>>>>>>>>>> suggestions, the FLIP has >>>>>>>>>>>>>>>> > > > undergone several rounds of revisions. >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> > > > Any more advice is welcome and appreciated. If there >>>>>>>>>>>>>>>> are no further >>>>>>>>>>>>>>>> > > > concerns, I plan to start the vote tomorrow. >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> > > > Best >>>>>>>>>>>>>>>> > > > Hongshun >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> > > > [1] >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480 >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> > > > On Mon, Sep 8, 2025 at 4:42 PM Hongshun Wang < >>>>>>>>>>>>>>>> [email protected]> >>>>>>>>>>>>>>>> > > > wrote: >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> > > > > Hi Leonard, >>>>>>>>>>>>>>>> > > > > Thanks for your advice. It makes sense and I have >>>>>>>>>>>>>>>> modified it. >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> > > > > Best, >>>>>>>>>>>>>>>> > > > > Hongshun >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> > > > > On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu < >>>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> > > > >> Thanks Hongshun and Becket for the deep discussion. >>>>>>>>>>>>>>>> > > > >> >>>>>>>>>>>>>>>> > > > >> I only have one comment for one API design: >>>>>>>>>>>>>>>> > > > >> >>>>>>>>>>>>>>>> > > > >> > Deprecate the old addSplitsBack method; use an >>>>>>>>>>>>>>>> addSplitsBack with a >>>>>>>>>>>>>>>> > > > >> param isReportedByReader instead, because the >>>>>>>>>>>>>>>> enumerator can apply >>>>>>>>>>>>>>>> > > > >> different reassignment policies based on the >>>>>>>>>>>>>>>> context. >>>>>>>>>>>>>>>> > > > >> >>>>>>>>>>>>>>>> > > > >> Could we introduce a new method like >>>>>>>>>>>>>>>> *addSplitsBackOnRecovery* with >>>>>>>>>>>>>>>> > > > a default >>>>>>>>>>>>>>>> > > > >> implementation? 
In this way, we can provide better >>>>>>>>>>>>>>>> backward >>>>>>>>>>>>>>>> > > > >> compatibility and also make it easier for >>>>>>>>>>>>>>>> developers to understand. >>>>>>>>>>>>>>>> > > > >> >>>>>>>>>>>>>>>> > > > >> Best, >>>>>>>>>>>>>>>> > > > >> Leonard >>>>>>>>>>>>>>>> > > > >> >>>>>>>>>>>>>>>> > > > >> >>>>>>>>>>>>>>>> > > > >> >>>>>>>>>>>>>>>> > > > >> On Sep 3, 2025 at 20:26, Hongshun Wang <[email protected]> wrote: >>>>>>>>>>>>>>>> > > > >> >>>>>>>>>>>>>>>> > > > >> Hi Becket, >>>>>>>>>>>>>>>> > > > >> >>>>>>>>>>>>>>>> > > > >> I think that's a great idea! I have added the >>>>>>>>>>>>>>>> > > > >> SupportSplitReassignmentOnRecovery interface in >>>>>>>>>>>>>>>> this FLIP. A >>>>>>>>>>>>>>>> > Source >>>>>>>>>>>>>>>> > > > >> implementing this interface indicates that the >>>>>>>>>>>>>>>> source operator needs >>>>>>>>>>>>>>>> > to >>>>>>>>>>>>>>>> > > > >> report splits to the enumerator and receive a >>>>>>>>>>>>>>>> reassignment. [1] >>>>>>>>>>>>>>>> > > > >> >>>>>>>>>>>>>>>> > > > >> Best, >>>>>>>>>>>>>>>> > > > >> Hongshun >>>>>>>>>>>>>>>> > > > >> >>>>>>>>>>>>>>>> > > > >> [1] >>>>>>>>>>>>>>>> > > > >> >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment >>>>>>>>>>>>>>>> > > > >> >>>>>>>>>>>>>>>> > > > >> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin < >>>>>>>>>>>>>>>> [email protected]> >>>>>>>>>>>>>>>> > > > wrote: >>>>>>>>>>>>>>>> > > > >> >>>>>>>>>>>>>>>> > > > >>> Hi Hongshun, >>>>>>>>>>>>>>>> > > > >>> >>>>>>>>>>>>>>>> > > > >>> I think the convention for such optional features >>>>>>>>>>>>>>>> in Source is via >>>>>>>>>>>>>>>> > > > >>> mix-in interfaces. 
So instead of adding a method to the SourceReader, maybe we should introduce an interface SupportSplitReassignmentOnRecovery with this method. If a Source implementation implements that interface, then the SourceOperator will check the desired behavior and act accordingly.

Thanks,

Jiangjie (Becket) Qin

On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <[email protected]> wrote:

Hi devs,

Would anyone like to discuss this FLIP? I'd appreciate your feedback and suggestions.

Best,
Hongshun

On Aug 13, 2025 at 2:23 PM, Hongshun Wang <[email protected]> wrote:

Hi Becket,

Thank you for your detailed feedback. The new contract makes good sense to me and effectively addresses the issues I encountered at the beginning of the design.
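The mix-in convention Becket describes might look like the following sketch (the interface name is taken from the thread; the operator-side check and class names are assumptions):

```java
// Hypothetical sketch of the mix-in convention: a Source opts in to
// recovery-time split reassignment by implementing a marker interface.
interface SupportSplitReassignmentOnRecovery {
    // Marker interface, no methods. Implementing it means the SourceOperator
    // should report the restored splits to the enumerator and wait for
    // reassignment instead of re-adding them to the SourceReader locally.
}

class ReassignmentCheck {
    // The SourceOperator's decision on startup, reduced to its essence.
    static boolean shouldReportAndWaitForReassignment(Object source) {
        return source instanceof SupportSplitReassignmentOnRecovery;
    }
}
```

The attraction of the marker-interface style is that the opt-in is visible at the type level and needs no new methods on existing interfaces.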
That said, I recommend not reporting splits by default, primarily for compatibility and practical reasons:

> For these reasons, we do not expect the Split objects to be huge, and we are not trying to design for huge Split objects either as they will have problems even today.

1. Not all existing connectors match this rule.
For example, in the MySQL CDC connector, a binlog split may contain hundreds (or even more) snapshot split completion records. This state is large and is currently transmitted incrementally through multiple BinlogSplitMetaEvent messages. Since the binlog reader operates with single parallelism, reporting the full split state on recovery could be inefficient or even infeasible. For such sources, it would be better to provide a mechanism to skip split reporting during restart until they redesign and reduce the split size.

2. Not all enumerators maintain unassigned splits in state.
Some SplitEnumerator implementations (such as the Kafka connector's) do not track or persistently manage unassigned splits. Requiring them to handle re-registration would add unnecessary complexity. Even if we implement this in the Kafka connector, the connector is currently decoupled from the Flink version, so we also need to make sure older versions stay compatible.

------------------------------

To address these concerns, I propose introducing a new method, boolean SourceReader#shouldReassignSplitsOnRecovery(), with a default implementation returning false. This allows source readers to opt in to split reassignment only when necessary. Since the new contract already places the responsibility for split assignment on the enumerator, not reporting splits by default is a safe and clean default behavior.

------------------------------

I've updated the implementation and the FLIP accordingly [1]. It's quite a big change.
In particular, for the Kafka connector, we can now use a pluggable SplitPartitioner to support different split assignment strategies (e.g., default, round-robin).

Could you please review it when you have a chance?

Best,

Hongshun

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment

On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <[email protected]> wrote:

Hi Hongshun,

I am not too concerned about the transmission cost, because the full split transmission has to happen in the initial assignment phase already. And in the future, we probably want to also introduce some kind of workload balance across source readers, e.g. based on the per-split throughput or the per-source-reader workload in heterogeneous clusters.
For these reasons, we do not expect the Split objects to be huge, and we are not trying to design for huge Split objects either, as they will have problems even today.

Good point on the potential split loss, please see the reply below:

Scenario 2:

1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon restart.
2. Before the enumerator receives all reports and performs reassignment, a checkpoint is triggered.
3. Since no splits have been reassigned yet, both readers have empty states.
4. When restarting from this checkpoint, all four splits are lost.

The reader registration happens in SourceOperator.open(), which means the task is still in the initializing state; therefore the checkpoint should not be triggered until the enumerator receives all the split reports.

There is a nuance here. Today, the RPC call from the TM to the JM is async.
So it is possible that SourceOperator.open() has returned, but the enumerator has not received the split reports. However, because the task status update RPC call goes to the same channel as the split report call, the task status RPC call will happen after the split report call on the JM side. Therefore, on the JM side, the SourceCoordinator will always first receive the split reports and then receive the checkpoint request. This "happens before" relationship is important to guarantee consistent state between the enumerator and the readers.

Scenario 1:

1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader B reports (3 and 4).
2. The enumerator receives these reports but only reassigns splits 1 and 2, not 3 and 4.
3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are recorded in the reader states; splits 3 and 4 are not persisted.
4. If the job is later restarted from this checkpoint, splits 3 and 4 will be permanently lost.

This scenario is possible. One solution is to let the enumerator implementation handle it. That means if the enumerator relies on the initial split reports from the source readers, it should maintain these reports by itself. In the above example, the enumerator will need to remember that 3 and 4 are not assigned and put them into its own state.

The current contract is that anything assigned to the SourceReaders is completely owned by the SourceReaders. Enumerators can remember the assignments but cannot change them, even when the source reader recovers or restarts.

With this FLIP, the contract becomes that the source readers return the ownership of the splits to the enumerator, so the enumerator is responsible for maintaining these splits until they are assigned to a source reader again.
There are other cases where there may be conflicting information between reader and enumerator. For example, consider the following sequence:

1. Reader A reports splits (1 and 2) upon restart.
2. The enumerator receives the report and assigns both 1 and 2 to reader B.
3. Reader A fails before checkpointing. This is a partial failure, so only reader A restarts.
4. When reader A recovers, it will again report splits (1 and 2) to the enumerator.
5. The enumerator should ignore this report because it has assigned splits (1 and 2) to reader B.

So with the new contract, the enumerator should be the source of truth for split ownership.
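Under this contract, an enumerator could guard against the stale report in step 5 by tracking ownership itself. A minimal sketch (the data structures and method names are assumptions for illustration, not FLIP API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical enumerator-side bookkeeping: the enumerator remembers which
// subtask currently owns each split, so a stale re-report from a restarted
// reader cannot reclaim a split that was already reassigned elsewhere.
class OwnershipTrackingEnumerator {
    // splitId -> subtask that currently owns the split
    private final Map<String, Integer> ownership = new HashMap<>();
    // splits returned to the enumerator and awaiting reassignment
    final List<String> pendingReassignment = new ArrayList<>();

    void assignSplit(String splitId, int subtask) {
        ownership.put(splitId, subtask);
    }

    // Called when a recovered reader reports its restored splits.
    void onReaderRegistration(int subtask, List<String> reportedSplits) {
        for (String splitId : reportedSplits) {
            Integer owner = ownership.get(splitId);
            if (owner != null && owner != subtask) {
                // Already reassigned to another reader: the enumerator is the
                // source of truth, so the stale report is ignored.
                continue;
            }
            // Ownership returns to the enumerator until reassigned again.
            ownership.remove(splitId);
            pendingReassignment.add(splitId);
        }
    }
}
```

In the sequence above, reader A's second report of splits 1 and 2 finds them owned by reader B and is dropped, while a split still attributed to A would be taken back for reassignment.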
Thanks,

Jiangjie (Becket) Qin

On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <[email protected]> wrote:

Hi Becket,

I did consider this approach at the beginning (and it was also mentioned in this FLIP), since it would allow more flexibility in reassigning all splits. However, there are a few potential issues.

1. High Transmission Cost

If we pass the full split objects (rather than just split IDs), the data size could be significant, leading to high overhead during transmission, especially when many splits are involved.

2. Risk of Split Loss

The risk of split loss exists unless we have a mechanism to ensure a checkpoint can only be taken after all the splits are reassigned.
There are scenarios where splits could be lost due to inconsistent state handling during recovery:

Scenario 1:

1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader B reports (3 and 4).
2. The enumerator receives these reports but only reassigns splits 1 and 2, not 3 and 4.
3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are recorded in the reader states; splits 3 and 4 are not persisted.
4. If the job is later restarted from this checkpoint, splits 3 and 4 will be permanently lost.

Scenario 2:

1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon restart.
2. Before the enumerator receives all reports and performs reassignment, a checkpoint is triggered.
3. Since no splits have been reassigned yet, both readers have empty states.
4. When restarting from this checkpoint, all four splits are lost.
Let me know if you have thoughts on how we might mitigate these risks!

Best,
Hongshun

On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <[email protected]> wrote:

Hi Hongshun,

The steps sound reasonable to me in general. In terms of the updated FLIP wiki, it would be good to see if we can keep the protocol simple. One alternative way to achieve this behavior is the following:

1. Upon SourceOperator startup, the SourceOperator sends a ReaderRegistrationEvent with the currently assigned splits to the enumerator. It does not add these splits to the SourceReader.
2. The enumerator will always use SplitEnumeratorContext.assignSplits() to assign the splits (not via the response of the ReaderRegistrationEvent; this allows async split assignment in case the enumerator wants to wait until all the readers are registered).
3. The SourceOperator will only call SourceReader.addSplits() when it receives the AddSplitEvent from the enumerator.

This protocol has a few benefits:
1. It basically allows arbitrary split reassignment upon restart.
2. Simplicity: there is only one way to assign splits.

So we only need one interface change:
- add the initially assigned splits to ReaderInfo so the enumerator can access it.
and one behavior change:
- The SourceOperator should stop assigning the splits from state restoration, but [message truncated...]
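Step 2 leaves the assignment policy entirely to the enumerator. One deterministic policy, which Becket suggests earlier in the thread for KafkaSource (sort all the splits, then assign them to readers 0, 1, 2, ..., N, 0, 1, ...), can be sketched as follows (class and method names are mine, not Flink API):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical deterministic round-robin over recovered splits: sort all
// reported split ids, then send index i to reader i % numReaders. Given the
// same split set and reader count, the result is identical regardless of the
// order in which readers reported their splits.
class RoundRobinAssigner {
    static Map<Integer, List<String>> assign(Collection<String> splitIds, int numReaders) {
        List<String> sorted = new ArrayList<>(splitIds);
        Collections.sort(sorted); // determinism: independent of report order
        Map<Integer, List<String>> assignment = new HashMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            assignment
                .computeIfAbsent(i % numReaders, r -> new ArrayList<>())
                .add(sorted.get(i));
        }
        return assignment;
    }
}
```

Because the output depends only on the split set and numReaders, repeated recoveries produce the same assignment, avoiding the non-determinism concern raised about the existing round-robin.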
