Thanks Hongshun for the update and the detailed analysis of the edge cases.
The updated FLIP looks good to me now.

One last question about an implementation detail, regarding this scenario in
the motivation section:

Restart with changed subscription: during restart, if the source options remove
a topic or table, the splits that have already been assigned cannot be removed.

Could you clarify how we resolve this in the Kafka connector?

Best,
Leonard


> On Oct 9, 2025, at 19:48, Hongshun Wang <[email protected]> wrote:
> 
> Hi devs,
> If there are no further suggestions, I will start the vote tomorrow.
> 
> Best,
> Hongshun
> 
> On Fri, Sep 26, 2025 at 7:48 PM Hongshun Wang <[email protected]> wrote:
>> Hi Becket and Leonard,
>> I have updated the content of this FLIP. The key point is this:
>> when the split enumerator receives a split reported by a reader, that split
>> must already exist in either pendingSplitAssignments or
>> assignedSplitAssignments.
>> - If the split is in pendingSplitAssignments, ignore it.
>> - If the split is in assignedSplitAssignments but has a different taskId,
>>   ignore it (this indicates it was already assigned to another task).
>> - If the split is in assignedSplitAssignments and has the same taskId, move
>>   the assignment from assignedSplitAssignments back to
>>   pendingSplitAssignments so that it is assigned again.
>> 
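The three rules above can be sketched roughly as follows. This is only a
minimal illustration under assumptions: the class SplitAssignmentTracker and
its methods are hypothetical and not part of the Flink API, splits are
represented by plain string IDs, and the two maps simply mirror the pending
and assigned structures named in the message.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical bookkeeping helper for illustration; not part of the Flink API. */
class SplitAssignmentTracker {
    // splitId -> subtask the split is queued for but has not been sent to yet
    private final Map<String, Integer> pendingSplitAssignments = new HashMap<>();
    // splitId -> subtask the split has already been sent to
    private final Map<String, Integer> assignedSplitAssignments = new HashMap<>();

    /** Queues a split for the given subtask. */
    void addPending(String splitId, int subtaskId) {
        pendingSplitAssignments.put(splitId, subtaskId);
    }

    /** Marks a queued split as sent to its subtask. */
    void markAssigned(String splitId) {
        Integer subtaskId = pendingSplitAssignments.remove(splitId);
        if (subtaskId != null) {
            assignedSplitAssignments.put(splitId, subtaskId);
        }
    }

    /**
     * Applies the three rules to a split reported by a restarted reader.
     * Returns true only when the split was moved back to pending.
     */
    boolean onSplitReported(String splitId, int reportingSubtaskId) {
        if (pendingSplitAssignments.containsKey(splitId)) {
            return false; // rule 1: already pending, ignore the report
        }
        Integer owner = assignedSplitAssignments.get(splitId);
        if (owner == null) {
            // The message states that a reported split must already be known.
            throw new IllegalStateException("Unknown split: " + splitId);
        }
        if (owner != reportingSubtaskId) {
            return false; // rule 2: owned by another task, ignore the report
        }
        // rule 3: same task, move back to pending so it is assigned again
        assignedSplitAssignments.remove(splitId);
        pendingSplitAssignments.put(splitId, owner);
        return true;
    }

    /** Returns whether the split is currently waiting to be assigned. */
    boolean isPending(String splitId) {
        return pendingSplitAssignments.containsKey(splitId);
    }
}
```

In this sketch only the third rule changes any state; the first two leave
both maps untouched, which is what makes a repeated report harmless.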
>> To make it easier to understand why these strategies are used, I added some
>> examples and diagrams.
>> 
>> Could you help check whether any problems remain?
>> 
>> Best,
>> Hongshun
>> 
>> 
>> On Fri, Sep 26, 2025 at 5:08 PM Leonard Xu <[email protected]> wrote:
>>> Thanks Becket and Hongshun for the insightful discussion.
>>> 
>>> The underlying implementation and communication mechanisms of the Flink
>>> Source indeed involve many intricate details. We discussed split
>>> re-assignment in several specific scenarios, and fortunately the final
>>> decision turned out to be pretty clear.
>>> 
>>> +1 to Becket's proposal to keep the framework cleaner and more flexible.
>>> +1 to Hongshun's point about providing comprehensive guidance for connector
>>> developers.
>>> 
>>> 
>>> Best, 
>>> Leonard
>>> 
>>> 
>>> 
>>>> On Sep 26, 2025, at 16:30, Hongshun Wang <[email protected]> wrote:
>>>> 
>>>> Hi Becket,
>>>> 
>>>> Got it. You're suggesting that we should not handle this in the source
>>>> framework but instead let the split enumerator manage these three
>>>> scenarios.
>>>> 
>>>> Let me explain why I originally favored handling it in the framework: I'm
>>>> concerned that connector developers might overlook certain edge cases
>>>> (after all, even we needed extensive discussion to fully clarify the logic).
>>>> 
>>>> However, your approach keeps the framework cleaner and more flexible, so I
>>>> will adopt it.
>>>> 
>>>> Perhaps, in this FLIP, we should focus on providing comprehensive guidance 
>>>> for connector developers: explain how to implement a split enumerator, 
>>>> including the underlying challenges and their solutions.
>>>>  
>>>> Additionally, we can use the Kafka connector as a reference implementation 
>>>> to demonstrate the practical steps. This way, developers who want to 
>>>> implement similar connectors can directly reference this example.
>>>> 
>>>> 
>>>> Best,
>>>> Hongshun
>>>> 
>>>> 
>>>> 
>>>> On Fri, Sep 26, 2025 at 1:27 PM Becket Qin <[email protected]> wrote:
>>>>> It would be good to not expose runtime details to the source 
>>>>> implementation if possible. 
>>>>> 
>>>>> Today, the split enumerator implementations are expected to track the 
>>>>> split assignment.
>>>>> 
>>>>> Assuming the split enumerator implementation keeps a split assignment 
>>>>> map, that means the enumerator should already know whether a split is 
>>>>> assigned or unassigned. So it can handle the three scenarios you 
>>>>> mentioned.
>>>>> 
>>>>>> The split is reported by a reader during a global restoration.
>>>>> The split enumerator should have just been restored / created. If the 
>>>>> enumerator expects a full reassignment of splits upon global recovery, 
>>>>> there should be no assigned splits to that reader in the split assignment 
>>>>> mapping.
>>>>> 
>>>>>> The split is reported by a reader during a partial failure recovery.
>>>>> In this case, when SplitEnumerator.addReader() is invoked, the split 
>>>>> assignment map in the enumerator implementation should already have some 
>>>>> split assignments for the reader. Therefore it is a partial failover. If 
>>>>> the source supports split reassignment on recovery, the enumerator can 
>>>>> assign splits that are different from the reported assignment of that 
>>>>> reader in the SplitEnumeratorContext, or it can also assign the same 
>>>>> splits. In any case, the enumerator knows that this is a partial recovery 
>>>>> because the assignment map is non-empty.
>>>>> 
>>>>>> The split is not reported by a reader, but is assigned after the last 
>>>>>> successful checkpoint and was never acknowledged.
>>>>> This is actually one of the steps in partial failure recovery.
>>>>> SplitEnumerator.addSplitsBack() will be called first, before
>>>>> SplitEnumerator.addReader() is called for the recovered reader. When
>>>>> SplitEnumerator.addSplitsBack() is invoked, it is for sure a partial
>>>>> recovery, and the enumerator should remove these splits from the split
>>>>> assignment map as if they were never assigned.
>>>>> 
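A rough sketch of the assignment-map bookkeeping described above, under
stated assumptions: AssignmentMapSketch is a hypothetical stand-in and not a
real SplitEnumerator, splits are plain string IDs, and "partial failover" is
detected simply by the map being non-empty, as the message suggests.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for an enumerator's assignment map; not Flink API. */
class AssignmentMapSketch {
    // subtaskId -> IDs of the splits the enumerator believes that reader owns
    private final Map<Integer, Set<String>> assignmentBySubtask = new HashMap<>();

    /** Records that a split was assigned to a reader. */
    void recordAssignment(int subtaskId, String splitId) {
        assignmentBySubtask.computeIfAbsent(subtaskId, k -> new HashSet<>()).add(splitId);
    }

    /** Models addSplitsBack(): forget the splits as if they were never assigned. */
    void addSplitsBack(List<String> splitIds, int subtaskId) {
        Set<String> assigned = assignmentBySubtask.get(subtaskId);
        if (assigned != null) {
            assigned.removeAll(splitIds);
            if (assigned.isEmpty()) {
                assignmentBySubtask.remove(subtaskId);
            }
        }
    }

    /** Models the check made when a reader registers: a non-empty map means partial failover. */
    boolean looksLikePartialFailover() {
        return !assignmentBySubtask.isEmpty();
    }

    /** Returns the splits currently recorded for a reader. */
    Set<String> assignedTo(int subtaskId) {
        return assignmentBySubtask.getOrDefault(subtaskId, Collections.<String>emptySet());
    }
}
```

A freshly restored enumerator starts with an empty map, so the same check
would report a global restore in that case.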
>>>>> I think this should work, right?
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Jiangjie (Becket) Qin
>>>>> 
>>>>> On Thu, Sep 25, 2025 at 8:34 PM Hongshun Wang <[email protected]> wrote:
>>>>>> Hi Becket and Leonard,
>>>>>> 
>>>>>> Thanks for your advice. 
>>>>>> 
>>>>>> > put all the reader information in the SplitEnumerator context
>>>>>> I have a concern: the current registeredReaders in 
>>>>>> SourceCoordinatorContext will be removed after subtaskResetor execution 
>>>>>> on failure. However, this approach has merit.
>>>>>> 
>>>>>> I found one more situation that my previous design does not cover:
>>>>>> 1. Initial state: Reader A reports splits (1, 2).
>>>>>> 2. Enumerator action: assigns split 1 to Reader A, and split 2 to Reader B.
>>>>>> 3. Failure scenario: Reader A fails before checkpointing. Since this is a
>>>>>>    partial failure, only Reader A restarts.
>>>>>> 4. Recovery issue: upon recovery, Reader A re-reports split (1).
>>>>>> In my previous design, the enumerator would ignore Reader A's
>>>>>> re-registration, which would cause data loss.
>>>>>> 
>>>>>> Thus, when the enumerator receives a split, the split may originate from
>>>>>> one of three scenarios:
>>>>>> 1. The split is reported by a reader during a global restoration.
>>>>>> 2. The split is reported by a reader during a partial failure recovery.
>>>>>> 3. The split is not reported by a reader, but was assigned after the last
>>>>>>    successful checkpoint and was never acknowledged.
>>>>>> In the first scenario (global restore), the split should be
>>>>>> re-distributed. For the latter two scenarios (partial failover and
>>>>>> post-checkpoint assignment), we need to reassign the split to its
>>>>>> originally assigned subtask.
>>>>>> 
>>>>>> By implementing a method in the SplitEnumeratorContext that tracks each
>>>>>> assigned split's status, the system can correctly identify and resolve
>>>>>> split ownership in all three scenarios. What about adding a
>>>>>> `SplitRecoveryType splitRecoveryType(Split split)` method to
>>>>>> SplitEnumeratorContext? SplitRecoveryType would be an enum including
>>>>>> `UNASSIGNED`, `GLOBAL_RESTORE`, `PARTIAL_FAILOVER` and
>>>>>> `POST_CHECKPOINT_ASSIGNMENT`.
>>>>>> 
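To make the proposal above concrete, here is a small sketch of how an
enumerator might branch on such an enum. The enum constants come from the
message; everything else is an assumption for illustration only:
RecoveryPolicySketch, targetSubtask() and the hash-based redistribution are
hypothetical, and no such method exists in SplitEnumeratorContext today.

```java
/** Enum constants as proposed in the message; the type itself is only a proposal. */
enum SplitRecoveryType {
    UNASSIGNED,
    GLOBAL_RESTORE,
    PARTIAL_FAILOVER,
    POST_CHECKPOINT_ASSIGNMENT
}

/** Hypothetical policy helper for illustration; not part of the Flink API. */
class RecoveryPolicySketch {
    /**
     * Picks the subtask a split should go to. Partial failover and post-checkpoint
     * assignment keep the original owner; global restore and unassigned splits are
     * redistributed, here by a simple hash of the split ID (an assumed strategy).
     */
    static int targetSubtask(
            SplitRecoveryType type, String splitId, int originalSubtask, int parallelism) {
        switch (type) {
            case PARTIAL_FAILOVER:
            case POST_CHECKPOINT_ASSIGNMENT:
                return originalSubtask;
            case GLOBAL_RESTORE:
            case UNASSIGNED:
            default:
                return Math.floorMod(splitId.hashCode(), parallelism);
        }
    }
}
```

The point of the sketch is only the branching: two of the four values pin the
split to its previous owner, the other two leave the enumerator free to
redistribute.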
>>>>>> What do you think? Are there any details or scenarios I haven't 
>>>>>> considered? Looking forward to your advice.
>>>>>> 
>>>>>> Best,
>>>>>> Hongshun
>>>>>> 
>>>>>> On Thu, Sep 11, 2025 at 12:41 AM Becket Qin <[email protected]> wrote:
>>>>>>> Thanks for the explanation, Hongshun.
>>>>>>> 
>>>>>>> The current pattern for handling new reader registration is as follows:
>>>>>>> 1. Put all the reader information in the SplitEnumerator context.
>>>>>>> 2. Notify the enumerator about the new reader registration.
>>>>>>> 3. Let the split enumerator get whatever information it wants from the
>>>>>>>    context and do its job.
>>>>>>> This pattern decouples the information passing from the reader
>>>>>>> registration notification. This makes the API extensible: we can add more
>>>>>>> information about the reader (e.g. the reported assigned splits in our
>>>>>>> case) to the context without introducing new methods.
>>>>>>> 
>>>>>>> Introducing a new addSplitBackOnRecovery() method is redundant given the
>>>>>>> above pattern. Do we really need it?
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> 
>>>>>>> Jiangjie (Becket) Qin
>>>>>>> 
>>>>>>> On Mon, Sep 8, 2025 at 8:18 PM Hongshun Wang <[email protected]> wrote:
>>>>>>> 
>>>>>>> > Hi Becket,
>>>>>>> >
>>>>>>> > > I am curious what would the enumerator do differently for the splits
>>>>>>> > > added via addSplitsBackOnRecovery() vs. addSplitsBack()?
>>>>>>> >
>>>>>>> > In this FLIP, there are two distinct scenarios in which the enumerator
>>>>>>> > receives splits being added back:
>>>>>>> > 1. Job-level restore: the job is restored, and the splits from each
>>>>>>> >    reader's state are reported via ReaderRegistrationEvent.
>>>>>>> > 2. Reader-level restart: a single reader is restarted, but not the whole
>>>>>>> >    job, and the splits added back are those assigned to it after the
>>>>>>> >    last successful checkpoint. This is what addSplitsBack used to do.
>>>>>>> >
>>>>>>> > In these two situations, the enumerator will choose different strategies:
>>>>>>> > 1. Job-level restore: the splits should be redistributed across readers
>>>>>>> >    according to the current partitioner strategy.
>>>>>>> > 2. Reader-level restart: the splits should be reassigned directly back to
>>>>>>> >    the same reader they were originally assigned to, preserving locality
>>>>>>> >    and avoiding unnecessary redistribution.
>>>>>>> >
>>>>>>> > Therefore, the enumerator must clearly distinguish between these two
>>>>>>> > scenarios. I originally deprecated the old
>>>>>>> > addSplitsBack(List<SplitT> splits, int subtaskId) and added a new
>>>>>>> > addSplitsBack(List<SplitT> splits, int subtaskId, boolean reportedByReader).
>>>>>>> > Leonard suggested using a separate method, addSplitsBackOnRecovery, so
>>>>>>> > that the existing addSplitsBack is not affected.
>>>>>>> >
>>>>>>> > Best
>>>>>>> > Hongshun
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> > On 2025/09/08 17:20:31 Becket Qin wrote:
>>>>>>> > > Hi Leonard,
>>>>>>> > >
>>>>>>> > >
>>>>>>> > > > Could we introduce a new method like addSplitsBackOnRecovery with a
>>>>>>> > > > default implementation? In this way, we can provide better backward
>>>>>>> > > > compatibility, and it is also easier for developers to understand.
>>>>>>> > >
>>>>>>> > >
>>>>>>> > > I am curious what would the enumerator do differently for the splits
>>>>>>> > > added via addSplitsBackOnRecovery() vs. addSplitsBack()? Today,
>>>>>>> > > addSplitsBack() is also only called upon recovery, so the new method
>>>>>>> > > seems confusing. One thing worth clarifying: if the Source implements
>>>>>>> > > SupportSplitReassignmentOnRecovery, should the splits reported by the
>>>>>>> > > readers upon recovery also be added back to the SplitEnumerator via the
>>>>>>> > > addSplitsBack() call? Or should the SplitEnumerator explicitly query
>>>>>>> > > the registered reader information via the SplitEnumeratorContext to get
>>>>>>> > > the originally assigned splits when addReader() is invoked? I was
>>>>>>> > > assuming the latter in the beginning, so that the behavior of
>>>>>>> > > addSplitsBack() remains unchanged, but I am not opposed to doing the
>>>>>>> > > former.
>>>>>>> > >
>>>>>>> > > Also, can you elaborate on the backwards compatibility issue you see if
>>>>>>> > > we do not have a separate addSplitsBackOnRecovery() method? Even
>>>>>>> > > without this new method, the behavior remains exactly the same unless
>>>>>>> > > the end users implement the mix-in interface
>>>>>>> > > "SupportSplitReassignmentOnRecovery", right?
>>>>>>> > >
>>>>>>> > > Thanks,
>>>>>>> > >
>>>>>>> > > Jiangjie (Becket) Qin
>>>>>>> > >
>>>>>>> > > On Mon, Sep 8, 2025 at 1:48 AM Hongshun Wang <[email protected]> wrote:
>>>>>>> > >
>>>>>>> > > > Hi devs,
>>>>>>> > > >
>>>>>>> > > > It has been quite some time since this FLIP[1] was first proposed.
>>>>>>> > > > Thank you for your valuable feedback. Based on your suggestions, the
>>>>>>> > > > FLIP has undergone several rounds of revisions.
>>>>>>> > > >
>>>>>>> > > > Any more advice is welcome and appreciated. If there are no further
>>>>>>> > > > concerns, I plan to start the vote tomorrow.
>>>>>>> > > >
>>>>>>> > > > Best
>>>>>>> > > > Hongshun
>>>>>>> > > >
>>>>>>> > > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480
>>>>>>> > > >
>>>>>>> > > > On Mon, Sep 8, 2025 at 4:42 PM Hongshun Wang <[email protected]> wrote:
>>>>>>> > > >
>>>>>>> > > > > Hi Leonard,
>>>>>>> > > > > Thanks for your advice.  It makes sense and I have modified it.
>>>>>>> > > > >
>>>>>>> > > > > Best,
>>>>>>> > > > > Hongshun
>>>>>>> > > > >
>>>>>>> > > > > On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu <[email protected]> wrote:
>>>>>>> > > > >
>>>>>>> > > > >> Thanks Hongshun and Becket for the deep discussion.
>>>>>>> > > > >>
>>>>>>> > > > >> I only have one comment for one API design:
>>>>>>> > > > >>
>>>>>>> > > > >> > Deprecate the old addSplitsBack method, use an addSplitsBack with
>>>>>>> > > > >> > param isReportedByReader instead. Because the enumerator can apply
>>>>>>> > > > >> > different reassignment policies based on the context.
>>>>>>> > > > >>
>>>>>>> > > > >> Could we introduce a new method like *addSplitsBackOnRecovery* with a
>>>>>>> > > > >> default implementation? In this way, we can provide better backward
>>>>>>> > > > >> compatibility, and it is also easier for developers to understand.
>>>>>>> > > > >>
>>>>>>> > > > >> Best,
>>>>>>> > > > >> Leonard
>>>>>>> > > > >>
>>>>>>> > > > >>
>>>>>>> > > > >>
>>>>>>> > > > >> On Sep 3, 2025, at 20:26, Hongshun Wang <[email protected]> wrote:
>>>>>>> > > > >>
>>>>>>> > > > >> Hi Becket,
>>>>>>> > > > >>
>>>>>>> > > > >> I think that's a great idea! I have added the
>>>>>>> > > > >> SupportSplitReassignmentOnRecovery interface to this FLIP. A Source
>>>>>>> > > > >> that implements this interface indicates that the source operator
>>>>>>> > > > >> needs to report splits to the enumerator and receive reassignment.[1]
>>>>>>> > > > >>
>>>>>>> > > > >> Best,
>>>>>>> > > > >> Hongshun
>>>>>>> > > > >>
>>>>>>> > > > >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>> > > > >>
>>>>>>> > > > >> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <[email protected]> wrote:
>>>>>>> > > > >>
>>>>>>> > > > >>> Hi Hongshun,
>>>>>>> > > > >>>
>>>>>>> > > > >>> I think the convention for such optional features in Source is via
>>>>>>> > > > >>> mix-in interfaces. So instead of adding a method to the SourceReader,
>>>>>>> > > > >>> maybe we should introduce an interface
>>>>>>> > > > >>> SupportSplitReassignmentOnRecovery with this method. If a Source
>>>>>>> > > > >>> implementation implements that interface, then the SourceOperator
>>>>>>> > > > >>> will check the desired behavior and act accordingly.
>>>>>>> > > > >>>
>>>>>>> > > > >>> Thanks,
>>>>>>> > > > >>>
>>>>>>> > > > >>> Jiangjie (Becket) Qin
>>>>>>> > > > >>>
>>>>>>> > > > >>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <[email protected]> wrote:
>>>>>>> > > > >>>
>>>>>>> > > > >>>> Hi devs,
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> Would anyone like to discuss this FLIP? I'd appreciate your feedback
>>>>>>> > > > >>>> and suggestions.
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> Best,
>>>>>>> > > > >>>> Hongshun
>>>>>>> > > > >>>>
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> On Aug 13, 2025, at 14:23, Hongshun Wang <[email protected]> wrote:
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> Hi Becket,
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> Thank you for your detailed feedback. The new contract makes good
>>>>>>> > > > >>>> sense to me and effectively addresses the issues I encountered at
>>>>>>> > > > >>>> the beginning of the design.
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> That said, I recommend not reporting splits by default, primarily
>>>>>>> > > > >>>> for compatibility and practical reasons:
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> > For these reasons, we do not expect the Split objects to be huge,
>>>>>>> > > > >>>> > and we are not trying to design for huge Split objects either as
>>>>>>> > > > >>>> > they will have problems even today.
>>>>>>> > > > >>>>
>>>>>>> > > > >>>>    1. Not all existing connectors match this rule.
>>>>>>> > > > >>>>    For example, in the MySQL CDC connector, a binlog split may
>>>>>>> > > > >>>>    contain hundreds (or even more) of snapshot split completion
>>>>>>> > > > >>>>    records. This state is large and is currently transmitted
>>>>>>> > > > >>>>    incrementally through multiple BinlogSplitMetaEvent messages.
>>>>>>> > > > >>>>    Since the binlog reader operates with single parallelism,
>>>>>>> > > > >>>>    reporting the full split state on recovery could be inefficient
>>>>>>> > > > >>>>    or even infeasible.
>>>>>>> > > > >>>>    For such sources, it would be better to provide a mechanism to
>>>>>>> > > > >>>>    skip split reporting during restart until they are redesigned to
>>>>>>> > > > >>>>    reduce the split size.
>>>>>>> > > > >>>>    2. Not all enumerators maintain unassigned splits in state.
>>>>>>> > > > >>>>    Some SplitEnumerator implementations (such as the Kafka
>>>>>>> > > > >>>>    connector) do not track or persistently manage unassigned
>>>>>>> > > > >>>>    splits. Requiring them to handle re-registration would add
>>>>>>> > > > >>>>    unnecessary complexity. Even though we may implement this in the
>>>>>>> > > > >>>>    Kafka connector, the Kafka connector is currently decoupled from
>>>>>>> > > > >>>>    the Flink version, so we also need to make sure older versions
>>>>>>> > > > >>>>    remain compatible.
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> ------------------------------
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> To address these concerns, I propose introducing a new method:
>>>>>>> > > > >>>> boolean SourceReader#shouldReassignSplitsOnRecovery(), with a
>>>>>>> > > > >>>> default implementation returning false. This allows source readers
>>>>>>> > > > >>>> to opt in to split reassignment only when necessary. Since the new
>>>>>>> > > > >>>> contract already places the responsibility for split assignment on
>>>>>>> > > > >>>> the enumerator, not reporting splits by default is a safe and clean
>>>>>>> > > > >>>> default behavior.
>>>>>>> > > > >>>>
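The opt-in idea in the proposal above could look roughly like this. It is a
sketch under assumptions, not the FLIP's implementation: ReaderSketch stands
in for SourceReader, RegistrationSketch.splitsToReport() is a hypothetical
helper, and splits are plain string IDs. Only the method name
shouldReassignSplitsOnRecovery() and its default of false come from the
message.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for SourceReader; only the default method mirrors the proposal. */
interface ReaderSketch {
    /** Readers report their restored splits only if they override this to return true. */
    default boolean shouldReassignSplitsOnRecovery() {
        return false;
    }
}

/** An existing reader that does not override the method keeps today's behavior. */
class LegacyReaderSketch implements ReaderSketch {}

/** A reader that opts in to split reassignment on recovery. */
class OptInReaderSketch implements ReaderSketch {
    @Override
    public boolean shouldReassignSplitsOnRecovery() {
        return true;
    }
}

/** Hypothetical helper showing what an operator might send on registration. */
class RegistrationSketch {
    static List<String> splitsToReport(ReaderSketch reader, List<String> restoredSplits) {
        if (reader.shouldReassignSplitsOnRecovery()) {
            return new ArrayList<>(restoredSplits);
        }
        // Default: report nothing, so the restored splits stay with the reader.
        return Collections.<String>emptyList();
    }
}
```

Because the default returns false, a connector compiled against an older
interface version would behave exactly as before, which is the compatibility
argument made in the message.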
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> ------------------------------
>>>>>>> > > > >>>>
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> I've updated the implementation and the FLIP accordingly[1]. It is
>>>>>>> > > > >>>> quite a big change. In particular, for the Kafka connector, we can
>>>>>>> > > > >>>> now use a pluggable SplitPartitioner to support different split
>>>>>>> > > > >>>> assignment strategies (e.g., default, round-robin).
>>>>>>> > > > >>>>
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> Could you please review it when you have a chance?
>>>>>>> > > > >>>>
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> Best,
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> Hongshun
>>>>>>> > > > >>>>
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <[email protected]> wrote:
>>>>>>> > > > >>>>
>>>>>>> > > > >>>>> Hi Hongshun,
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> I am not too concerned about the transmission cost, because the
>>>>>>> > > > >>>>> full split transmission has to happen in the initial assignment
>>>>>>> > > > >>>>> phase already. And in the future, we probably want to also
>>>>>>> > > > >>>>> introduce some kind of workload balance across source readers,
>>>>>>> > > > >>>>> e.g. based on the per-split throughput or the per-source-reader
>>>>>>> > > > >>>>> workload in heterogeneous clusters. For these reasons, we do not
>>>>>>> > > > >>>>> expect the Split objects to be huge, and we are not trying to
>>>>>>> > > > >>>>> design for huge Split objects either as they will have problems
>>>>>>> > > > >>>>> even today.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> Good point on the potential split loss, please see the reply below:
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> Scenario 2:
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and
>>>>>>> > > > >>>>>> 4) upon restart.
>>>>>>> > > > >>>>>> 2. Before the enumerator receives all reports and performs
>>>>>>> > > > >>>>>> reassignment, a checkpoint is triggered.
>>>>>>> > > > >>>>>> 3. Since no splits have been reassigned yet, both readers have
>>>>>>> > > > >>>>>> empty states.
>>>>>>> > > > >>>>>> 4. When restarting from this checkpoint, all four splits are lost.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> The reader registration happens in SourceOperator.open(), which
>>>>>>> > > > >>>>> means the task is still in the initializing state. Therefore the
>>>>>>> > > > >>>>> checkpoint should not be triggered until the enumerator receives
>>>>>>> > > > >>>>> all the split reports.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> There is a nuance here. Today, the RPC call from the TM to the JM
>>>>>>> > > > >>>>> is async, so it is possible that SourceOperator.open() has
>>>>>>> > > > >>>>> returned but the enumerator has not yet received the split
>>>>>>> > > > >>>>> reports. However, because the task status update RPC call goes
>>>>>>> > > > >>>>> through the same channel as the split reports call, the task
>>>>>>> > > > >>>>> status RPC call will happen after the split reports call on the JM
>>>>>>> > > > >>>>> side. Therefore, on the JM side, the SourceCoordinator will always
>>>>>>> > > > >>>>> first receive the split reports, then receive the checkpoint
>>>>>>> > > > >>>>> request.
>>>>>>> > > > >>>>> This "happens before" relationship is important for guaranteeing
>>>>>>> > > > >>>>> consistent state between the enumerator and the readers.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> Scenario 1:
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and
>>>>>>> > > > >>>>>> Reader B reports (3 and 4).
>>>>>>> > > > >>>>>> 2. The enumerator receives these reports but only reassigns splits
>>>>>>> > > > >>>>>> 1 and 2, not 3 and 4.
>>>>>>> > > > >>>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and
>>>>>>> > > > >>>>>> 2 are recorded in the reader states; splits 3 and 4 are not
>>>>>>> > > > >>>>>> persisted.
>>>>>>> > > > >>>>>> 4. If the job is later restarted from this checkpoint, splits 3
>>>>>>> > > > >>>>>> and 4 will be permanently lost.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> This scenario is possible. One solution is to let the enumerator
>>>>>>> > > > >>>>> implementation handle this. That means if the enumerator relies on
>>>>>>> > > > >>>>> the initial split reports from the source readers, it should
>>>>>>> > > > >>>>> maintain these reports by itself. In the above example, the
>>>>>>> > > > >>>>> enumerator will need to remember that 3 and 4 are not assigned and
>>>>>>> > > > >>>>> put them into its own state.
>>>>>>> > > > >>>>> The current contract is that anything assigned to the
>>>>>>> > > > >>>>> SourceReaders is completely owned by the SourceReaders.
>>>>>>> > > > >>>>> Enumerators can remember the assignments but cannot change them,
>>>>>>> > > > >>>>> even when the source reader recovers / restarts.
>>>>>>> > > > >>>>> With this FLIP, the contract becomes that the source readers will
>>>>>>> > > > >>>>> return the ownership of the splits to the enumerator. So the
>>>>>>> > > > >>>>> enumerator is responsible for maintaining these splits until they
>>>>>>> > > > >>>>> are assigned to a source reader again.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> There are other cases where there may be conflicting information
>>>>>>> > > > >>>>> between reader and enumerator. For example, consider the following
>>>>>>> > > > >>>>> sequence:
>>>>>>> > > > >>>>> 1. Reader A reports splits (1 and 2) upon restart.
>>>>>>> > > > >>>>> 2. The enumerator receives the report and assigns both 1 and 2 to
>>>>>>> > > > >>>>> reader B.
>>>>>>> > > > >>>>> 3. Reader A fails before checkpointing. This is a partial failure,
>>>>>>> > > > >>>>> so only reader A restarts.
>>>>>>> > > > >>>>> 4. When reader A recovers, it will again report splits (1 and 2)
>>>>>>> > > > >>>>> to the enumerator.
>>>>>>> > > > >>>>> 5. The enumerator should ignore this report because it has
>>>>>>> > > > >>>>> assigned splits (1 and 2) to reader B.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> So with the new contract, the enumerator should be the source of
>>>>>>> > > > >>>>> truth for split ownership.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> Thanks,
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> Jiangjie (Becket) Qin
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <[email protected]> wrote:
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>>> Hi Becket,
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> I did consider this approach at the beginning (and it was also
>>>>>>> > > > >>>>>> mentioned in this FLIP), since it would allow more flexibility in
>>>>>>> > > > >>>>>> reassigning all splits. However, there are a few potential issues.
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> 1. High Transmission Cost
>>>>>>> > > > >>>>>> If we pass the full split objects (rather than just split IDs),
>>>>>>> > > > >>>>>> the data size could be significant, leading to high overhead
>>>>>>> > > > >>>>>> during transmission, especially when many splits are involved.
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> 2. Risk of Split Loss
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> The risk of split loss exists unless we have a mechanism to make
>>>>>>> > > > >>>>>> sure that a checkpoint can only be taken after all the splits are
>>>>>>> > > > >>>>>> reassigned.
>>>>>>> > > > >>>>>> There are scenarios where splits could be lost due to
>>>>>>> > > > >>>>>> inconsistent state handling during recovery:
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> Scenario 1:
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>>    1. Upon restart, Reader A reports assigned splits (1 and 2),
>>>>>>> > > > >>>>>>    and Reader B reports (3 and 4).
>>>>>>> > > > >>>>>>    2. The enumerator receives these reports but only reassigns
>>>>>>> > > > >>>>>>    splits 1 and 2, not 3 and 4.
>>>>>>> > > > >>>>>>    3. A checkpoint or savepoint is then triggered. Only splits 1
>>>>>>> > > > >>>>>>    and 2 are recorded in the reader states; splits 3 and 4 are
>>>>>>> > > > >>>>>>    not persisted.
>>>>>>> > > > >>>>>>    4. If the job is later restarted from this checkpoint, splits
>>>>>>> > > > >>>>>>    3 and 4 will be permanently lost.
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> Scenario 2:
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>>    1. Reader A reports splits (1 and 2), and Reader B reports (3
>>>>>>> > > > >>>>>>    and 4) upon restart.
>>>>>>> > > > >>>>>>    2. Before the enumerator receives all reports and performs
>>>>>>> > > > >>>>>>    reassignment, a checkpoint is triggered.
>>>>>>> > > > >>>>>>    3. Since no splits have been reassigned yet, both readers have
>>>>>>> > > > >>>>>>    empty states.
>>>>>>> > > > >>>>>>    4. When restarting from this checkpoint, all four splits are
>>>>>>> > > > >>>>>>    lost.
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> Let me know if you have thoughts on how we might mitigate 
>>>>>>> > > > >>>>>> these
>>>>>>> > > > risks!
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> Best
>>>>>>> > > > >>>>>> Hongshun
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <[email protected]> wrote:
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>>> Hi Hongshun,
>>>>>>> > > > >>>>>>>
>>>>>>> > > > >>>>>>> The steps sound reasonable to me in general. In terms of the
>>>>>>> > > > >>>>>>> updated FLIP wiki, it would be good to see if we can keep the
>>>>>>> > > > >>>>>>> protocol simple. One alternative way to achieve this behavior is
>>>>>>> > > > >>>>>>> the following:
>>>>>>> > > > >>>>>>>
>>>>>>> > > > >>>>>>> 1. Upon SourceOperator startup, the SourceOperator sends a
>>>>>>> > > > >>>>>>> ReaderRegistrationEvent with the currently assigned splits to
>>>>>>> > > > >>>>>>> the enumerator. It does not add these splits to the SourceReader.
>>>>>>> > > > >>>>>>> 2. The enumerator will always use
>>>>>>> > > > >>>>>>> SplitEnumeratorContext.assignSplits() to assign the splits (not
>>>>>>> > > > >>>>>>> via the response to the ReaderRegistrationEvent; this allows
>>>>>>> > > > >>>>>>> async split assignment in case the enumerator wants to wait
>>>>>>> > > > >>>>>>> until all the readers are registered).
>>>>>>> > > > >>>>>>> 3. The SourceOperator will only call SourceReader.addSplits()
>>>>>>> > > > >>>>>>> when it receives the AddSplitEvent from the enumerator.
>>>>>>> > > > >>>>>>>
>>>>>>> > > > >>>>>>> This protocol has a few benefits:
>>>>>>> > > > >>>>>>> 1. It basically allows arbitrary split reassignment upon restart.
>>>>>>> > > > >>>>>>> 2. Simplicity: there is only one way to assign splits.
>>>>>>> > > > >>>>>>>
>>>>>>> > > > >>>>>>> So we only need one interface change:
>>>>>>> > > > >>>>>>> - add the initially assigned splits to ReaderInfo so the
>>>>>>> > > > >>>>>>> Enumerator can access it.
>>>>>>> > > > >>>>>>> and one behavior change:
>>>>>>> > > > >>>>>>> - The SourceOperator should stop assigning splits to the 
>>>>>>> > > > >>>>>>> from
>>>>>>> > state
>>>>>>> > > > >>>>>>> restoration, but
>>>>>>> > [message truncated...]
>>>>>>> >
>>> 
