Hi Becket and Leonard,

It seems adding  `splitsOnRecovery` to `ReaderInfo` makes the split
enumerator simpler and cleaner.

I have modified this FLIP again. Please have a look and let me know what
you think.

Best,
Hongshun

On Mon, Oct 13, 2025 at 10:48 AM Hongshun Wang <[email protected]>
wrote:

> Hi Becket,
> Thanks for your explanation.
>
> > For the same three input above, the assignment should be consistently
> the same.
>
> That is exactly what troubles me. For *assignment algorithms such as
> hash, it does behave the same. What If we use round-robin? Each *the *reader
> information, the same split will be assigned to different readers. There is
> also what I used to list as an example.*
>
>    1. *Initial state:*: 2 parallelism, 2 splits.
>    2. *Enumerator action:*  Split 1 → Task 1, Split 2 → Task 2 ,  ,
>    3. *Failure scenario: *After Split 2 is assigned to Task 2 but before
>    next checkpoint success, task 1 restarts.
>    4. *Recovery issue:* Split 2 is re-added to the enumerator.
>    Round-robin strategy assigns Split 2 to Task 1. Then Task 1 now has 2
>    splits, Task 2 has 0 → Imbalanced distribution.
>
>
> > Please let me know if you think a meeting would be more efficient.
> Yes, I’d like to reach an agreement as soon as possible. If you’re
> available, we could schedule a meeting with Lenenord as well.
>
> Best,
> Hongshun
>
> On Sat, Oct 11, 2025 at 3:59 PM Becket Qin <[email protected]> wrote:
>
>> Hi Hongshun,
>>
>> I am confused. First of all, regardless of what the assignment algorithm
>> is. Using SplitEnumeratorContext to return the splits only gives more
>> information than using addSplitsBack(). So there should be no regression.
>>
>> Secondly, at this point. The SplitEnumerator should only take the
>> following three input to generate the global splits assignment:
>> 1. the *reader information (num readers, locations, etc)*
>> 2. *all the splits to assign*
>> 3. *configured assignment algorithm *
>> Preferably, for the same three input above, the assignment should be
>> consistently the same. I don't see why it should care about why a new
>> reader is added, whether due to partial failover or global failover or job
>> restart.
>>
>> If you want to do global redistribution on global failover and restart,
>> but honor the existing assignment for partial failover. The enumerator will
>> just do the following:
>> 1. Generate a new global assignment (global redistribution) in start()
>> because start() will only be invoked in global failover or restart. That
>> means all the readers are also new with empty assignment.
>> 2. After the global assignment is generated, it should be honored for the
>> whole life cycle. there might be many reader registrations, again for
>> different reasons but does not matter:
>>     - reader registration after this job restart
>>     - reader registration after this global failover
>>     - reader registration due to partial failover which may or may not
>> have a addSplitsBack() call.
>>     Regardless of the reason, the split enumerator will just enforce the
>> global assignment it has already generated, i.e. without split
>> redistribution.
>>
>> Wouldn't that give the behavior you want? I feel the discussion somehow
>> goes to circles. Please let me know if you think a meeting would be more
>> efficient.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Fri, Oct 10, 2025 at 7:58 PM Hongshun Wang <[email protected]>
>> wrote:
>>
>>> Hi Becket,
>>>
>>> > Ignore a returned split if it has been assigned to a different reader,
>>> otherwise put it back to unassigned splits / pending splits. Then the
>>> enumerator assigns new splits to the newly added reader, which may use the
>>> previous assignment as a reference. This should work regardless of whether
>>> it is a global failover, partial failover, restart, etc. There is no need
>>> for the SplitEnumerator to distinguish what failover scenario it is.
>>>
>>> In this case, it seems that global failover and partial failover share
>>> the same distribution strategy If it has not been assigned to a different
>>> reader. However, global failover needs to be redistributed(this is why we
>>> need this FLIP) , while partial failover is not. I have no idea how we
>>> distinguish them.
>>>
>>> What do you think?
>>>
>>> Best,
>>> Hongshun
>>>
>>> On Sat, Oct 11, 2025 at 12:54 AM Becket Qin <[email protected]>
>>> wrote:
>>>
>>>> Hi Hongshun,
>>>>
>>>> The problem we are trying to solve here is to give the splits back to
>>>> the SplitEnumerator. There are only two types of splits to give back:
>>>> 1) splits whose assignment has been checkpointed. - In this case, we
>>>> rely on addReader() + SplitEnumeratorContext to give the splits back, this
>>>> provides more information associated with those splits.
>>>> 2) splits whose assignment has not been checkpointed. -  In this case,
>>>> we use addSplitsBack(), there is no reader info to give because the
>>>> previous assignment did not take effect to begin with.
>>>>
>>>> From the SplitEnumerator implementation perspective, the contract is
>>>> straightforward.
>>>> 1. The SplitEnumerator is the source of truth for assignment.
>>>> 2. When the enumerator receives the addSplits() call, it always add
>>>> these splits back to unassigned splits / pending splits.
>>>> 3. When the enumerator receives the addReader() call, that means the
>>>> reader has no current assignment, and has returned its previous assignment
>>>> based on the reader side info. The SplitEnumerator checks the
>>>> SplitEnumeratorContext to retrieve the returned splits from that reader
>>>> (i.e. previous assignment) and handle them according to its own source of
>>>> truth knowledge of assignment - Ignore a returned split if it has been
>>>> assigned to a different reader, otherwise put it back to unassigned splits
>>>> / pending splits. Then the enumerator assigns new splits to the newly added
>>>> reader, which may use the previous assignment as a reference. This should
>>>> work regardless of whether it is a global failover, partial failover,
>>>> restart, etc. There is no need for the SplitEnumerator to distinguish what
>>>> failover scenario it is.
>>>>
>>>> Would this work?
>>>>
>>>> Thanks,
>>>>
>>>> Jiangjie (Becket) Qin
>>>>
>>>> On Fri, Oct 10, 2025 at 1:28 AM Hongshun Wang <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Becket,
>>>>>  > why do we need to change the behavior of addSplitsBack()? Should it
>>>>> remain the same?
>>>>>
>>>>> How does the enumerator get the splits from ReaderRegistrationEvent
>>>>> and then reassign it?
>>>>>
>>>>> You have given a advice before:
>>>>> > 1. Put all the reader information in the SplitEnumerator context.
>>>>> 2. notify the enumerator about the new reader registration. 3. let the
>>>>> split enumerator get whatever information it wants from the context and do
>>>>> its job.
>>>>>
>>>>> However, each time a source task fails over, the
>>>>> ConcurrentMap<Integer, ConcurrentMap<Integer, ReaderInfo>>
>>>>> registeredReaders will remove this reader infos. When the source task is
>>>>> registered again, it will be added again. *Thus, registeredReaders
>>>>> cannot know whether is registered before. *
>>>>>
>>>>> Therefore, registeredReaders enumerator#addReader does not distinguish
>>>>> the following situations:
>>>>> However, each time one source task is failover. The
>>>>> `ConcurrentMap<Integer, ConcurrentMap<Integer, ReaderInfo>>
>>>>> registeredReaders` will remove this source. When source Task is registered
>>>>> again, enumerator#addReader not distinguished three situations:
>>>>> 1. The Reader is registered when the global restart. In this case,
>>>>> redistribution the split from the infos. (take off all the splits from
>>>>> ReaderInfo).
>>>>> 2. The Reader is registered when a partial failover(before the first
>>>>> successful checkpoint). In this case,  ignore the split from the infos.
>>>>> (leave alone all the splits from ReaderInfo).
>>>>> 3. The Reader is registered when a partial failover(after the first
>>>>> successful checkpoint).In this case, we need assign the split to same
>>>>> reader again. (take off all the splits from ReaderInfo but assigned to it
>>>>> again).
>>>>> we still need the enumerator to distinguish them (using
>>>>> pendingSplitAssignment & assignedSplitAssignment. However, it is redundant
>>>>> to maintain split assigned information both in the enumerator and the
>>>>> enumerator context.
>>>>>
>>>>> I think if we change the behavior of addSplitsBack, it will be more
>>>>> simple. Just let the enumerator to handle these split based on 
>>>>> pendingSplitAssignment
>>>>> & assignedSplitments.
>>>>>
>>>>> What do you think?
>>>>>
>>>>> Best,
>>>>> Hongshun
>>>>>
>>>>> On Fri, Oct 10, 2025 at 12:55 PM Becket Qin <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Hongshun,
>>>>>>
>>>>>> Thanks for updating the FLIP. A quick question: why do we need to
>>>>>> change the behavior of addSplitsBack()? Should it remain the same?
>>>>>>
>>>>>> Regarding the case of restart with changed subscription. I think the
>>>>>> only correct behavior is removing obsolete splits without any warning /
>>>>>> exception. It is OK to add an info level logging if we want to. It is a
>>>>>> clear intention if the user has explicitly changed subscription and
>>>>>> restarted the job. There is no need to add a config to double confirm.
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Jiangjie (Becket) Qin
>>>>>>
>>>>>> On Thu, Oct 9, 2025 at 7:28 PM Hongshun Wang <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Leonard,
>>>>>>>
>>>>>>> If the SplitEnumerator received all splits after a restart, it
>>>>>>> becomes straightforward to clear and un-assign the unmatched
>>>>>>> splits(checking whether matches the source options). However, a key
>>>>>>> question arises: *should  automatically discard obsolete splits, or
>>>>>>> explicitly notify the user via an exception?*
>>>>>>>
>>>>>>> We provided a option `scan.partition-unsubscribe.strategy`:
>>>>>>> 1. If Strict, throws an exception when encountering removed splits.
>>>>>>> 2. If Lenient, automatically removes obsolete splits silently.
>>>>>>>
>>>>>>> What Do you think?
>>>>>>>
>>>>>>> Best,
>>>>>>> Hongshun
>>>>>>>
>>>>>>> On Thu, Oct 9, 2025 at 9:37 PM Leonard Xu <[email protected]> wrote:
>>>>>>>
>>>>>>>> Thanks hongshun for the updating and pretty detailed analysis for
>>>>>>>> edge cases,  the updated FLIP looks good to me now.
>>>>>>>>
>>>>>>>> Only last implementation details about scenario in motivation
>>>>>>>> section:
>>>>>>>>
>>>>>>>> *Restart with Changed subscription: During restart, if source
>>>>>>>> options remove a topic or table. The splits which have already 
>>>>>>>> assigned can
>>>>>>>> not be removed.*
>>>>>>>>
>>>>>>>> Could you clarify how we resolve this in Kafka connector ?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Leonard
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 2025 10月 9 19:48,Hongshun Wang <[email protected]> 写道:
>>>>>>>>
>>>>>>>> Hi devs,
>>>>>>>> If there are no further suggestions, I will start the voting
>>>>>>>> tomorrow。
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Hongshun
>>>>>>>>
>>>>>>>> On Fri, Sep 26, 2025 at 7:48 PM Hongshun Wang <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Becket and Leonard,
>>>>>>>>>
>>>>>>>>> I have updated the content of this FLIP. The key point is that:
>>>>>>>>>
>>>>>>>>> When the split enumerator receives a split, *these splits must
>>>>>>>>> have already existed in pendingSplitAssignment or assignedSplitments*
>>>>>>>>> .
>>>>>>>>>
>>>>>>>>>    - If the split is in pendingSplitAssignments, ignore it.
>>>>>>>>>    - If the split is in assignedSplitAssignments but has a
>>>>>>>>>    different taskId, ignore it (this indicates it was already
>>>>>>>>>    assigned to another task).
>>>>>>>>>    - If the split is in assignedSplitAssignments and shares the
>>>>>>>>>    same taskId, move the assignment from assignedSplitments to 
>>>>>>>>> pendingSplitAssignment
>>>>>>>>>    to re-assign again.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> For better understanding why use these strategies. I added some
>>>>>>>>> examples and pictures to show it.
>>>>>>>>>
>>>>>>>>> Would you like to help me check whether there are still some
>>>>>>>>> problems?
>>>>>>>>>
>>>>>>>>> Best
>>>>>>>>> Hongshun
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Sep 26, 2025 at 5:08 PM Leonard Xu <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Becket and Hongshun for the insightful discussion.
>>>>>>>>>>
>>>>>>>>>> The underlying implementation and communication mechanisms of
>>>>>>>>>> Flink Source indeed involve many intricate details, we discussed the 
>>>>>>>>>> issue
>>>>>>>>>> of splits re-assignment in specific scenarios, but fortunately, the 
>>>>>>>>>> final
>>>>>>>>>> decision turned out to be pretty clear.
>>>>>>>>>>
>>>>>>>>>>  +1 to Becket’s proposal to keeps the framework cleaner and more
>>>>>>>>>> flexible.
>>>>>>>>>> +1 to Hongshun’s point to provide comprehensive guidance for
>>>>>>>>>> connector developers.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Leonard
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 2025 9月 26 16:30,Hongshun Wang <[email protected]> 写道:
>>>>>>>>>>
>>>>>>>>>> Hi Becket,
>>>>>>>>>>
>>>>>>>>>> I Got it. You’re suggesting we should not handle this in the
>>>>>>>>>> source framework but instead let the split enumerator manage these 
>>>>>>>>>> three
>>>>>>>>>> scenarios.
>>>>>>>>>>
>>>>>>>>>> Let me explain why I originally favored handling it in the
>>>>>>>>>> framework: I'm concerned that connector developers might overlook 
>>>>>>>>>> certain
>>>>>>>>>> edge cases (after all, we even payed extensive discussions to fully 
>>>>>>>>>> clarify
>>>>>>>>>> the logic)
>>>>>>>>>>
>>>>>>>>>> However, your point keeps the framework cleaner and more
>>>>>>>>>> flexible. Thus, I will take it.
>>>>>>>>>>
>>>>>>>>>> Perhaps, in this FLIP, we should focus on providing
>>>>>>>>>> comprehensive guidance for connector developers: explain how to
>>>>>>>>>> implement a split enumerator, including the underlying challenges 
>>>>>>>>>> and their
>>>>>>>>>> solutions.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Additionally, we can use the Kafka connector as a reference
>>>>>>>>>> implementation to demonstrate the practical steps. This way, 
>>>>>>>>>> developers who
>>>>>>>>>> want to implement similar connectors can directly reference this 
>>>>>>>>>> example.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Hongshun
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 26, 2025 at 1:27 PM Becket Qin <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> It would be good to not expose runtime details to the source
>>>>>>>>>>> implementation if possible.
>>>>>>>>>>>
>>>>>>>>>>> Today, the split enumerator implementations are expected to
>>>>>>>>>>> track the split assignment.
>>>>>>>>>>>
>>>>>>>>>>> Assuming the split enumerator implementation keeps a split
>>>>>>>>>>> assignment map, that means the enumerator should already know 
>>>>>>>>>>> whether a
>>>>>>>>>>> split is assigned or unassigned. So it can handle the three 
>>>>>>>>>>> scenarios you
>>>>>>>>>>> mentioned.
>>>>>>>>>>>
>>>>>>>>>>> The split is reported by a reader during a global restoration.
>>>>>>>>>>>>
>>>>>>>>>>> The split enumerator should have just been restored / created.
>>>>>>>>>>> If the enumerator expects a full reassignment of splits up on global
>>>>>>>>>>> recovery, there should be no assigned splits to that reader in the 
>>>>>>>>>>> split
>>>>>>>>>>> assignment mapping.
>>>>>>>>>>>
>>>>>>>>>>> The split is reported by a reader during a partial failure
>>>>>>>>>>>> recovery.
>>>>>>>>>>>>
>>>>>>>>>>> In this case, when SplitEnumerator.addReader() is invoked, the
>>>>>>>>>>> split assignment map in the enumerator implementation should 
>>>>>>>>>>> already have
>>>>>>>>>>> some split assignments for the reader. Therefore it is a partial 
>>>>>>>>>>> failover.
>>>>>>>>>>> If the source supports split reassignment on recovery, the 
>>>>>>>>>>> enumerator can
>>>>>>>>>>> assign splits that are different from the reported assignment of 
>>>>>>>>>>> that
>>>>>>>>>>> reader in the SplitEnumeratorContext, or it can also assign the same
>>>>>>>>>>> splits. In any case, the enumerator knows that this is a partial 
>>>>>>>>>>> recovery
>>>>>>>>>>> because the assignment map is non-empty.
>>>>>>>>>>>
>>>>>>>>>>> The split is not reported by a reader, but is assigned after the
>>>>>>>>>>>> last successful checkpoint and was never acknowledged.
>>>>>>>>>>>
>>>>>>>>>>> This is actually one of the step in the partial failure recover.
>>>>>>>>>>> SplitEnumerator.addSplitsBack() will be called first before
>>>>>>>>>>> SplitReader.addReader() is called for the recovered reader. When the
>>>>>>>>>>> SplitEnumerator.addSplitsBack() is invoked, it is for sure a partial
>>>>>>>>>>> recovery. And the enumerator should remove these splits from the 
>>>>>>>>>>> split
>>>>>>>>>>> assignment map as if they were never assigned.
>>>>>>>>>>>
>>>>>>>>>>> I think this should work, right?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>>
>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Sep 25, 2025 at 8:34 PM Hongshun Wang <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Becket and Leonard,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your advice.
>>>>>>>>>>>>
>>>>>>>>>>>> > put all the reader information in the SplitEnumerator context
>>>>>>>>>>>> I have a concern: the current registeredReaders in*
>>>>>>>>>>>> SourceCoordinatorContext will be removed after subtaskResetor 
>>>>>>>>>>>> execution on
>>>>>>>>>>>> failure*.However, this approach has merit.
>>>>>>>>>>>>
>>>>>>>>>>>> One more situation I found my previous design does not cover:
>>>>>>>>>>>>
>>>>>>>>>>>>    1. Initial state: Reader A reports splits (1, 2).
>>>>>>>>>>>>    2. Enumerator action: Assigns split 1 to Reader A, and
>>>>>>>>>>>>    split 2 to Reader B.
>>>>>>>>>>>>    3. Failure scenario: Reader A fails before checkpointing.
>>>>>>>>>>>>    Since this is a partial failure, only Reader A restarts.
>>>>>>>>>>>>    4. Recovery issue: Upon recovery, Reader A re-reports split
>>>>>>>>>>>>    (1).
>>>>>>>>>>>>
>>>>>>>>>>>> In my previous design, the enumerator will ignore Reader A's
>>>>>>>>>>>> re-registration which will cause data loss.
>>>>>>>>>>>>
>>>>>>>>>>>> Thus, when the enumerator receives a split, the split may
>>>>>>>>>>>> originate from three scenarios:
>>>>>>>>>>>>
>>>>>>>>>>>>    1. The split is reported by a reader during a global
>>>>>>>>>>>>    restoration.
>>>>>>>>>>>>    2. The split is reported by a reader during a partial
>>>>>>>>>>>>    failure recovery.
>>>>>>>>>>>>    3. The split is not reported by a reader, but is assigned
>>>>>>>>>>>>    after the last successful checkpoint and was never acknowledged.
>>>>>>>>>>>>
>>>>>>>>>>>> In the first scenario (global restore), the split should
>>>>>>>>>>>> be re-distributed. For the latter two scenarios (partial failover 
>>>>>>>>>>>> and
>>>>>>>>>>>> post-checkpoint assignment), we need to reassign the split to
>>>>>>>>>>>> its originally assigned subtask.
>>>>>>>>>>>>
>>>>>>>>>>>> By implementing a method in the SplitEnumerator context to
>>>>>>>>>>>> track each assigned split's status, the system can correctly 
>>>>>>>>>>>> identify and
>>>>>>>>>>>> resolve split ownership in all three scenarios.*What about
>>>>>>>>>>>> adding a  `SplitRecoveryType splitRecoveryType(Split split)` in
>>>>>>>>>>>> SplitEnumeratorContext.* SplitRecoveryTypeis a enum including
>>>>>>>>>>>> `UNASSIGNED`、`GLOBAL_RESTORE`、`PARTIAL_FAILOVER` and
>>>>>>>>>>>> `POST_CHECKPOINT_ASSIGNMENT`.
>>>>>>>>>>>>
>>>>>>>>>>>> What do you think? Are there any details or scenarios I haven't
>>>>>>>>>>>> considered? Looking forward to your advice.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Hongshun
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Sep 11, 2025 at 12:41 AM Becket Qin <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the explanation, Hongshun.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Current pattern of handling new reader registration following:
>>>>>>>>>>>>> 1. put all the reader information in the SplitEnumerator
>>>>>>>>>>>>> context
>>>>>>>>>>>>> 2. notify the enumerator about the new reader registration.
>>>>>>>>>>>>> 3. Let the split enumerator get whatever information it wants
>>>>>>>>>>>>> from the
>>>>>>>>>>>>> context and do its job.
>>>>>>>>>>>>> This pattern decouples the information passing and the reader
>>>>>>>>>>>>> registration
>>>>>>>>>>>>> notification. This makes the API extensible - we can add more
>>>>>>>>>>>>> information
>>>>>>>>>>>>> (e.g. reported assigned splits in our case) about the reader
>>>>>>>>>>>>> to the context
>>>>>>>>>>>>> without introducing new methods.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Introducing a new method of addSplitBackOnRecovery() is
>>>>>>>>>>>>> redundant to the
>>>>>>>>>>>>> above pattern. Do we really need it?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Sep 8, 2025 at 8:18 PM Hongshun Wang <
>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> > Hi Becket,
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > > I am curious what would the enumerator do differently for
>>>>>>>>>>>>> the splits
>>>>>>>>>>>>> > added via addSplitsBackOnRecovery() V.S. addSplitsBack()?
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >  In this FLIP, there are two distinct scenarios in which the
>>>>>>>>>>>>> enumerator
>>>>>>>>>>>>> > receives splits being added back:
>>>>>>>>>>>>> > 1.  Job-level restore: The job is restored,  splits from
>>>>>>>>>>>>> reader’s state are
>>>>>>>>>>>>> > reported by ReaderRegistrationEvent.
>>>>>>>>>>>>> > 2.  Reader-level restart: a reader is started but not the
>>>>>>>>>>>>> whole  job,
>>>>>>>>>>>>> >  splits assigned to it after the last successful checkpoint.
>>>>>>>>>>>>> This is what
>>>>>>>>>>>>> > addSplitsBack used to do.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > In these two situations, the enumerator will choose
>>>>>>>>>>>>> different strategies.
>>>>>>>>>>>>> > 1. Job-level restore: the splits should be redistributed
>>>>>>>>>>>>> across readers
>>>>>>>>>>>>> > according to the current partitioner strategy.
>>>>>>>>>>>>> > 2. Reader-level restart: the splits should be reassigned
>>>>>>>>>>>>> directly back to
>>>>>>>>>>>>> > the same reader they were originally assigned to, preserving
>>>>>>>>>>>>> locality and
>>>>>>>>>>>>> > avoiding unnecessary redistribution
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Therefore, the enumerator must clearly distinguish between
>>>>>>>>>>>>> these two
>>>>>>>>>>>>> > scenarios.I used to deprecate the former
>>>>>>>>>>>>> addSplitsBack(List<SplitT>
>>>>>>>>>>>>> > splits, int subtaskId) but add a new
>>>>>>>>>>>>> addSplitsBack(List<SplitT>
>>>>>>>>>>>>> > splits, int subtaskId,
>>>>>>>>>>>>> > boolean reportedByReader).
>>>>>>>>>>>>> > Leonard suggest to use another method
>>>>>>>>>>>>> addSplitsBackOnRecovery but not
>>>>>>>>>>>>> > influenced  currently addSplitsBack.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Best
>>>>>>>>>>>>> > Hongshun
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > On 2025/09/08 17:20:31 Becket Qin wrote:
>>>>>>>>>>>>> > > Hi Leonard,
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > > Could we introduce a new method like
>>>>>>>>>>>>> addSplitsBackOnRecovery  with
>>>>>>>>>>>>> > default
>>>>>>>>>>>>> > > > implementation. In this way, we can provide better
>>>>>>>>>>>>> backward
>>>>>>>>>>>>> > compatibility
>>>>>>>>>>>>> > > > and also makes it easier for developers to understand.
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > I am curious what would the enumerator do differently for
>>>>>>>>>>>>> the splits
>>>>>>>>>>>>> > added
>>>>>>>>>>>>> > > via addSplitsBackOnRecovery() V.S. addSplitsBack()?  Today,
>>>>>>>>>>>>> > addSplitsBack()
>>>>>>>>>>>>> > > is also only called upon recovery. So the new method seems
>>>>>>>>>>>>> confusing. One
>>>>>>>>>>>>> > > thing worth clarifying is if the Source implements
>>>>>>>>>>>>> > > SupportSplitReassignmentOnRecovery, upon recovery, should
>>>>>>>>>>>>> the splits
>>>>>>>>>>>>> > > reported by the readers also be added back to the
>>>>>>>>>>>>> SplitEnumerator via the
>>>>>>>>>>>>> > > addSplitsBack() call? Or should the SplitEnumerator
>>>>>>>>>>>>> explicitly query the
>>>>>>>>>>>>> > > registered reader information via the
>>>>>>>>>>>>> SplitEnumeratorContext to get the
>>>>>>>>>>>>> > > originally assigned splits when addReader() is invoked? I
>>>>>>>>>>>>> was assuming
>>>>>>>>>>>>> > the
>>>>>>>>>>>>> > > latter in the beginning, so the behavior of
>>>>>>>>>>>>> addSplitsBack() remains
>>>>>>>>>>>>> > > unchanged, but I am not opposed in doing the former.
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > Also, can you elaborate on the backwards compatibility
>>>>>>>>>>>>> issue you see if
>>>>>>>>>>>>> > we
>>>>>>>>>>>>> > > do not have a separate addSplitsBackOnRecovery() method?
>>>>>>>>>>>>> Even without
>>>>>>>>>>>>> > this
>>>>>>>>>>>>> > > new method, the behavior remains exactly the same unless
>>>>>>>>>>>>> the end users
>>>>>>>>>>>>> > > implement the mix-in interface of
>>>>>>>>>>>>> "SupportSplitReassignmentOnRecovery",
>>>>>>>>>>>>> > > right?
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > Thanks,
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > Jiangjie (Becket) Qin
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > On Mon, Sep 8, 2025 at 1:48 AM Hongshun Wang <
>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>>> > > wrote:
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > > Hi devs,
>>>>>>>>>>>>> > > >
>>>>>>>>>>>>> > > > It has been quite some time since this FLIP[1] was first
>>>>>>>>>>>>> proposed.
>>>>>>>>>>>>> > Thank
>>>>>>>>>>>>> > > > you for your valuable feedback—based on your
>>>>>>>>>>>>> suggestions, the FLIP has
>>>>>>>>>>>>> > > > undergone several rounds of revisions.
>>>>>>>>>>>>> > > >
>>>>>>>>>>>>> > > > Any more advice is welcome and appreciated. If there are
>>>>>>>>>>>>> no further
>>>>>>>>>>>>> > > > concerns, I plan to start the vote tomorrow.
>>>>>>>>>>>>> > > >
>>>>>>>>>>>>> > > > Best
>>>>>>>>>>>>> > > > Hongshun
>>>>>>>>>>>>> > > >
>>>>>>>>>>>>> > > > [1]
>>>>>>>>>>>>> > > >
>>>>>>>>>>>>> >
>>>>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480
>>>>>>>>>>>>> > > >
>>>>>>>>>>>>> > > > On Mon, Sep 8, 2025 at 4:42 PM Hongshun Wang <
>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>>> > > > wrote:
>>>>>>>>>>>>> > > >
>>>>>>>>>>>>> > > > > Hi Leonard,
>>>>>>>>>>>>> > > > > Thanks for your advice.  It makes sense and I have
>>>>>>>>>>>>> modified it.
>>>>>>>>>>>>> > > > >
>>>>>>>>>>>>> > > > > Best,
>>>>>>>>>>>>> > > > > Hongshun
>>>>>>>>>>>>> > > > >
>>>>>>>>>>>>> > > > > On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>> > > > >
>>>>>>>>>>>>> > > > >> Thanks Hongshun and Becket for the deep discussion.
>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>> > > > >> I only have one comment for one API design:
>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>> > > > >> > Deprecate the old addSplitsBack  method, use a
>>>>>>>>>>>>> addSplitsBack with
>>>>>>>>>>>>> > > > >> param isReportedByReader instead. Because, The
>>>>>>>>>>>>> enumerator can apply
>>>>>>>>>>>>> > > > >> different reassignment policies based on the context.
>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>> > > > >> Could we introduce a new method like
>>>>>>>>>>>>> *addSplitsBackOnRecovery*  with
>>>>>>>>>>>>> > > > default
>>>>>>>>>>>>> > > > >> implementation. In this way, we can provide better
>>>>>>>>>>>>> backward
>>>>>>>>>>>>> > > > >> compatibility and also makes it easier for developers
>>>>>>>>>>>>> to understand.
>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>> > > > >> Best,
>>>>>>>>>>>>> > > > >> Leonard
>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>> > > > >> 2025 9月 3 20:26,Hongshun Wang <[email protected]> 写道:
>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>> > > > >> Hi Becket,
>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>> > > > >> I think that's a great idea!  I have added the
>>>>>>>>>>>>> > > > >> SupportSplitReassignmentOnRecovery interface in this
>>>>>>>>>>>>> FLIP. If a
>>>>>>>>>>>>> > Source
>>>>>>>>>>>>> > > > >> implements this interface indicates that the source
>>>>>>>>>>>>> operator needs
>>>>>>>>>>>>> > to
>>>>>>>>>>>>> > > > >> report splits to the enumerator and receive
>>>>>>>>>>>>> reassignment.[1]
>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>> > > > >> Best,
>>>>>>>>>>>>> > > > >> Hongshun
>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>> > > > >> [1]
>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>> > > >
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >
>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>> > > > >> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <
>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>>> > > > wrote:
>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>> > > > >>> Hi Hongshun,
>>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>>> > > > >>> I think the convention for such optional features in
>>>>>>>>>>>>> Source is via
>>>>>>>>>>>>> > > > >>> mix-in interfaces. So instead of adding a method to
>>>>>>>>>>>>> the
>>>>>>>>>>>>> > SourceReader,
>>>>>>>>>>>>> > > > maybe
>>>>>>>>>>>>> > > > >>> we should introduce an interface
>>>>>>>>>>>>> SupportSplitReassingmentOnRecovery
>>>>>>>>>>>>> > > > with
>>>>>>>>>>>>> > > > >>> this method. If a Source implementation implements
>>>>>>>>>>>>> that interface,
>>>>>>>>>>>>> > > > then the
>>>>>>>>>>>>> > > > >>> SourceOperator will check the desired behavior and
>>>>>>>>>>>>> act accordingly.
>>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>>> > > > >>> Thanks,
>>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>>> > > > >>> Jiangjie (Becket) Qin
>>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>>> > > > >>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <
>>>>>>>>>>>>> > [email protected]
>>>>>>>>>>>>> > > > >
>>>>>>>>>>>>> > > > >>> wrote:
>>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>>> > > > >>>> Hi de vs,
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>> Would anyone like to discuss this FLIP? I'd
>>>>>>>>>>>>> appreciate your
>>>>>>>>>>>>> > feedback
>>>>>>>>>>>>> > > > >>>> and suggestions.
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>> Best,
>>>>>>>>>>>>> > > > >>>> Hongshun
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>> 2025年8月13日 14:23,Hongshun Wang <[email protected]>
>>>>>>>>>>>>> 写道:
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>> Hi Becket,
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>> Thank you for your detailed feedback. The new
>>>>>>>>>>>>> contract makes good
>>>>>>>>>>>>> > > > sense
>>>>>>>>>>>>> > > > >>>> to me and effectively addresses the issues I
>>>>>>>>>>>>> encountered at the
>>>>>>>>>>>>> > > > beginning
>>>>>>>>>>>>> > > > >>>> of the design.
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>> That said, I recommend not reporting splits by
>>>>>>>>>>>>> default, primarily
>>>>>>>>>>>>> > for
>>>>>>>>>>>>> > > > >>>> compatibility and practical reasons:
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>> >  For these reasons, we do not expect the Split
>>>>>>>>>>>>> objects to be
>>>>>>>>>>>>> > huge,
>>>>>>>>>>>>> > > > >>>> and we are not trying to design for huge Split
>>>>>>>>>>>>> objects either as
>>>>>>>>>>>>> > they
>>>>>>>>>>>>> > > > will
>>>>>>>>>>>>> > > > >>>> have problems even today.
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>>    1.
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>>    Not all existing connector match this rule
>>>>>>>>>>>>> > > > >>>>    For example, in mysql cdc connector, a binlog
>>>>>>>>>>>>> split may contain
>>>>>>>>>>>>> > > > >>>>    hundreds (or even more) snapshot split
>>>>>>>>>>>>> completion records. This
>>>>>>>>>>>>> > > > state is
>>>>>>>>>>>>> > > > >>>>    large and is currently transmitted incrementally
>>>>>>>>>>>>> through
>>>>>>>>>>>>> > multiple
>>>>>>>>>>>>> > > > >>>>    BinlogSplitMetaEvent messages. Since the binlog
>>>>>>>>>>>>> reader operates
>>>>>>>>>>>>> > > > >>>>    with single parallelism, reporting the full
>>>>>>>>>>>>> split state on
>>>>>>>>>>>>> > recovery
>>>>>>>>>>>>> > > > >>>>    could be inefficient or even infeasible.
>>>>>>>>>>>>> > > > >>>>    For such sources, it would be better to provide
>>>>>>>>>>>>> a mechanism to
>>>>>>>>>>>>> > skip
>>>>>>>>>>>>> > > > >>>>    split reporting during restart until they
>>>>>>>>>>>>> redesign and reduce
>>>>>>>>>>>>> > the
>>>>>>>>>>>>> > > > >>>>    split size.
>>>>>>>>>>>>> > > > >>>>    2.
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>>    Not all enumerators maintain unassigned splits
>>>>>>>>>>>>> in state.
>>>>>>>>>>>>> > > > >>>>    Some SplitEnumerator(such as kafka connector)
>>>>>>>>>>>>> implementations
>>>>>>>>>>>>> > do
>>>>>>>>>>>>> > > > >>>>    not track or persistently manage unassigned
>>>>>>>>>>>>> splits. Requiring
>>>>>>>>>>>>> > them
>>>>>>>>>>>>> > > > to
>>>>>>>>>>>>> > > > >>>>    handle re-registration would add unnecessary
>>>>>>>>>>>>> complexity. Even
>>>>>>>>>>>>> > > > though we
>>>>>>>>>>>>> > > > >>>>    maybe implements in kafka connector, currently,
>>>>>>>>>>>>> kafka connector
>>>>>>>>>>>>> > is
>>>>>>>>>>>>> > > > decouple
>>>>>>>>>>>>> > > > >>>>    with flink version, we also need to make sure
>>>>>>>>>>>>> the elder version
>>>>>>>>>>>>> > is
>>>>>>>>>>>>> > > > >>>>    compatible.
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>> ------------------------------
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>> To address these concerns, I propose introducing a
>>>>>>>>>>>>> new method:
>>>>>>>>>>>>> > boolean
>>>>>>>>>>>>> > > > >>>> SourceReader#shouldReassignSplitsOnRecovery() with
>>>>>>>>>>>>> a default
>>>>>>>>>>>>> > > > >>>> implementation returning false. This allows source
>>>>>>>>>>>>> readers to opt
>>>>>>>>>>>>> > in
>>>>>>>>>>>>> > > > >>>> to split reassignment only when necessary. Since
>>>>>>>>>>>>> the new contract
>>>>>>>>>>>>> > > > already
>>>>>>>>>>>>> > > > >>>> places the responsibility for split assignment on
>>>>>>>>>>>>> the enumerator,
>>>>>>>>>>>>> > not
>>>>>>>>>>>>> > > > >>>> reporting splits by default is a safe and clean
>>>>>>>>>>>>> default behavior.
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>> ------------------------------
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>> I’ve updated the implementation and the FIP
>>>>>>>>>>>>> accordingly[1]. It
>>>>>>>>>>>>> > quite a
>>>>>>>>>>>>> > > > >>>> big change. In particular, for the Kafka connector,
>>>>>>>>>>>>> we can now use
>>>>>>>>>>>>> > a
>>>>>>>>>>>>> > > > >>>> pluggable SplitPartitioner to support different
>>>>>>>>>>>>> split assignment
>>>>>>>>>>>>> > > > >>>> strategies (e.g., default, round-robin).
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>> Could you please review it when you have a chance?
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>> Best,
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>> Hongshun
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>> [1]
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > >
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >
>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <
>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>>> > > > wrote:
>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>> > > > >>>>> Hi Hongshun,
>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>> > > > >>>>> I am not too concerned about the transmission
>>>>>>>>>>>>> cost. Because the
>>>>>>>>>>>>> > full
>>>>>>>>>>>>> > > > >>>>> split transmission has to happen in the initial
>>>>>>>>>>>>> assignment phase
>>>>>>>>>>>>> > > > already.
>>>>>>>>>>>>> > > > >>>>> And in the future, we probably want to also
>>>>>>>>>>>>> introduce some kind
>>>>>>>>>>>>> > of
>>>>>>>>>>>>> > > > workload
>>>>>>>>>>>>> > > > >>>>> balance across source readers, e.g. based on the
>>>>>>>>>>>>> per-split
>>>>>>>>>>>>> > > > throughput or
>>>>>>>>>>>>> > > > >>>>> the per-source-reader workload in heterogeneous
>>>>>>>>>>>>> clusters. For
>>>>>>>>>>>>> > these
>>>>>>>>>>>>> > > > >>>>> reasons, we do not expect the Split objects to be
>>>>>>>>>>>>> huge, and we
>>>>>>>>>>>>> > are
>>>>>>>>>>>>> > > > not
>>>>>>>>>>>>> > > > >>>>> trying to design for huge Split objects either as
>>>>>>>>>>>>> they will have
>>>>>>>>>>>>> > > > problems
>>>>>>>>>>>>> > > > >>>>> even today.
>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>> > > > >>>>> Good point on the potential split loss, please see
>>>>>>>>>>>>> the reply
>>>>>>>>>>>>> > below:
>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>> > > > >>>>> Scenario 2:
>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>> > > > >>>>>> 1. Reader A reports splits (1 and 2), and Reader
>>>>>>>>>>>>> B reports (3
>>>>>>>>>>>>> > and 4)
>>>>>>>>>>>>> > > > >>>>>> upon restart.
>>>>>>>>>>>>> > > > >>>>>> 2. Before the enumerator receives all reports and
>>>>>>>>>>>>> performs
>>>>>>>>>>>>> > > > >>>>>> reassignment, a checkpoint is triggered.
>>>>>>>>>>>>> > > > >>>>>> 3. Since no splits have been reassigned yet, both
>>>>>>>>>>>>> readers have
>>>>>>>>>>>>> > empty
>>>>>>>>>>>>> > > > >>>>>> states.
>>>>>>>>>>>>> > > > >>>>>> 4. When restarting from this checkpoint, all four
>>>>>>>>>>>>> splits are
>>>>>>>>>>>>> > lost.
>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>> > > > >>>>> The reader registration happens in the
>>>>>>>>>>>>> SourceOperator.open(),
>>>>>>>>>>>>> > which
>>>>>>>>>>>>> > > > >>>>> means the task is still in the initializing state,
>>>>>>>>>>>>> therefore the
>>>>>>>>>>>>> > > > checkpoint
>>>>>>>>>>>>> > > > >>>>> should not be triggered until the enumerator
>>>>>>>>>>>>> receives all the
>>>>>>>>>>>>> > split
>>>>>>>>>>>>> > > > reports.
>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>> > > > >>>>> There is a nuance here. Today, the RPC call from
>>>>>>>>>>>>> the TM to the JM
>>>>>>>>>>>>> > is
>>>>>>>>>>>>> > > > >>>>> async. So it is possible that the
>>>>>>>>>>>>> SourceOpertor.open() has
>>>>>>>>>>>>> > returned,
>>>>>>>>>>>>> > > > but
>>>>>>>>>>>>> > > > >>>>> the enumerator has not received the split reports.
>>>>>>>>>>>>> However,
>>>>>>>>>>>>> > because
>>>>>>>>>>>>> > > > the
>>>>>>>>>>>>> > > > >>>>> task status update RPC call goes to the same
>>>>>>>>>>>>> channel as the split
>>>>>>>>>>>>> > > > reports
>>>>>>>>>>>>> > > > >>>>> call, so the task status RPC call will happen
>>>>>>>>>>>>> after the split
>>>>>>>>>>>>> > > > reports call
>>>>>>>>>>>>> > > > >>>>> on the JM side. Therefore, on the JM side, the
>>>>>>>>>>>>> SourceCoordinator
>>>>>>>>>>>>> > will
>>>>>>>>>>>>> > > > >>>>> always first receive the split reports, then
>>>>>>>>>>>>> receive the
>>>>>>>>>>>>> > checkpoint
>>>>>>>>>>>>> > > > request.
>>>>>>>>>>>>> > > > >>>>> This "happen before" relationship is kind of
>>>>>>>>>>>>> important to
>>>>>>>>>>>>> > guarantee
>>>>>>>>>>>>> > > > >>>>> the consistent state between enumerator and
>>>>>>>>>>>>> readers.
>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>> > > > >>>>> Scenario 1:
>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>> > > > >>>>>> 1. Upon restart, Reader A reports assigned splits
>>>>>>>>>>>>> (1 and 2), and
>>>>>>>>>>>>> > > > >>>>>> Reader B reports (3 and 4).
>>>>>>>>>>>>> > > > >>>>>> 2. The enumerator receives these reports but only
>>>>>>>>>>>>> reassigns
>>>>>>>>>>>>> > splits 1
>>>>>>>>>>>>> > > > >>>>>> and 2 — not 3 and 4.
>>>>>>>>>>>>> > > > >>>>>> 3. A checkpoint or savepoint is then triggered.
>>>>>>>>>>>>> Only splits 1
>>>>>>>>>>>>> > and 2
>>>>>>>>>>>>> > > > >>>>>> are recorded in the reader states; splits 3 and 4
>>>>>>>>>>>>> are not
>>>>>>>>>>>>> > persisted.
>>>>>>>>>>>>> > > > >>>>>> 4. If the job is later restarted from this
>>>>>>>>>>>>> checkpoint, splits 3
>>>>>>>>>>>>> > and
>>>>>>>>>>>>> > > > 4
>>>>>>>>>>>>> > > > >>>>>> will be permanently lost.
>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>> > > > >>>>> This scenario is possible. One solution is to let
>>>>>>>>>>>>> the enumerator
>>>>>>>>>>>>> > > > >>>>> implementation handle this. That means if the
>>>>>>>>>>>>> enumerator relies
>>>>>>>>>>>>> > on
>>>>>>>>>>>>> > > > the
>>>>>>>>>>>>> > > > >>>>> initial split reports from the source readers, it
>>>>>>>>>>>>> should maintain
>>>>>>>>>>>>> > > > these
>>>>>>>>>>>>> > > > >>>>> reports by itself. In the above example, the
>>>>>>>>>>>>> enumerator will need
>>>>>>>>>>>>> > to
>>>>>>>>>>>>> > > > >>>>> remember that 3 and 4 are not assigned and put it
>>>>>>>>>>>>> into its own
>>>>>>>>>>>>> > state.
>>>>>>>>>>>>> > > > >>>>> The current contract is that anything assigned to
>>>>>>>>>>>>> the
>>>>>>>>>>>>> > SourceReaders
>>>>>>>>>>>>> > > > >>>>> are completely owned by the SourceReaders.
>>>>>>>>>>>>> Enumerators can
>>>>>>>>>>>>> > remember
>>>>>>>>>>>>> > > > the
>>>>>>>>>>>>> > > > >>>>> assignments but cannot change them, even when the
>>>>>>>>>>>>> source reader
>>>>>>>>>>>>> > > > recovers /
>>>>>>>>>>>>> > > > >>>>> restarts.
>>>>>>>>>>>>> > > > >>>>> With this FLIP, the contract becomes that the
>>>>>>>>>>>>> source readers will
>>>>>>>>>>>>> > > > >>>>> return the ownership of the splits to the
>>>>>>>>>>>>> enumerator. So the
>>>>>>>>>>>>> > > > enumerator is
>>>>>>>>>>>>> > > > >>>>> responsible for maintaining these splits, until
>>>>>>>>>>>>> they are assigned
>>>>>>>>>>>>> > to
>>>>>>>>>>>>> > > > a
>>>>>>>>>>>>> > > > >>>>> source reader again.
>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>> > > > >>>>> There are other cases where there may be conflict
>>>>>>>>>>>>> information
>>>>>>>>>>>>> > between
>>>>>>>>>>>>> > > > >>>>> reader and enumerator. For example, consider the
>>>>>>>>>>>>> following
>>>>>>>>>>>>> > sequence:
>>>>>>>>>>>>> > > > >>>>> 1. reader A reports splits (1 and 2) up on restart.
>>>>>>>>>>>>> > > > >>>>> 2. enumerator receives the report and assigns both
>>>>>>>>>>>>> 1 and 2 to
>>>>>>>>>>>>> > reader
>>>>>>>>>>>>> > > > B.
>>>>>>>>>>>>> > > > >>>>> 3. reader A failed before checkpointing. And this
>>>>>>>>>>>>> is a partial
>>>>>>>>>>>>> > > > >>>>> failure, so only reader A restarts.
>>>>>>>>>>>>> > > > >>>>> 4. When reader A recovers, it will again report
>>>>>>>>>>>>> splits (1 and 2)
>>>>>>>>>>>>> > to
>>>>>>>>>>>>> > > > >>>>> the enumerator.
>>>>>>>>>>>>> > > > >>>>> 5. The enumerator should ignore this report
>>>>>>>>>>>>> because it has
>>>>>>>>>>>>> > > > >>>>> assigned splits (1 and 2) to reader B.
>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>> > > > >>>>> So with the new contract, the enumerator should be
>>>>>>>>>>>>> the source of
>>>>>>>>>>>>> > > > truth
>>>>>>>>>>>>> > > > >>>>> for split ownership.
>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>> > > > >>>>> Thanks,
>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>> > > > >>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>> > > > >>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <
>>>>>>>>>>>>> > > > [email protected]>
>>>>>>>>>>>>> > > > >>>>> wrote:
>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>> > > > >>>>>> Hi Becket,
>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>> > > > >>>>>> I did consider this approach at the beginning
>>>>>>>>>>>>> (and it was also
>>>>>>>>>>>>> > > > >>>>>> mentioned in this FLIP), since it would allow
>>>>>>>>>>>>> more flexibility
>>>>>>>>>>>>> > in
>>>>>>>>>>>>> > > > >>>>>> reassigning all splits. However, there are a few
>>>>>>>>>>>>> potential
>>>>>>>>>>>>> > issues.
>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>> > > > >>>>>> 1. High Transmission Cost
>>>>>>>>>>>>> > > > >>>>>> If we pass the full split objects (rather than
>>>>>>>>>>>>> just split IDs),
>>>>>>>>>>>>> > the
>>>>>>>>>>>>> > > > >>>>>> data size could be significant, leading to high
>>>>>>>>>>>>> overhead during
>>>>>>>>>>>>> > > > >>>>>> transmission — especially when many splits are
>>>>>>>>>>>>> involved.
>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>> > > > >>>>>> 2. Risk of Split Loss
>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>> > > > >>>>>> Risk of split loss exists unless we have a
>>>>>>>>>>>>> mechanism to make
>>>>>>>>>>>>> > sure
>>>>>>>>>>>>> > > > >>>>>> only can checkpoint after all the splits are
>>>>>>>>>>>>> reassigned.
>>>>>>>>>>>>> > > > >>>>>> There are scenarios where splits could be lost
>>>>>>>>>>>>> due to
>>>>>>>>>>>>> > inconsistent
>>>>>>>>>>>>> > > > >>>>>> state handling during recovery:
>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>> > > > >>>>>> Scenario 1:
>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>> > > > >>>>>>    1. Upon restart, Reader A reports assigned
>>>>>>>>>>>>> splits (1 and 2),
>>>>>>>>>>>>> > and
>>>>>>>>>>>>> > > > >>>>>>    Reader B reports (3 and 4).
>>>>>>>>>>>>> > > > >>>>>>    2. The enumerator receives these reports but
>>>>>>>>>>>>> only reassigns
>>>>>>>>>>>>> > > > >>>>>>    splits 1 and 2 — not 3 and 4.
>>>>>>>>>>>>> > > > >>>>>>    3. A checkpoint or savepoint is then
>>>>>>>>>>>>> triggered. Only splits 1
>>>>>>>>>>>>> > and
>>>>>>>>>>>>> > > > >>>>>>    2 are recorded in the reader states; splits 3
>>>>>>>>>>>>> and 4 are not
>>>>>>>>>>>>> > > > persisted.
>>>>>>>>>>>>> > > > >>>>>>    4. If the job is later restarted from this
>>>>>>>>>>>>> checkpoint, splits
>>>>>>>>>>>>> > 3
>>>>>>>>>>>>> > > > >>>>>>    and 4 will be permanently lost.
>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>> > > > >>>>>> Scenario 2:
>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>> > > > >>>>>>    1. Reader A reports splits (1 and 2), and
>>>>>>>>>>>>> Reader B reports (3
>>>>>>>>>>>>> > and
>>>>>>>>>>>>> > > > >>>>>>    4) upon restart.
>>>>>>>>>>>>> > > > >>>>>>    2. Before the enumerator receives all reports
>>>>>>>>>>>>> and performs
>>>>>>>>>>>>> > > > >>>>>>    reassignment, a checkpoint is triggered.
>>>>>>>>>>>>> > > > >>>>>>    3. Since no splits have been reassigned yet,
>>>>>>>>>>>>> both readers
>>>>>>>>>>>>> > have
>>>>>>>>>>>>> > > > >>>>>>    empty states.
>>>>>>>>>>>>> > > > >>>>>>    4. When restarting from this checkpoint, all
>>>>>>>>>>>>> four splits are
>>>>>>>>>>>>> > > > lost.
>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>> > > > >>>>>> Let me know if you have thoughts on how we might
>>>>>>>>>>>>> mitigate these
>>>>>>>>>>>>> > > > risks!
>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>> > > > >>>>>> Best
>>>>>>>>>>>>> > > > >>>>>> Hongshun
>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>> > > > >>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <
>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>>> > > > >>>>>> wrote:
>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>> > > > >>>>>>> Hi Hongshun,
>>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>>> > > > >>>>>>> The steps sound reasonable to me in general. In
>>>>>>>>>>>>> terms of the
>>>>>>>>>>>>> > > > updated
>>>>>>>>>>>>> > > > >>>>>>> FLIP wiki, it would be good to see if we can
>>>>>>>>>>>>> keep the protocol
>>>>>>>>>>>>> > > > simple. One
>>>>>>>>>>>>> > > > >>>>>>> alternative way to achieve this behavior is
>>>>>>>>>>>>> following:
>>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>>> > > > >>>>>>> 1. Upon SourceOperator startup, the
>>>>>>>>>>>>> SourceOperator sends
>>>>>>>>>>>>> > > > >>>>>>> ReaderRegistrationEvent with the currently
>>>>>>>>>>>>> assigned splits to
>>>>>>>>>>>>> > the
>>>>>>>>>>>>> > > > >>>>>>> enumerator. It does not add these splits to the
>>>>>>>>>>>>> SourceReader.
>>>>>>>>>>>>> > > > >>>>>>> 2. The enumerator will always use the
>>>>>>>>>>>>> > > > >>>>>>> SourceEnumeratorContext.assignSplits() to assign
>>>>>>>>>>>>> the splits.
>>>>>>>>>>>>> > (not
>>>>>>>>>>>>> > > > via the
>>>>>>>>>>>>> > > > >>>>>>> response of the SourceRegistrationEvent, this
>>>>>>>>>>>>> allows async
>>>>>>>>>>>>> > split
>>>>>>>>>>>>> > > > assignment
>>>>>>>>>>>>> > > > >>>>>>> in case the enumerator wants to wait until all
>>>>>>>>>>>>> the readers are
>>>>>>>>>>>>> > > > registered)
>>>>>>>>>>>>> > > > >>>>>>> 3. The SourceOperator will only call
>>>>>>>>>>>>> SourceReader.addSplits()
>>>>>>>>>>>>> > when
>>>>>>>>>>>>> > > > >>>>>>> it receives the AddSplitEvent from the
>>>>>>>>>>>>> enumerator.
>>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>>> > > > >>>>>>> This protocol has a few benefits:
>>>>>>>>>>>>> > > > >>>>>>> 1. it basically allows arbitrary split
>>>>>>>>>>>>> reassignment upon
>>>>>>>>>>>>> > restart
>>>>>>>>>>>>> > > > >>>>>>> 2. simplicity: there is only one way to assign
>>>>>>>>>>>>> splits.
>>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>>> > > > >>>>>>> So we only need one interface change:
>>>>>>>>>>>>> > > > >>>>>>> - add the initially assigned splits to
>>>>>>>>>>>>> ReaderInfo so the
>>>>>>>>>>>>> > Enumerator
>>>>>>>>>>>>> > > > >>>>>>> can access it.
>>>>>>>>>>>>> > > > >>>>>>> and one behavior change:
>>>>>>>>>>>>> > > > >>>>>>> - The SourceOperator should stop assigning
>>>>>>>>>>>>> splits to the from
>>>>>>>>>>>>> > state
>>>>>>>>>>>>> > > > >>>>>>> restoration, but
>>>>>>>>>>>>> > [message truncated...]
>>>>>>>>>>>>> >
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>

Reply via email to