Hi Hongshun,

Thanks for updating the FLIP. It looks much cleaner. Some minor comments:

1. Rename the `splitsOnRecovery` in ReaderInfo to
`reportedSplitsOnRegistration`.
2. The SourceOperator should always send the ReaderRegistrationEvent with
the `reportedSplitsOnRegistration` list. But it will not add the splits to
readers if `SupportSplitReassignmentOnRecovery` is implemented.
3. "*Thus, If a connector want to use this FLIP-537, enumerator must keep a
unAssignedSplit in state*" - this is up to the enumerator implementation to
decide. It is not a must have for all the enumerator impl.
4. The RoundRobin algorithm in KafkaSource is not deterministic. Why not
just get all the splits and do a round robin based on the numReaders? E.g.
Sort all the splits, and assign the splits to reader 0, 1, 2, 3...N, 0, 1,
2, 3... N...

Thanks,

Jiangjie (Becket) Qin

On Tue, Oct 14, 2025 at 7:28 PM Hongshun Wang <[email protected]>
wrote:

> Hi devs,
>
> If there is no other problem, I will start a vote later.
>
> Best,
> Hongshun
>
> On Mon, Oct 13, 2025 at 4:17 PM Hongshun Wang <[email protected]>
> wrote:
>
>> Hi Becket and Leonard,
>>
>> It seems adding  `splitsOnRecovery` to `ReaderInfo` makes the split
>> enumerator simpler and cleaner.
>>
>> I have modified this FLIP again. Please have a look and let me know what
>> you think.
>>
>> Best,
>> Hongshun
>>
>> On Mon, Oct 13, 2025 at 10:48 AM Hongshun Wang <[email protected]>
>> wrote:
>>
>>> Hi Becket,
>>> Thanks for your explanation.
>>>
>>> > For the same three input above, the assignment should be consistently
>>> the same.
>>>
>>> That is exactly what troubles me. For *assignment algorithms such as
>>> hash, it does behave the same. What If we use round-robin? Each *the *reader
>>> information, the same split will be assigned to different readers. There is
>>> also what I used to list as an example.*
>>>
>>>    1. *Initial state:*: 2 parallelism, 2 splits.
>>>    2. *Enumerator action:*  Split 1 → Task 1, Split 2 → Task 2 ,  ,
>>>    3. *Failure scenario: *After Split 2 is assigned to Task 2 but
>>>    before next checkpoint success, task 1 restarts.
>>>    4. *Recovery issue:* Split 2 is re-added to the enumerator.
>>>    Round-robin strategy assigns Split 2 to Task 1. Then Task 1 now has 2
>>>    splits, Task 2 has 0 → Imbalanced distribution.
>>>
>>>
>>> > Please let me know if you think a meeting would be more efficient.
>>> Yes, I’d like to reach an agreement as soon as possible. If you’re
>>> available, we could schedule a meeting with Lenenord as well.
>>>
>>> Best,
>>> Hongshun
>>>
>>> On Sat, Oct 11, 2025 at 3:59 PM Becket Qin <[email protected]> wrote:
>>>
>>>> Hi Hongshun,
>>>>
>>>> I am confused. First of all, regardless of what the assignment
>>>> algorithm is. Using SplitEnumeratorContext to return the splits only gives
>>>> more information than using addSplitsBack(). So there should be no
>>>> regression.
>>>>
>>>> Secondly, at this point. The SplitEnumerator should only take the
>>>> following three input to generate the global splits assignment:
>>>> 1. the *reader information (num readers, locations, etc)*
>>>> 2. *all the splits to assign*
>>>> 3. *configured assignment algorithm *
>>>> Preferably, for the same three input above, the assignment should be
>>>> consistently the same. I don't see why it should care about why a new
>>>> reader is added, whether due to partial failover or global failover or job
>>>> restart.
>>>>
>>>> If you want to do global redistribution on global failover and restart,
>>>> but honor the existing assignment for partial failover. The enumerator will
>>>> just do the following:
>>>> 1. Generate a new global assignment (global redistribution) in start()
>>>> because start() will only be invoked in global failover or restart. That
>>>> means all the readers are also new with empty assignment.
>>>> 2. After the global assignment is generated, it should be honored for
>>>> the whole life cycle. there might be many reader registrations, again for
>>>> different reasons but does not matter:
>>>>     - reader registration after this job restart
>>>>     - reader registration after this global failover
>>>>     - reader registration due to partial failover which may or may not
>>>> have a addSplitsBack() call.
>>>>     Regardless of the reason, the split enumerator will just enforce
>>>> the global assignment it has already generated, i.e. without split
>>>> redistribution.
>>>>
>>>> Wouldn't that give the behavior you want? I feel the discussion somehow
>>>> goes to circles. Please let me know if you think a meeting would be more
>>>> efficient.
>>>>
>>>> Thanks,
>>>>
>>>> Jiangjie (Becket) Qin
>>>>
>>>> On Fri, Oct 10, 2025 at 7:58 PM Hongshun Wang <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Becket,
>>>>>
>>>>> > Ignore a returned split if it has been assigned to a different
>>>>> reader, otherwise put it back to unassigned splits / pending splits. Then
>>>>> the enumerator assigns new splits to the newly added reader, which may use
>>>>> the previous assignment as a reference. This should work regardless of
>>>>> whether it is a global failover, partial failover, restart, etc. There is
>>>>> no need for the SplitEnumerator to distinguish what failover scenario it 
>>>>> is.
>>>>>
>>>>> In this case, it seems that global failover and partial failover share
>>>>> the same distribution strategy If it has not been assigned to a different
>>>>> reader. However, global failover needs to be redistributed(this is why we
>>>>> need this FLIP) , while partial failover is not. I have no idea how we
>>>>> distinguish them.
>>>>>
>>>>> What do you think?
>>>>>
>>>>> Best,
>>>>> Hongshun
>>>>>
>>>>> On Sat, Oct 11, 2025 at 12:54 AM Becket Qin <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Hongshun,
>>>>>>
>>>>>> The problem we are trying to solve here is to give the splits back to
>>>>>> the SplitEnumerator. There are only two types of splits to give back:
>>>>>> 1) splits whose assignment has been checkpointed. - In this case, we
>>>>>> rely on addReader() + SplitEnumeratorContext to give the splits back, 
>>>>>> this
>>>>>> provides more information associated with those splits.
>>>>>> 2) splits whose assignment has not been checkpointed. -  In this
>>>>>> case, we use addSplitsBack(), there is no reader info to give because the
>>>>>> previous assignment did not take effect to begin with.
>>>>>>
>>>>>> From the SplitEnumerator implementation perspective, the contract is
>>>>>> straightforward.
>>>>>> 1. The SplitEnumerator is the source of truth for assignment.
>>>>>> 2. When the enumerator receives the addSplits() call, it always add
>>>>>> these splits back to unassigned splits / pending splits.
>>>>>> 3. When the enumerator receives the addReader() call, that means the
>>>>>> reader has no current assignment, and has returned its previous 
>>>>>> assignment
>>>>>> based on the reader side info. The SplitEnumerator checks the
>>>>>> SplitEnumeratorContext to retrieve the returned splits from that reader
>>>>>> (i.e. previous assignment) and handle them according to its own source of
>>>>>> truth knowledge of assignment - Ignore a returned split if it has been
>>>>>> assigned to a different reader, otherwise put it back to unassigned 
>>>>>> splits
>>>>>> / pending splits. Then the enumerator assigns new splits to the newly 
>>>>>> added
>>>>>> reader, which may use the previous assignment as a reference. This should
>>>>>> work regardless of whether it is a global failover, partial failover,
>>>>>> restart, etc. There is no need for the SplitEnumerator to distinguish 
>>>>>> what
>>>>>> failover scenario it is.
>>>>>>
>>>>>> Would this work?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jiangjie (Becket) Qin
>>>>>>
>>>>>> On Fri, Oct 10, 2025 at 1:28 AM Hongshun Wang <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hi Becket,
>>>>>>>  > why do we need to change the behavior of addSplitsBack()? Should
>>>>>>> it remain the same?
>>>>>>>
>>>>>>> How does the enumerator get the splits from ReaderRegistrationEvent
>>>>>>> and then reassign it?
>>>>>>>
>>>>>>> You have given a advice before:
>>>>>>> > 1. Put all the reader information in the SplitEnumerator context.
>>>>>>> 2. notify the enumerator about the new reader registration. 3. let the
>>>>>>> split enumerator get whatever information it wants from the context and 
>>>>>>> do
>>>>>>> its job.
>>>>>>>
>>>>>>> However, each time a source task fails over, the
>>>>>>> ConcurrentMap<Integer, ConcurrentMap<Integer, ReaderInfo>>
>>>>>>> registeredReaders will remove this reader infos. When the source task is
>>>>>>> registered again, it will be added again. *Thus, registeredReaders
>>>>>>> cannot know whether is registered before. *
>>>>>>>
>>>>>>> Therefore, registeredReaders enumerator#addReader does not
>>>>>>> distinguish the following situations:
>>>>>>> However, each time one source task is failover. The
>>>>>>> `ConcurrentMap<Integer, ConcurrentMap<Integer, ReaderInfo>>
>>>>>>> registeredReaders` will remove this source. When source Task is 
>>>>>>> registered
>>>>>>> again, enumerator#addReader not distinguished three situations:
>>>>>>> 1. The Reader is registered when the global restart. In this case,
>>>>>>> redistribution the split from the infos. (take off all the splits from
>>>>>>> ReaderInfo).
>>>>>>> 2. The Reader is registered when a partial failover(before the first
>>>>>>> successful checkpoint). In this case,  ignore the split from the infos.
>>>>>>> (leave alone all the splits from ReaderInfo).
>>>>>>> 3. The Reader is registered when a partial failover(after the first
>>>>>>> successful checkpoint).In this case, we need assign the split to same
>>>>>>> reader again. (take off all the splits from ReaderInfo but assigned to 
>>>>>>> it
>>>>>>> again).
>>>>>>> we still need the enumerator to distinguish them (using
>>>>>>> pendingSplitAssignment & assignedSplitAssignment. However, it is 
>>>>>>> redundant
>>>>>>> to maintain split assigned information both in the enumerator and the
>>>>>>> enumerator context.
>>>>>>>
>>>>>>> I think if we change the behavior of addSplitsBack, it will be more
>>>>>>> simple. Just let the enumerator to handle these split based on 
>>>>>>> pendingSplitAssignment
>>>>>>> & assignedSplitments.
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> Best,
>>>>>>> Hongshun
>>>>>>>
>>>>>>> On Fri, Oct 10, 2025 at 12:55 PM Becket Qin <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Hongshun,
>>>>>>>>
>>>>>>>> Thanks for updating the FLIP. A quick question: why do we need to
>>>>>>>> change the behavior of addSplitsBack()? Should it remain the same?
>>>>>>>>
>>>>>>>> Regarding the case of restart with changed subscription. I think
>>>>>>>> the only correct behavior is removing obsolete splits without any 
>>>>>>>> warning /
>>>>>>>> exception. It is OK to add an info level logging if we want to. It is a
>>>>>>>> clear intention if the user has explicitly changed subscription and
>>>>>>>> restarted the job. There is no need to add a config to double confirm.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>>
>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>
>>>>>>>> On Thu, Oct 9, 2025 at 7:28 PM Hongshun Wang <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Leonard,
>>>>>>>>>
>>>>>>>>> If the SplitEnumerator received all splits after a restart, it
>>>>>>>>> becomes straightforward to clear and un-assign the unmatched
>>>>>>>>> splits(checking whether matches the source options). However, a key
>>>>>>>>> question arises: *should  automatically discard obsolete splits,
>>>>>>>>> or explicitly notify the user via an exception?*
>>>>>>>>>
>>>>>>>>> We provided a option `scan.partition-unsubscribe.strategy`:
>>>>>>>>> 1. If Strict, throws an exception when encountering removed
>>>>>>>>> splits.
>>>>>>>>> 2. If Lenient, automatically removes obsolete splits silently.
>>>>>>>>>
>>>>>>>>> What Do you think?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Hongshun
>>>>>>>>>
>>>>>>>>> On Thu, Oct 9, 2025 at 9:37 PM Leonard Xu <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks hongshun for the updating and pretty detailed analysis for
>>>>>>>>>> edge cases,  the updated FLIP looks good to me now.
>>>>>>>>>>
>>>>>>>>>> Only last implementation details about scenario in motivation
>>>>>>>>>> section:
>>>>>>>>>>
>>>>>>>>>> *Restart with Changed subscription: During restart, if source
>>>>>>>>>> options remove a topic or table. The splits which have already 
>>>>>>>>>> assigned can
>>>>>>>>>> not be removed.*
>>>>>>>>>>
>>>>>>>>>> Could you clarify how we resolve this in Kafka connector ?
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Leonard
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 2025 10月 9 19:48,Hongshun Wang <[email protected]> 写道:
>>>>>>>>>>
>>>>>>>>>> Hi devs,
>>>>>>>>>> If there are no further suggestions, I will start the voting
>>>>>>>>>> tomorrow。
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Hongshun
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 26, 2025 at 7:48 PM Hongshun Wang <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Becket and Leonard,
>>>>>>>>>>>
>>>>>>>>>>> I have updated the content of this FLIP. The key point is that:
>>>>>>>>>>>
>>>>>>>>>>> When the split enumerator receives a split, *these splits must
>>>>>>>>>>> have already existed in pendingSplitAssignment or 
>>>>>>>>>>> assignedSplitments*
>>>>>>>>>>> .
>>>>>>>>>>>
>>>>>>>>>>>    - If the split is in pendingSplitAssignments, ignore it.
>>>>>>>>>>>    - If the split is in assignedSplitAssignments but has a
>>>>>>>>>>>    different taskId, ignore it (this indicates it was already
>>>>>>>>>>>    assigned to another task).
>>>>>>>>>>>    - If the split is in assignedSplitAssignments and shares the
>>>>>>>>>>>    same taskId, move the assignment from assignedSplitments to 
>>>>>>>>>>> pendingSplitAssignment
>>>>>>>>>>>    to re-assign again.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> For better understanding why use these strategies. I added some
>>>>>>>>>>> examples and pictures to show it.
>>>>>>>>>>>
>>>>>>>>>>> Would you like to help me check whether there are still some
>>>>>>>>>>> problems?
>>>>>>>>>>>
>>>>>>>>>>> Best
>>>>>>>>>>> Hongshun
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Sep 26, 2025 at 5:08 PM Leonard Xu <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks Becket and Hongshun for the insightful discussion.
>>>>>>>>>>>>
>>>>>>>>>>>> The underlying implementation and communication mechanisms of
>>>>>>>>>>>> Flink Source indeed involve many intricate details, we discussed 
>>>>>>>>>>>> the issue
>>>>>>>>>>>> of splits re-assignment in specific scenarios, but fortunately, 
>>>>>>>>>>>> the final
>>>>>>>>>>>> decision turned out to be pretty clear.
>>>>>>>>>>>>
>>>>>>>>>>>>  +1 to Becket’s proposal to keeps the framework cleaner and
>>>>>>>>>>>> more flexible.
>>>>>>>>>>>> +1 to Hongshun’s point to provide comprehensive guidance for
>>>>>>>>>>>> connector developers.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Leonard
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> 2025 9月 26 16:30,Hongshun Wang <[email protected]> 写道:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Becket,
>>>>>>>>>>>>
>>>>>>>>>>>> I Got it. You’re suggesting we should not handle this in the
>>>>>>>>>>>> source framework but instead let the split enumerator manage these 
>>>>>>>>>>>> three
>>>>>>>>>>>> scenarios.
>>>>>>>>>>>>
>>>>>>>>>>>> Let me explain why I originally favored handling it in the
>>>>>>>>>>>> framework: I'm concerned that connector developers might overlook 
>>>>>>>>>>>> certain
>>>>>>>>>>>> edge cases (after all, we even payed extensive discussions to 
>>>>>>>>>>>> fully clarify
>>>>>>>>>>>> the logic)
>>>>>>>>>>>>
>>>>>>>>>>>> However, your point keeps the framework cleaner and more
>>>>>>>>>>>> flexible. Thus, I will take it.
>>>>>>>>>>>>
>>>>>>>>>>>> Perhaps, in this FLIP, we should focus on providing
>>>>>>>>>>>> comprehensive guidance for connector developers: explain how
>>>>>>>>>>>> to implement a split enumerator, including the underlying 
>>>>>>>>>>>> challenges and
>>>>>>>>>>>> their solutions.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Additionally, we can use the Kafka connector as a reference
>>>>>>>>>>>> implementation to demonstrate the practical steps. This way, 
>>>>>>>>>>>> developers who
>>>>>>>>>>>> want to implement similar connectors can directly reference this 
>>>>>>>>>>>> example.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Hongshun
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Sep 26, 2025 at 1:27 PM Becket Qin <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> It would be good to not expose runtime details to the source
>>>>>>>>>>>>> implementation if possible.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Today, the split enumerator implementations are expected to
>>>>>>>>>>>>> track the split assignment.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Assuming the split enumerator implementation keeps a split
>>>>>>>>>>>>> assignment map, that means the enumerator should already know 
>>>>>>>>>>>>> whether a
>>>>>>>>>>>>> split is assigned or unassigned. So it can handle the three 
>>>>>>>>>>>>> scenarios you
>>>>>>>>>>>>> mentioned.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The split is reported by a reader during a global restoration.
>>>>>>>>>>>>>>
>>>>>>>>>>>>> The split enumerator should have just been restored / created.
>>>>>>>>>>>>> If the enumerator expects a full reassignment of splits up on 
>>>>>>>>>>>>> global
>>>>>>>>>>>>> recovery, there should be no assigned splits to that reader in 
>>>>>>>>>>>>> the split
>>>>>>>>>>>>> assignment mapping.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The split is reported by a reader during a partial failure
>>>>>>>>>>>>>> recovery.
>>>>>>>>>>>>>>
>>>>>>>>>>>>> In this case, when SplitEnumerator.addReader() is invoked, the
>>>>>>>>>>>>> split assignment map in the enumerator implementation should 
>>>>>>>>>>>>> already have
>>>>>>>>>>>>> some split assignments for the reader. Therefore it is a partial 
>>>>>>>>>>>>> failover.
>>>>>>>>>>>>> If the source supports split reassignment on recovery, the 
>>>>>>>>>>>>> enumerator can
>>>>>>>>>>>>> assign splits that are different from the reported assignment of 
>>>>>>>>>>>>> that
>>>>>>>>>>>>> reader in the SplitEnumeratorContext, or it can also assign the 
>>>>>>>>>>>>> same
>>>>>>>>>>>>> splits. In any case, the enumerator knows that this is a partial 
>>>>>>>>>>>>> recovery
>>>>>>>>>>>>> because the assignment map is non-empty.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The split is not reported by a reader, but is assigned after
>>>>>>>>>>>>>> the last successful checkpoint and was never acknowledged.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is actually one of the step in the partial failure
>>>>>>>>>>>>> recover. SplitEnumerator.addSplitsBack() will be called first 
>>>>>>>>>>>>> before
>>>>>>>>>>>>> SplitReader.addReader() is called for the recovered reader. When 
>>>>>>>>>>>>> the
>>>>>>>>>>>>> SplitEnumerator.addSplitsBack() is invoked, it is for sure a 
>>>>>>>>>>>>> partial
>>>>>>>>>>>>> recovery. And the enumerator should remove these splits from the 
>>>>>>>>>>>>> split
>>>>>>>>>>>>> assignment map as if they were never assigned.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think this should work, right?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Sep 25, 2025 at 8:34 PM Hongshun Wang <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Becket and Leonard,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for your advice.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > put all the reader information in the SplitEnumerator
>>>>>>>>>>>>>> context
>>>>>>>>>>>>>> I have a concern: the current registeredReaders in*
>>>>>>>>>>>>>> SourceCoordinatorContext will be removed after subtaskResetor 
>>>>>>>>>>>>>> execution on
>>>>>>>>>>>>>> failure*.However, this approach has merit.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> One more situation I found my previous design does not cover:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>    1. Initial state: Reader A reports splits (1, 2).
>>>>>>>>>>>>>>    2. Enumerator action: Assigns split 1 to Reader A, and
>>>>>>>>>>>>>>    split 2 to Reader B.
>>>>>>>>>>>>>>    3. Failure scenario: Reader A fails before checkpointing.
>>>>>>>>>>>>>>    Since this is a partial failure, only Reader A restarts.
>>>>>>>>>>>>>>    4. Recovery issue: Upon recovery, Reader A re-reports
>>>>>>>>>>>>>>    split (1).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In my previous design, the enumerator will ignore Reader A's
>>>>>>>>>>>>>> re-registration which will cause data loss.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thus, when the enumerator receives a split, the split may
>>>>>>>>>>>>>> originate from three scenarios:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>    1. The split is reported by a reader during a global
>>>>>>>>>>>>>>    restoration.
>>>>>>>>>>>>>>    2. The split is reported by a reader during a partial
>>>>>>>>>>>>>>    failure recovery.
>>>>>>>>>>>>>>    3. The split is not reported by a reader, but is assigned
>>>>>>>>>>>>>>    after the last successful checkpoint and was never 
>>>>>>>>>>>>>> acknowledged.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In the first scenario (global restore), the split should
>>>>>>>>>>>>>> be re-distributed. For the latter two scenarios (partial 
>>>>>>>>>>>>>> failover and
>>>>>>>>>>>>>> post-checkpoint assignment), we need to reassign the split to
>>>>>>>>>>>>>> its originally assigned subtask.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> By implementing a method in the SplitEnumerator context to
>>>>>>>>>>>>>> track each assigned split's status, the system can correctly 
>>>>>>>>>>>>>> identify and
>>>>>>>>>>>>>> resolve split ownership in all three scenarios.*What about
>>>>>>>>>>>>>> adding a  `SplitRecoveryType splitRecoveryType(Split split)` in
>>>>>>>>>>>>>> SplitEnumeratorContext.* SplitRecoveryTypeis a enum
>>>>>>>>>>>>>> including `UNASSIGNED`、`GLOBAL_RESTORE`、`PARTIAL_FAILOVER` and
>>>>>>>>>>>>>> `POST_CHECKPOINT_ASSIGNMENT`.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What do you think? Are there any details or scenarios I
>>>>>>>>>>>>>> haven't considered? Looking forward to your advice.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Hongshun
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Sep 11, 2025 at 12:41 AM Becket Qin <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for the explanation, Hongshun.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Current pattern of handling new reader registration
>>>>>>>>>>>>>>> following:
>>>>>>>>>>>>>>> 1. put all the reader information in the SplitEnumerator
>>>>>>>>>>>>>>> context
>>>>>>>>>>>>>>> 2. notify the enumerator about the new reader registration.
>>>>>>>>>>>>>>> 3. Let the split enumerator get whatever information it
>>>>>>>>>>>>>>> wants from the
>>>>>>>>>>>>>>> context and do its job.
>>>>>>>>>>>>>>> This pattern decouples the information passing and the
>>>>>>>>>>>>>>> reader registration
>>>>>>>>>>>>>>> notification. This makes the API extensible - we can add
>>>>>>>>>>>>>>> more information
>>>>>>>>>>>>>>> (e.g. reported assigned splits in our case) about the reader
>>>>>>>>>>>>>>> to the context
>>>>>>>>>>>>>>> without introducing new methods.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Introducing a new method of addSplitBackOnRecovery() is
>>>>>>>>>>>>>>> redundant to the
>>>>>>>>>>>>>>> above pattern. Do we really need it?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Sep 8, 2025 at 8:18 PM Hongshun Wang <
>>>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> > Hi Becket,
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > > I am curious what would the enumerator do differently
>>>>>>>>>>>>>>> for the splits
>>>>>>>>>>>>>>> > added via addSplitsBackOnRecovery() V.S. addSplitsBack()?
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> >  In this FLIP, there are two distinct scenarios in which
>>>>>>>>>>>>>>> the enumerator
>>>>>>>>>>>>>>> > receives splits being added back:
>>>>>>>>>>>>>>> > 1.  Job-level restore: The job is restored,  splits from
>>>>>>>>>>>>>>> reader’s state are
>>>>>>>>>>>>>>> > reported by ReaderRegistrationEvent.
>>>>>>>>>>>>>>> > 2.  Reader-level restart: a reader is started but not the
>>>>>>>>>>>>>>> whole  job,
>>>>>>>>>>>>>>> >  splits assigned to it after the last successful
>>>>>>>>>>>>>>> checkpoint. This is what
>>>>>>>>>>>>>>> > addSplitsBack used to do.
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > In these two situations, the enumerator will choose
>>>>>>>>>>>>>>> different strategies.
>>>>>>>>>>>>>>> > 1. Job-level restore: the splits should be redistributed
>>>>>>>>>>>>>>> across readers
>>>>>>>>>>>>>>> > according to the current partitioner strategy.
>>>>>>>>>>>>>>> > 2. Reader-level restart: the splits should be reassigned
>>>>>>>>>>>>>>> directly back to
>>>>>>>>>>>>>>> > the same reader they were originally assigned to,
>>>>>>>>>>>>>>> preserving locality and
>>>>>>>>>>>>>>> > avoiding unnecessary redistribution
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > Therefore, the enumerator must clearly distinguish between
>>>>>>>>>>>>>>> these two
>>>>>>>>>>>>>>> > scenarios.I used to deprecate the former
>>>>>>>>>>>>>>> addSplitsBack(List<SplitT>
>>>>>>>>>>>>>>> > splits, int subtaskId) but add a new
>>>>>>>>>>>>>>> addSplitsBack(List<SplitT>
>>>>>>>>>>>>>>> > splits, int subtaskId,
>>>>>>>>>>>>>>> > boolean reportedByReader).
>>>>>>>>>>>>>>> > Leonard suggest to use another method
>>>>>>>>>>>>>>> addSplitsBackOnRecovery but not
>>>>>>>>>>>>>>> > influenced  currently addSplitsBack.
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > Best
>>>>>>>>>>>>>>> > Hongshun
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > On 2025/09/08 17:20:31 Becket Qin wrote:
>>>>>>>>>>>>>>> > > Hi Leonard,
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > > Could we introduce a new method like
>>>>>>>>>>>>>>> addSplitsBackOnRecovery  with
>>>>>>>>>>>>>>> > default
>>>>>>>>>>>>>>> > > > implementation. In this way, we can provide better
>>>>>>>>>>>>>>> backward
>>>>>>>>>>>>>>> > compatibility
>>>>>>>>>>>>>>> > > > and also makes it easier for developers to understand.
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > I am curious what would the enumerator do differently
>>>>>>>>>>>>>>> for the splits
>>>>>>>>>>>>>>> > added
>>>>>>>>>>>>>>> > > via addSplitsBackOnRecovery() V.S. addSplitsBack()?
>>>>>>>>>>>>>>> Today,
>>>>>>>>>>>>>>> > addSplitsBack()
>>>>>>>>>>>>>>> > > is also only called upon recovery. So the new method
>>>>>>>>>>>>>>> seems confusing. One
>>>>>>>>>>>>>>> > > thing worth clarifying is if the Source implements
>>>>>>>>>>>>>>> > > SupportSplitReassignmentOnRecovery, upon recovery,
>>>>>>>>>>>>>>> should the splits
>>>>>>>>>>>>>>> > > reported by the readers also be added back to the
>>>>>>>>>>>>>>> SplitEnumerator via the
>>>>>>>>>>>>>>> > > addSplitsBack() call? Or should the SplitEnumerator
>>>>>>>>>>>>>>> explicitly query the
>>>>>>>>>>>>>>> > > registered reader information via the
>>>>>>>>>>>>>>> SplitEnumeratorContext to get the
>>>>>>>>>>>>>>> > > originally assigned splits when addReader() is invoked?
>>>>>>>>>>>>>>> I was assuming
>>>>>>>>>>>>>>> > the
>>>>>>>>>>>>>>> > > latter in the beginning, so the behavior of
>>>>>>>>>>>>>>> addSplitsBack() remains
>>>>>>>>>>>>>>> > > unchanged, but I am not opposed in doing the former.
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > Also, can you elaborate on the backwards compatibility
>>>>>>>>>>>>>>> issue you see if
>>>>>>>>>>>>>>> > we
>>>>>>>>>>>>>>> > > do not have a separate addSplitsBackOnRecovery() method?
>>>>>>>>>>>>>>> Even without
>>>>>>>>>>>>>>> > this
>>>>>>>>>>>>>>> > > new method, the behavior remains exactly the same unless
>>>>>>>>>>>>>>> the end users
>>>>>>>>>>>>>>> > > implement the mix-in interface of
>>>>>>>>>>>>>>> "SupportSplitReassignmentOnRecovery",
>>>>>>>>>>>>>>> > > right?
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > Thanks,
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > Jiangjie (Becket) Qin
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > On Mon, Sep 8, 2025 at 1:48 AM Hongshun Wang <
>>>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>>>>> > > wrote:
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > > Hi devs,
>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>> > > > It has been quite some time since this FLIP[1] was
>>>>>>>>>>>>>>> first proposed.
>>>>>>>>>>>>>>> > Thank
>>>>>>>>>>>>>>> > > > you for your valuable feedback—based on your
>>>>>>>>>>>>>>> suggestions, the FLIP has
>>>>>>>>>>>>>>> > > > undergone several rounds of revisions.
>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>> > > > Any more advice is welcome and appreciated. If there
>>>>>>>>>>>>>>> are no further
>>>>>>>>>>>>>>> > > > concerns, I plan to start the vote tomorrow.
>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>> > > > Best
>>>>>>>>>>>>>>> > > > Hongshun
>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>> > > > [1]
>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480
>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>> > > > On Mon, Sep 8, 2025 at 4:42 PM Hongshun Wang <
>>>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>>>>> > > > wrote:
>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>> > > > > Hi Leonard,
>>>>>>>>>>>>>>> > > > > Thanks for your advice.  It makes sense and I have
>>>>>>>>>>>>>>> modified it.
>>>>>>>>>>>>>>> > > > >
>>>>>>>>>>>>>>> > > > > Best,
>>>>>>>>>>>>>>> > > > > Hongshun
>>>>>>>>>>>>>>> > > > >
>>>>>>>>>>>>>>> > > > > On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>> > > > >
>>>>>>>>>>>>>>> > > > >> Thanks Hongshun and Becket for the deep discussion.
>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>> > > > >> I only have one comment for one API design:
>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>> > > > >> > Deprecate the old addSplitsBack  method, use a
>>>>>>>>>>>>>>> addSplitsBack with
>>>>>>>>>>>>>>> > > > >> param isReportedByReader instead. Because, The
>>>>>>>>>>>>>>> enumerator can apply
>>>>>>>>>>>>>>> > > > >> different reassignment policies based on the
>>>>>>>>>>>>>>> context.
>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>> > > > >> Could we introduce a new method like
>>>>>>>>>>>>>>> *addSplitsBackOnRecovery*  with
>>>>>>>>>>>>>>> > > > default
>>>>>>>>>>>>>>> > > > >> implementation. In this way, we can provide better
>>>>>>>>>>>>>>> backward
>>>>>>>>>>>>>>> > > > >> compatibility and also makes it easier for
>>>>>>>>>>>>>>> developers to understand.
>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>> > > > >> Best,
>>>>>>>>>>>>>>> > > > >> Leonard
>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>> > > > >> 2025 9月 3 20:26,Hongshun Wang <[email protected]> 写道:
>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>> > > > >> Hi Becket,
>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>> > > > >> I think that's a great idea!  I have added the
>>>>>>>>>>>>>>> > > > >> SupportSplitReassignmentOnRecovery interface in
>>>>>>>>>>>>>>> this FLIP. If a
>>>>>>>>>>>>>>> > Source
>>>>>>>>>>>>>>> > > > >> implements this interface indicates that the source
>>>>>>>>>>>>>>> operator needs
>>>>>>>>>>>>>>> > to
>>>>>>>>>>>>>>> > > > >> report splits to the enumerator and receive
>>>>>>>>>>>>>>> reassignment.[1]
>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>> > > > >> Best,
>>>>>>>>>>>>>>> > > > >> Hongshun
>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>> > > > >> [1]
>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>> > > > >> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <
>>>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>>>>> > > > wrote:
>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>> > > > >>> Hi Hongshun,
>>>>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>>>>> > > > >>> I think the convention for such optional features
>>>>>>>>>>>>>>> in Source is via
>>>>>>>>>>>>>>> > > > >>> mix-in interfaces. So instead of adding a method
>>>>>>>>>>>>>>> to the
>>>>>>>>>>>>>>> > SourceReader,
>>>>>>>>>>>>>>> > > > maybe
>>>>>>>>>>>>>>> > > > >>> we should introduce an interface
>>>>>>>>>>>>>>> SupportSplitReassingmentOnRecovery
>>>>>>>>>>>>>>> > > > with
>>>>>>>>>>>>>>> > > > >>> this method. If a Source implementation implements
>>>>>>>>>>>>>>> that interface,
>>>>>>>>>>>>>>> > > > then the
>>>>>>>>>>>>>>> > > > >>> SourceOperator will check the desired behavior and
>>>>>>>>>>>>>>> act accordingly.
>>>>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>>>>> > > > >>> Thanks,
>>>>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>>>>> > > > >>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>>>>> > > > >>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <
>>>>>>>>>>>>>>> > [email protected]
>>>>>>>>>>>>>>> > > > >
>>>>>>>>>>>>>>> > > > >>> wrote:
>>>>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>>>>> > > > >>>> Hi de vs,
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>> Would anyone like to discuss this FLIP? I'd
>>>>>>>>>>>>>>> appreciate your
>>>>>>>>>>>>>>> > feedback
>>>>>>>>>>>>>>> > > > >>>> and suggestions.
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>> Best,
>>>>>>>>>>>>>>> > > > >>>> Hongshun
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>> 2025年8月13日 14:23,Hongshun Wang <[email protected]>
>>>>>>>>>>>>>>> 写道:
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>> Hi Becket,
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>> Thank you for your detailed feedback. The new
>>>>>>>>>>>>>>> contract makes good
>>>>>>>>>>>>>>> > > > sense
>>>>>>>>>>>>>>> > > > >>>> to me and effectively addresses the issues I
>>>>>>>>>>>>>>> encountered at the
>>>>>>>>>>>>>>> > > > beginning
>>>>>>>>>>>>>>> > > > >>>> of the design.
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>> That said, I recommend not reporting splits by
>>>>>>>>>>>>>>> default, primarily
>>>>>>>>>>>>>>> > for
>>>>>>>>>>>>>>> > > > >>>> compatibility and practical reasons:
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>> >  For these reasons, we do not expect the Split
>>>>>>>>>>>>>>> objects to be
>>>>>>>>>>>>>>> > huge,
>>>>>>>>>>>>>>> > > > >>>> and we are not trying to design for huge Split
>>>>>>>>>>>>>>> objects either as
>>>>>>>>>>>>>>> > they
>>>>>>>>>>>>>>> > > > will
>>>>>>>>>>>>>>> > > > >>>> have problems even today.
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>>    1.
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>>    Not all existing connector match this rule
>>>>>>>>>>>>>>> > > > >>>>    For example, in mysql cdc connector, a binlog
>>>>>>>>>>>>>>> split may contain
>>>>>>>>>>>>>>> > > > >>>>    hundreds (or even more) snapshot split
>>>>>>>>>>>>>>> completion records. This
>>>>>>>>>>>>>>> > > > state is
>>>>>>>>>>>>>>> > > > >>>>    large and is currently transmitted
>>>>>>>>>>>>>>> incrementally through
>>>>>>>>>>>>>>> > multiple
>>>>>>>>>>>>>>> > > > >>>>    BinlogSplitMetaEvent messages. Since the
>>>>>>>>>>>>>>> binlog reader operates
>>>>>>>>>>>>>>> > > > >>>>    with single parallelism, reporting the full
>>>>>>>>>>>>>>> split state on
>>>>>>>>>>>>>>> > recovery
>>>>>>>>>>>>>>> > > > >>>>    could be inefficient or even infeasible.
>>>>>>>>>>>>>>> > > > >>>>    For such sources, it would be better to
>>>>>>>>>>>>>>> provide a mechanism to
>>>>>>>>>>>>>>> > skip
>>>>>>>>>>>>>>> > > > >>>>    split reporting during restart until they
>>>>>>>>>>>>>>> redesign and reduce
>>>>>>>>>>>>>>> > the
>>>>>>>>>>>>>>> > > > >>>>    split size.
>>>>>>>>>>>>>>> > > > >>>>    2.
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>>    Not all enumerators maintain unassigned splits
>>>>>>>>>>>>>>> in state.
>>>>>>>>>>>>>>> > > > >>>>    Some SplitEnumerator(such as kafka connector)
>>>>>>>>>>>>>>> implementations
>>>>>>>>>>>>>>> > do
>>>>>>>>>>>>>>> > > > >>>>    not track or persistently manage unassigned
>>>>>>>>>>>>>>> splits. Requiring
>>>>>>>>>>>>>>> > them
>>>>>>>>>>>>>>> > > > to
>>>>>>>>>>>>>>> > > > >>>>    handle re-registration would add unnecessary
>>>>>>>>>>>>>>> complexity. Even
>>>>>>>>>>>>>>> > > > though we
>>>>>>>>>>>>>>> > > > >>>>    maybe implements in kafka connector,
>>>>>>>>>>>>>>> currently, kafka connector
>>>>>>>>>>>>>>> > is
>>>>>>>>>>>>>>> > > > decouple
>>>>>>>>>>>>>>> > > > >>>>    with flink version, we also need to make sure
>>>>>>>>>>>>>>> the elder version
>>>>>>>>>>>>>>> > is
>>>>>>>>>>>>>>> > > > >>>>    compatible.
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>> ------------------------------
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>> To address these concerns, I propose introducing
>>>>>>>>>>>>>>> a new method:
>>>>>>>>>>>>>>> > boolean
>>>>>>>>>>>>>>> > > > >>>> SourceReader#shouldReassignSplitsOnRecovery()
>>>>>>>>>>>>>>> with a default
>>>>>>>>>>>>>>> > > > >>>> implementation returning false. This allows
>>>>>>>>>>>>>>> source readers to opt
>>>>>>>>>>>>>>> > in
>>>>>>>>>>>>>>> > > > >>>> to split reassignment only when necessary. Since
>>>>>>>>>>>>>>> the new contract
>>>>>>>>>>>>>>> > > > already
>>>>>>>>>>>>>>> > > > >>>> places the responsibility for split assignment on
>>>>>>>>>>>>>>> the enumerator,
>>>>>>>>>>>>>>> > not
>>>>>>>>>>>>>>> > > > >>>> reporting splits by default is a safe and clean
>>>>>>>>>>>>>>> default behavior.
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>> ------------------------------
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>> I’ve updated the implementation and the FIP
>>>>>>>>>>>>>>> accordingly[1]. It
>>>>>>>>>>>>>>> > quite a
>>>>>>>>>>>>>>> > > > >>>> big change. In particular, for the Kafka
>>>>>>>>>>>>>>> connector, we can now use
>>>>>>>>>>>>>>> > a
>>>>>>>>>>>>>>> > > > >>>> pluggable SplitPartitioner to support different
>>>>>>>>>>>>>>> split assignment
>>>>>>>>>>>>>>> > > > >>>> strategies (e.g., default, round-robin).
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>> Could you please review it when you have a chance?
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>> Best,
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>> Hongshun
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>> [1]
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <
>>>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>>>>> > > > wrote:
>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>> > > > >>>>> Hi Hongshun,
>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>> > > > >>>>> I am not too concerned about the transmission
>>>>>>>>>>>>>>> cost. Because the
>>>>>>>>>>>>>>> > full
>>>>>>>>>>>>>>> > > > >>>>> split transmission has to happen in the initial
>>>>>>>>>>>>>>> assignment phase
>>>>>>>>>>>>>>> > > > already.
>>>>>>>>>>>>>>> > > > >>>>> And in the future, we probably want to also
>>>>>>>>>>>>>>> introduce some kind
>>>>>>>>>>>>>>> > of
>>>>>>>>>>>>>>> > > > workload
>>>>>>>>>>>>>>> > > > >>>>> balance across source readers, e.g. based on the
>>>>>>>>>>>>>>> per-split
>>>>>>>>>>>>>>> > > > throughput or
>>>>>>>>>>>>>>> > > > >>>>> the per-source-reader workload in heterogeneous
>>>>>>>>>>>>>>> clusters. For
>>>>>>>>>>>>>>> > these
>>>>>>>>>>>>>>> > > > >>>>> reasons, we do not expect the Split objects to
>>>>>>>>>>>>>>> be huge, and we
>>>>>>>>>>>>>>> > are
>>>>>>>>>>>>>>> > > > not
>>>>>>>>>>>>>>> > > > >>>>> trying to design for huge Split objects either
>>>>>>>>>>>>>>> as they will have
>>>>>>>>>>>>>>> > > > problems
>>>>>>>>>>>>>>> > > > >>>>> even today.
>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>> > > > >>>>> Good point on the potential split loss, please
>>>>>>>>>>>>>>> see the reply
>>>>>>>>>>>>>>> > below:
>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>> > > > >>>>> Scenario 2:
>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>> > > > >>>>>> 1. Reader A reports splits (1 and 2), and
>>>>>>>>>>>>>>> Reader B reports (3
>>>>>>>>>>>>>>> > and 4)
>>>>>>>>>>>>>>> > > > >>>>>> upon restart.
>>>>>>>>>>>>>>> > > > >>>>>> 2. Before the enumerator receives all reports
>>>>>>>>>>>>>>> and performs
>>>>>>>>>>>>>>> > > > >>>>>> reassignment, a checkpoint is triggered.
>>>>>>>>>>>>>>> > > > >>>>>> 3. Since no splits have been reassigned yet,
>>>>>>>>>>>>>>> both readers have
>>>>>>>>>>>>>>> > empty
>>>>>>>>>>>>>>> > > > >>>>>> states.
>>>>>>>>>>>>>>> > > > >>>>>> 4. When restarting from this checkpoint, all
>>>>>>>>>>>>>>> four splits are
>>>>>>>>>>>>>>> > lost.
>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>> > > > >>>>> The reader registration happens in the
>>>>>>>>>>>>>>> SourceOperator.open(),
>>>>>>>>>>>>>>> > which
>>>>>>>>>>>>>>> > > > >>>>> means the task is still in the initializing
>>>>>>>>>>>>>>> state, therefore the
>>>>>>>>>>>>>>> > > > checkpoint
>>>>>>>>>>>>>>> > > > >>>>> should not be triggered until the enumerator
>>>>>>>>>>>>>>> receives all the
>>>>>>>>>>>>>>> > split
>>>>>>>>>>>>>>> > > > reports.
>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>> > > > >>>>> There is a nuance here. Today, the RPC call from
>>>>>>>>>>>>>>> the TM to the JM
>>>>>>>>>>>>>>> > is
>>>>>>>>>>>>>>> > > > >>>>> async. So it is possible that the
>>>>>>>>>>>>>>> SourceOpertor.open() has
>>>>>>>>>>>>>>> > returned,
>>>>>>>>>>>>>>> > > > but
>>>>>>>>>>>>>>> > > > >>>>> the enumerator has not received the split
>>>>>>>>>>>>>>> reports. However,
>>>>>>>>>>>>>>> > because
>>>>>>>>>>>>>>> > > > the
>>>>>>>>>>>>>>> > > > >>>>> task status update RPC call goes to the same
>>>>>>>>>>>>>>> channel as the split
>>>>>>>>>>>>>>> > > > reports
>>>>>>>>>>>>>>> > > > >>>>> call, so the task status RPC call will happen
>>>>>>>>>>>>>>> after the split
>>>>>>>>>>>>>>> > > > reports call
>>>>>>>>>>>>>>> > > > >>>>> on the JM side. Therefore, on the JM side, the
>>>>>>>>>>>>>>> SourceCoordinator
>>>>>>>>>>>>>>> > will
>>>>>>>>>>>>>>> > > > >>>>> always first receive the split reports, then
>>>>>>>>>>>>>>> receive the
>>>>>>>>>>>>>>> > checkpoint
>>>>>>>>>>>>>>> > > > request.
>>>>>>>>>>>>>>> > > > >>>>> This "happen before" relationship is kind of
>>>>>>>>>>>>>>> important to
>>>>>>>>>>>>>>> > guarantee
>>>>>>>>>>>>>>> > > > >>>>> the consistent state between enumerator and
>>>>>>>>>>>>>>> readers.
>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>> > > > >>>>> Scenario 1:
>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>> > > > >>>>>> 1. Upon restart, Reader A reports assigned
>>>>>>>>>>>>>>> splits (1 and 2), and
>>>>>>>>>>>>>>> > > > >>>>>> Reader B reports (3 and 4).
>>>>>>>>>>>>>>> > > > >>>>>> 2. The enumerator receives these reports but
>>>>>>>>>>>>>>> only reassigns
>>>>>>>>>>>>>>> > splits 1
>>>>>>>>>>>>>>> > > > >>>>>> and 2 — not 3 and 4.
>>>>>>>>>>>>>>> > > > >>>>>> 3. A checkpoint or savepoint is then triggered.
>>>>>>>>>>>>>>> Only splits 1
>>>>>>>>>>>>>>> > and 2
>>>>>>>>>>>>>>> > > > >>>>>> are recorded in the reader states; splits 3 and
>>>>>>>>>>>>>>> 4 are not
>>>>>>>>>>>>>>> > persisted.
>>>>>>>>>>>>>>> > > > >>>>>> 4. If the job is later restarted from this
>>>>>>>>>>>>>>> checkpoint, splits 3
>>>>>>>>>>>>>>> > and
>>>>>>>>>>>>>>> > > > 4
>>>>>>>>>>>>>>> > > > >>>>>> will be permanently lost.
>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>> > > > >>>>> This scenario is possible. One solution is to
>>>>>>>>>>>>>>> let the enumerator
>>>>>>>>>>>>>>> > > > >>>>> implementation handle this. That means if the
>>>>>>>>>>>>>>> enumerator relies
>>>>>>>>>>>>>>> > on
>>>>>>>>>>>>>>> > > > the
>>>>>>>>>>>>>>> > > > >>>>> initial split reports from the source readers,
>>>>>>>>>>>>>>> it should maintain
>>>>>>>>>>>>>>> > > > these
>>>>>>>>>>>>>>> > > > >>>>> reports by itself. In the above example, the
>>>>>>>>>>>>>>> enumerator will need
>>>>>>>>>>>>>>> > to
>>>>>>>>>>>>>>> > > > >>>>> remember that 3 and 4 are not assigned and put
>>>>>>>>>>>>>>> it into its own
>>>>>>>>>>>>>>> > state.
>>>>>>>>>>>>>>> > > > >>>>> The current contract is that anything assigned
>>>>>>>>>>>>>>> to the
>>>>>>>>>>>>>>> > SourceReaders
>>>>>>>>>>>>>>> > > > >>>>> are completely owned by the SourceReaders.
>>>>>>>>>>>>>>> Enumerators can
>>>>>>>>>>>>>>> > remember
>>>>>>>>>>>>>>> > > > the
>>>>>>>>>>>>>>> > > > >>>>> assignments but cannot change them, even when
>>>>>>>>>>>>>>> the source reader
>>>>>>>>>>>>>>> > > > recovers /
>>>>>>>>>>>>>>> > > > >>>>> restarts.
>>>>>>>>>>>>>>> > > > >>>>> With this FLIP, the contract becomes that the
>>>>>>>>>>>>>>> source readers will
>>>>>>>>>>>>>>> > > > >>>>> return the ownership of the splits to the
>>>>>>>>>>>>>>> enumerator. So the
>>>>>>>>>>>>>>> > > > enumerator is
>>>>>>>>>>>>>>> > > > >>>>> responsible for maintaining these splits, until
>>>>>>>>>>>>>>> they are assigned
>>>>>>>>>>>>>>> > to
>>>>>>>>>>>>>>> > > > a
>>>>>>>>>>>>>>> > > > >>>>> source reader again.
>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>> > > > >>>>> There are other cases where there may be
>>>>>>>>>>>>>>> conflict information
>>>>>>>>>>>>>>> > between
>>>>>>>>>>>>>>> > > > >>>>> reader and enumerator. For example, consider the
>>>>>>>>>>>>>>> following
>>>>>>>>>>>>>>> > sequence:
>>>>>>>>>>>>>>> > > > >>>>> 1. reader A reports splits (1 and 2) up on
>>>>>>>>>>>>>>> restart.
>>>>>>>>>>>>>>> > > > >>>>> 2. enumerator receives the report and assigns
>>>>>>>>>>>>>>> both 1 and 2 to
>>>>>>>>>>>>>>> > reader
>>>>>>>>>>>>>>> > > > B.
>>>>>>>>>>>>>>> > > > >>>>> 3. reader A failed before checkpointing. And
>>>>>>>>>>>>>>> this is a partial
>>>>>>>>>>>>>>> > > > >>>>> failure, so only reader A restarts.
>>>>>>>>>>>>>>> > > > >>>>> 4. When reader A recovers, it will again report
>>>>>>>>>>>>>>> splits (1 and 2)
>>>>>>>>>>>>>>> > to
>>>>>>>>>>>>>>> > > > >>>>> the enumerator.
>>>>>>>>>>>>>>> > > > >>>>> 5. The enumerator should ignore this report
>>>>>>>>>>>>>>> because it has
>>>>>>>>>>>>>>> > > > >>>>> assigned splits (1 and 2) to reader B.
>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>> > > > >>>>> So with the new contract, the enumerator should
>>>>>>>>>>>>>>> be the source of
>>>>>>>>>>>>>>> > > > truth
>>>>>>>>>>>>>>> > > > >>>>> for split ownership.
>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>> > > > >>>>> Thanks,
>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>> > > > >>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>> > > > >>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <
>>>>>>>>>>>>>>> > > > [email protected]>
>>>>>>>>>>>>>>> > > > >>>>> wrote:
>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>> > > > >>>>>> Hi Becket,
>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>> > > > >>>>>> I did consider this approach at the beginning
>>>>>>>>>>>>>>> (and it was also
>>>>>>>>>>>>>>> > > > >>>>>> mentioned in this FLIP), since it would allow
>>>>>>>>>>>>>>> more flexibility
>>>>>>>>>>>>>>> > in
>>>>>>>>>>>>>>> > > > >>>>>> reassigning all splits. However, there are a
>>>>>>>>>>>>>>> few potential
>>>>>>>>>>>>>>> > issues.
>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>> > > > >>>>>> 1. High Transmission Cost
>>>>>>>>>>>>>>> > > > >>>>>> If we pass the full split objects (rather than
>>>>>>>>>>>>>>> just split IDs),
>>>>>>>>>>>>>>> > the
>>>>>>>>>>>>>>> > > > >>>>>> data size could be significant, leading to high
>>>>>>>>>>>>>>> overhead during
>>>>>>>>>>>>>>> > > > >>>>>> transmission — especially when many splits are
>>>>>>>>>>>>>>> involved.
>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>> > > > >>>>>> 2. Risk of Split Loss
>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>> > > > >>>>>> Risk of split loss exists unless we have a
>>>>>>>>>>>>>>> mechanism to make
>>>>>>>>>>>>>>> > sure
>>>>>>>>>>>>>>> > > > >>>>>> only can checkpoint after all the splits are
>>>>>>>>>>>>>>> reassigned.
>>>>>>>>>>>>>>> > > > >>>>>> There are scenarios where splits could be lost
>>>>>>>>>>>>>>> due to
>>>>>>>>>>>>>>> > inconsistent
>>>>>>>>>>>>>>> > > > >>>>>> state handling during recovery:
>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>> > > > >>>>>> Scenario 1:
>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>> > > > >>>>>>    1. Upon restart, Reader A reports assigned
>>>>>>>>>>>>>>> splits (1 and 2),
>>>>>>>>>>>>>>> > and
>>>>>>>>>>>>>>> > > > >>>>>>    Reader B reports (3 and 4).
>>>>>>>>>>>>>>> > > > >>>>>>    2. The enumerator receives these reports but
>>>>>>>>>>>>>>> only reassigns
>>>>>>>>>>>>>>> > > > >>>>>>    splits 1 and 2 — not 3 and 4.
>>>>>>>>>>>>>>> > > > >>>>>>    3. A checkpoint or savepoint is then
>>>>>>>>>>>>>>> triggered. Only splits 1
>>>>>>>>>>>>>>> > and
>>>>>>>>>>>>>>> > > > >>>>>>    2 are recorded in the reader states; splits
>>>>>>>>>>>>>>> 3 and 4 are not
>>>>>>>>>>>>>>> > > > persisted.
>>>>>>>>>>>>>>> > > > >>>>>>    4. If the job is later restarted from this
>>>>>>>>>>>>>>> checkpoint, splits
>>>>>>>>>>>>>>> > 3
>>>>>>>>>>>>>>> > > > >>>>>>    and 4 will be permanently lost.
>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>> > > > >>>>>> Scenario 2:
>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>> > > > >>>>>>    1. Reader A reports splits (1 and 2), and
>>>>>>>>>>>>>>> Reader B reports (3
>>>>>>>>>>>>>>> > and
>>>>>>>>>>>>>>> > > > >>>>>>    4) upon restart.
>>>>>>>>>>>>>>> > > > >>>>>>    2. Before the enumerator receives all
>>>>>>>>>>>>>>> reports and performs
>>>>>>>>>>>>>>> > > > >>>>>>    reassignment, a checkpoint is triggered.
>>>>>>>>>>>>>>> > > > >>>>>>    3. Since no splits have been reassigned yet,
>>>>>>>>>>>>>>> both readers
>>>>>>>>>>>>>>> > have
>>>>>>>>>>>>>>> > > > >>>>>>    empty states.
>>>>>>>>>>>>>>> > > > >>>>>>    4. When restarting from this checkpoint, all
>>>>>>>>>>>>>>> four splits are
>>>>>>>>>>>>>>> > > > lost.
>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>> > > > >>>>>> Let me know if you have thoughts on how we
>>>>>>>>>>>>>>> might mitigate these
>>>>>>>>>>>>>>> > > > risks!
>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>> > > > >>>>>> Best
>>>>>>>>>>>>>>> > > > >>>>>> Hongshun
>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>> > > > >>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <
>>>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>>>>> > > > >>>>>> wrote:
>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>> > > > >>>>>>> Hi Hongshun,
>>>>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>>>>> > > > >>>>>>> The steps sound reasonable to me in general.
>>>>>>>>>>>>>>> In terms of the
>>>>>>>>>>>>>>> > > > updated
>>>>>>>>>>>>>>> > > > >>>>>>> FLIP wiki, it would be good to see if we can
>>>>>>>>>>>>>>> keep the protocol
>>>>>>>>>>>>>>> > > > simple. One
>>>>>>>>>>>>>>> > > > >>>>>>> alternative way to achieve this behavior is
>>>>>>>>>>>>>>> following:
>>>>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>>>>> > > > >>>>>>> 1. Upon SourceOperator startup, the
>>>>>>>>>>>>>>> SourceOperator sends
>>>>>>>>>>>>>>> > > > >>>>>>> ReaderRegistrationEvent with the currently
>>>>>>>>>>>>>>> assigned splits to
>>>>>>>>>>>>>>> > the
>>>>>>>>>>>>>>> > > > >>>>>>> enumerator. It does not add these splits to
>>>>>>>>>>>>>>> the SourceReader.
>>>>>>>>>>>>>>> > > > >>>>>>> 2. The enumerator will always use the
>>>>>>>>>>>>>>> > > > >>>>>>> SourceEnumeratorContext.assignSplits() to
>>>>>>>>>>>>>>> assign the splits.
>>>>>>>>>>>>>>> > (not
>>>>>>>>>>>>>>> > > > via the
>>>>>>>>>>>>>>> > > > >>>>>>> response of the SourceRegistrationEvent, this
>>>>>>>>>>>>>>> allows async
>>>>>>>>>>>>>>> > split
>>>>>>>>>>>>>>> > > > assignment
>>>>>>>>>>>>>>> > > > >>>>>>> in case the enumerator wants to wait until all
>>>>>>>>>>>>>>> the readers are
>>>>>>>>>>>>>>> > > > registered)
>>>>>>>>>>>>>>> > > > >>>>>>> 3. The SourceOperator will only call
>>>>>>>>>>>>>>> SourceReader.addSplits()
>>>>>>>>>>>>>>> > when
>>>>>>>>>>>>>>> > > > >>>>>>> it receives the AddSplitEvent from the
>>>>>>>>>>>>>>> enumerator.
>>>>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>>>>> > > > >>>>>>> This protocol has a few benefits:
>>>>>>>>>>>>>>> > > > >>>>>>> 1. it basically allows arbitrary split
>>>>>>>>>>>>>>> reassignment upon
>>>>>>>>>>>>>>> > restart
>>>>>>>>>>>>>>> > > > >>>>>>> 2. simplicity: there is only one way to assign
>>>>>>>>>>>>>>> splits.
>>>>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>>>>> > > > >>>>>>> So we only need one interface change:
>>>>>>>>>>>>>>> > > > >>>>>>> - add the initially assigned splits to
>>>>>>>>>>>>>>> ReaderInfo so the
>>>>>>>>>>>>>>> > Enumerator
>>>>>>>>>>>>>>> > > > >>>>>>> can access it.
>>>>>>>>>>>>>>> > > > >>>>>>> and one behavior change:
>>>>>>>>>>>>>>> > > > >>>>>>> - The SourceOperator should stop assigning
>>>>>>>>>>>>>>> splits to the from
>>>>>>>>>>>>>>> > state
>>>>>>>>>>>>>>> > > > >>>>>>> restoration, but
>>>>>>>>>>>>>>> > [message truncated...]
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>

Reply via email to