Hi Leonard,

If the SplitEnumerator receives all splits after a restart, it becomes
straightforward to clear and un-assign the unmatched splits (by checking
whether each one still matches the source options). However, a key question
arises: *should we automatically discard obsolete splits, or explicitly
notify the user via an exception?*

We provide an option, `scan.partition-unsubscribe.strategy`:
1. If Strict, throws an exception when encountering removed splits.
2. If Lenient, automatically removes obsolete splits silently.
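
To make the two strategies concrete, here is a minimal sketch of the check.
Only the option name comes from the proposal above; the class, enum, and
method names are hypothetical and are not the Kafka connector's API, and a
split is reduced to a topic plus a partition number.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch; none of these names exist in the Kafka connector. */
final class UnsubscribeSketch {

    /** Assumed to mirror the two values of scan.partition-unsubscribe.strategy. */
    enum Strategy { STRICT, LENIENT }

    /** Minimal stand-in for a partition split. */
    static final class Split {
        final String topic;
        final int partition;

        Split(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    /**
     * Returns the reported splits that still match the subscribed topics.
     * STRICT fails on the first obsolete split; LENIENT drops it silently.
     */
    static List<Split> reconcile(
            List<Split> reported, Set<String> subscribedTopics, Strategy strategy) {
        List<Split> kept = new ArrayList<>();
        for (Split split : reported) {
            if (subscribedTopics.contains(split.topic)) {
                kept.add(split);
            } else if (strategy == Strategy.STRICT) {
                throw new IllegalStateException(
                        "Split " + split + " no longer matches the source options");
            }
            // LENIENT: the obsolete split is simply not kept for re-assignment.
        }
        return kept;
    }
}
```

With reported splits for topics "orders" and "removed" and a subscription to
"orders" only, LENIENT keeps just the "orders" split, while STRICT throws.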

What do you think?

Best,
Hongshun

On Thu, Oct 9, 2025 at 9:37 PM Leonard Xu <[email protected]> wrote:

> Thanks Hongshun for the update and the detailed analysis of the edge
> cases. The updated FLIP looks good to me now.
>
> One last implementation detail about the scenario in the motivation section:
>
> *Restart with changed subscription: During restart, if the source options
> remove a topic or table, the splits which have already been assigned cannot
> be removed.*
>
> Could you clarify how we resolve this in the Kafka connector?
>
> Best,
> Leonard
>
>
>
> On Oct 9, 2025, at 19:48, Hongshun Wang <[email protected]> wrote:
>
> Hi devs,
> If there are no further suggestions, I will start the voting tomorrow.
>
> Best,
> Hongshun
>
> On Fri, Sep 26, 2025 at 7:48 PM Hongshun Wang <[email protected]>
> wrote:
>
>> Hi Becket and Leonard,
>>
>> I have updated the content of this FLIP. The key point is that:
>>
>> When the split enumerator receives a split, *the split must already
>> exist in pendingSplitAssignments or assignedSplitAssignments*.
>>
>>    - If the split is in pendingSplitAssignments, ignore it.
>>    - If the split is in assignedSplitAssignments but has a different
>>    taskId, ignore it (this indicates it was already assigned to another
>>    task).
>>    - If the split is in assignedSplitAssignments and shares the same
>>    taskId, move the assignment from assignedSplitAssignments to
>>    pendingSplitAssignments so that it is assigned again.
>>
>>
>> To make it easier to understand why these strategies are used, I added
>> some examples and pictures.
>>
>> Could you help me check whether any problems remain?
>>
>> Best
>> Hongshun
>>
>>
>>
>> On Fri, Sep 26, 2025 at 5:08 PM Leonard Xu <[email protected]> wrote:
>>
>>> Thanks Becket and Hongshun for the insightful discussion.
>>>
>>> The underlying implementation and communication mechanisms of Flink
>>> Source indeed involve many intricate details. We discussed the issue of
>>> split re-assignment in specific scenarios, and fortunately the final
>>> decision turned out to be pretty clear.
>>>
>>> +1 to Becket’s proposal, which keeps the framework cleaner and more
>>> flexible.
>>> +1 to Hongshun’s point to provide comprehensive guidance for connector
>>> developers.
>>>
>>>
>>> Best,
>>> Leonard
>>>
>>>
>>>
>>> On Sep 26, 2025, at 16:30, Hongshun Wang <[email protected]> wrote:
>>>
>>> Hi Becket,
>>>
>>> Got it. You’re suggesting we should not handle this in the source
>>> framework but instead let the split enumerator manage these three scenarios.
>>>
>>> Let me explain why I originally favored handling it in the framework:
>>> I'm concerned that connector developers might overlook certain edge cases
>>> (after all, it took us extensive discussion to fully clarify the logic).
>>>
>>> However, your approach keeps the framework cleaner and more flexible, so
>>> I will adopt it.
>>>
>>> Perhaps, in this FLIP, we should focus on providing comprehensive
>>> guidance for connector developers: explain how to implement a split
>>> enumerator, including the underlying challenges and their solutions.
>>>
>>>
>>> Additionally, we can use the Kafka connector as a reference
>>> implementation to demonstrate the practical steps. This way, developers who
>>> want to implement similar connectors can directly reference this example.
>>>
>>>
>>> Best,
>>> Hongshun
>>>
>>>
>>>
>>> On Fri, Sep 26, 2025 at 1:27 PM Becket Qin <[email protected]> wrote:
>>>
>>>> It would be good to not expose runtime details to the source
>>>> implementation if possible.
>>>>
>>>> Today, the split enumerator implementations are expected to track the
>>>> split assignment.
>>>>
>>>> Assuming the split enumerator implementation keeps a split assignment
>>>> map, that means the enumerator should already know whether a split is
>>>> assigned or unassigned. So it can handle the three scenarios you mentioned.
>>>>
>>>>> The split is reported by a reader during a global restoration.
>>>>
>>>> The split enumerator should have just been restored / created. If the
>>>> enumerator expects a full reassignment of splits upon global recovery,
>>>> there should be no assigned splits to that reader in the split assignment
>>>> mapping.
>>>>
>>>>> The split is reported by a reader during a partial failure recovery.
>>>>
>>>> In this case, when SplitEnumerator.addReader() is invoked, the split
>>>> assignment map in the enumerator implementation should already have some
>>>> split assignments for the reader. Therefore it is a partial failover. If
>>>> the source supports split reassignment on recovery, the enumerator can
>>>> assign splits that are different from the reported assignment of that
>>>> reader in the SplitEnumeratorContext, or it can also assign the same
>>>> splits. In any case, the enumerator knows that this is a partial recovery
>>>> because the assignment map is non-empty.
>>>>
>>>>> The split is not reported by a reader, but is assigned after the last
>>>>> successful checkpoint and was never acknowledged.
>>>>
>>>> This is actually one of the steps in the partial failure recovery.
>>>> SplitEnumerator.addSplitsBack() will be called first before
>>>> SplitEnumerator.addReader() is called for the recovered reader. When
>>>> SplitEnumerator.addSplitsBack() is invoked, it is for sure a partial
>>>> recovery, and the enumerator should remove these splits from the split
>>>> assignment map as if they were never assigned.
>>>>
>>>> I think this should work, right?
>>>>
>>>> Thanks,
>>>>
>>>> Jiangjie (Becket) Qin
>>>>
>>>> On Thu, Sep 25, 2025 at 8:34 PM Hongshun Wang <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Becket and Leonard,
>>>>>
>>>>> Thanks for your advice.
>>>>>
>>>>> > put all the reader information in the SplitEnumerator context
>>>>> I have a concern: *the current registeredReaders in
>>>>> SourceCoordinatorContext will be removed after subtaskResetor execution on
>>>>> failure*. However, this approach has merit.
>>>>>
>>>>> I found one more situation that my previous design does not cover:
>>>>>
>>>>>    1. Initial state: Reader A reports splits (1, 2).
>>>>>    2. Enumerator action: Assigns split 1 to Reader A, and split 2 to
>>>>>    Reader B.
>>>>>    3. Failure scenario: Reader A fails before checkpointing. Since
>>>>>    this is a partial failure, only Reader A restarts.
>>>>>    4. Recovery issue: Upon recovery, Reader A re-reports split (1).
>>>>>
>>>>> In my previous design, the enumerator will ignore Reader A's
>>>>> re-registration which will cause data loss.
>>>>>
>>>>> Thus, when the enumerator receives a split, the split may originate
>>>>> from three scenarios:
>>>>>
>>>>>    1. The split is reported by a reader during a global restoration.
>>>>>    2. The split is reported by a reader during a partial failure
>>>>>    recovery.
>>>>>    3. The split is not reported by a reader, but is assigned after
>>>>>    the last successful checkpoint and was never acknowledged.
>>>>>
>>>>> In the first scenario (global restore), the split should
>>>>> be re-distributed. For the latter two scenarios (partial failover and
>>>>> post-checkpoint assignment), we need to reassign the split to
>>>>> its originally assigned subtask.
>>>>>
>>>>> By implementing a method in the SplitEnumerator context to track each
>>>>> assigned split's status, the system can correctly identify and resolve
>>>>> split ownership in all three scenarios. *What about adding a
>>>>> `SplitRecoveryType splitRecoveryType(Split split)` method to
>>>>> SplitEnumeratorContext?* SplitRecoveryType is an enum including
>>>>> `UNASSIGNED`, `GLOBAL_RESTORE`, `PARTIAL_FAILOVER`, and
>>>>> `POST_CHECKPOINT_ASSIGNMENT`.
>>>>>
>>>>> What do you think? Are there any details or scenarios I haven't
>>>>> considered? Looking forward to your advice.
>>>>>
>>>>> Best,
>>>>> Hongshun
>>>>>
>>>>> On Thu, Sep 11, 2025 at 12:41 AM Becket Qin <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Thanks for the explanation, Hongshun.
>>>>>>
>>>>>> The current pattern for handling new reader registration is as follows:
>>>>>> 1. Put all the reader information in the SplitEnumerator context.
>>>>>> 2. Notify the enumerator about the new reader registration.
>>>>>> 3. Let the split enumerator get whatever information it wants from the
>>>>>> context and do its job.
>>>>>> This pattern decouples the information passing and the reader
>>>>>> registration notification. This makes the API extensible: we can add
>>>>>> more information (e.g. reported assigned splits in our case) about the
>>>>>> reader to the context without introducing new methods.
>>>>>>
>>>>>> Introducing a new method addSplitsBackOnRecovery() is redundant given
>>>>>> the above pattern. Do we really need it?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jiangjie (Becket) Qin
>>>>>>
>>>>>> On Mon, Sep 8, 2025 at 8:18 PM Hongshun Wang <[email protected]> wrote:
>>>>>>
>>>>>> > Hi Becket,
>>>>>> >
>>>>>> > > I am curious what would the enumerator do differently for the splits
>>>>>> > > added via addSplitsBackOnRecovery() V.S. addSplitsBack()?
>>>>>> >
>>>>>> > In this FLIP, there are two distinct scenarios in which the enumerator
>>>>>> > receives splits being added back:
>>>>>> > 1. Job-level restore: The job is restored, and splits from the reader’s
>>>>>> > state are reported by ReaderRegistrationEvent.
>>>>>> > 2. Reader-level restart: A reader is restarted but not the whole job, and
>>>>>> > the splits assigned to it after the last successful checkpoint are added
>>>>>> > back. This is what addSplitsBack used to do.
>>>>>> >
>>>>>> >
>>>>>> > In these two situations, the enumerator will choose different strategies.
>>>>>> > 1. Job-level restore: the splits should be redistributed across readers
>>>>>> > according to the current partitioner strategy.
>>>>>> > 2. Reader-level restart: the splits should be reassigned directly back to
>>>>>> > the same reader they were originally assigned to, preserving locality and
>>>>>> > avoiding unnecessary redistribution.
>>>>>> >
>>>>>> > Therefore, the enumerator must clearly distinguish between these two
>>>>>> > scenarios. I originally proposed deprecating the former
>>>>>> > addSplitsBack(List<SplitT> splits, int subtaskId) and adding a new
>>>>>> > addSplitsBack(List<SplitT> splits, int subtaskId, boolean reportedByReader).
>>>>>> > Leonard suggested using another method, addSplitsBackOnRecovery, so that
>>>>>> > the current addSplitsBack is not affected.
>>>>>> >
>>>>>> > Best
>>>>>> > Hongshun
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > On 2025/09/08 17:20:31 Becket Qin wrote:
>>>>>> > > Hi Leonard,
>>>>>> > >
>>>>>> > >
>>>>>> > > > Could we introduce a new method like addSplitsBackOnRecovery with default
>>>>>> > > > implementation. In this way, we can provide better backward compatibility
>>>>>> > > > and also makes it easier for developers to understand.
>>>>>> > >
>>>>>> > >
>>>>>> > > I am curious what would the enumerator do differently for the splits added
>>>>>> > > via addSplitsBackOnRecovery() V.S. addSplitsBack()?  Today, addSplitsBack()
>>>>>> > > is also only called upon recovery. So the new method seems confusing. One
>>>>>> > > thing worth clarifying is if the Source implements
>>>>>> > > SupportSplitReassignmentOnRecovery, upon recovery, should the splits
>>>>>> > > reported by the readers also be added back to the SplitEnumerator via the
>>>>>> > > addSplitsBack() call? Or should the SplitEnumerator explicitly query the
>>>>>> > > registered reader information via the SplitEnumeratorContext to get the
>>>>>> > > originally assigned splits when addReader() is invoked? I was assuming the
>>>>>> > > latter in the beginning, so the behavior of addSplitsBack() remains
>>>>>> > > unchanged, but I am not opposed to doing the former.
>>>>>> > >
>>>>>> > > Also, can you elaborate on the backwards compatibility issue you see if we
>>>>>> > > do not have a separate addSplitsBackOnRecovery() method? Even without this
>>>>>> > > new method, the behavior remains exactly the same unless the end users
>>>>>> > > implement the mix-in interface of "SupportSplitReassignmentOnRecovery",
>>>>>> > > right?
>>>>>> > >
>>>>>> > > Thanks,
>>>>>> > >
>>>>>> > > Jiangjie (Becket) Qin
>>>>>> > >
>>>>>> > > On Mon, Sep 8, 2025 at 1:48 AM Hongshun Wang <[email protected]>
>>>>>> > > wrote:
>>>>>> > >
>>>>>> > > > Hi devs,
>>>>>> > > >
>>>>>> > > > It has been quite some time since this FLIP[1] was first proposed. Thank
>>>>>> > > > you for your valuable feedback. Based on your suggestions, the FLIP has
>>>>>> > > > undergone several rounds of revisions.
>>>>>> > > >
>>>>>> > > > Any more advice is welcome and appreciated. If there are no further
>>>>>> > > > concerns, I plan to start the vote tomorrow.
>>>>>> > > >
>>>>>> > > > Best
>>>>>> > > > Hongshun
>>>>>> > > >
>>>>>> > > > [1]
>>>>>> > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480
>>>>>> > > >
>>>>>> > > > On Mon, Sep 8, 2025 at 4:42 PM Hongshun Wang <[email protected]>
>>>>>> > > > wrote:
>>>>>> > > >
>>>>>> > > > > Hi Leonard,
>>>>>> > > > > Thanks for your advice. It makes sense and I have modified it.
>>>>>> > > > >
>>>>>> > > > > Best,
>>>>>> > > > > Hongshun
>>>>>> > > > >
>>>>>> > > > > On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu <[email protected]> wrote:
>>>>>> > > > >
>>>>>> > > > >> Thanks Hongshun and Becket for the deep discussion.
>>>>>> > > > >>
>>>>>> > > > >> I only have one comment for one API design:
>>>>>> > > > >>
>>>>>> > > > >> > Deprecate the old addSplitsBack method, use a addSplitsBack with
>>>>>> > > > >> param isReportedByReader instead. Because, The enumerator can apply
>>>>>> > > > >> different reassignment policies based on the context.
>>>>>> > > > >>
>>>>>> > > > >> Could we introduce a new method like *addSplitsBackOnRecovery* with a
>>>>>> > > > >> default implementation? In this way, we can provide better backward
>>>>>> > > > >> compatibility and also make it easier for developers to understand.
>>>>>> > > > >>
>>>>>> > > > >> Best,
>>>>>> > > > >> Leonard
>>>>>> > > > >>
>>>>>> > > > >>
>>>>>> > > > >>
>>>>>> > > > >> On Sep 3, 2025, at 20:26, Hongshun Wang <[email protected]> wrote:
>>>>>> > > > >>
>>>>>> > > > >> Hi Becket,
>>>>>> > > > >>
>>>>>> > > > >> I think that's a great idea! I have added the
>>>>>> > > > >> SupportSplitReassignmentOnRecovery interface in this FLIP. If a Source
>>>>>> > > > >> implements this interface, it indicates that the source operator needs to
>>>>>> > > > >> report splits to the enumerator and receive reassignment.[1]
>>>>>> > > > >>
>>>>>> > > > >> Best,
>>>>>> > > > >> Hongshun
>>>>>> > > > >>
>>>>>> > > > >> [1]
>>>>>> > > > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>> > > > >>
>>>>>> > > > >> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <[email protected]> wrote:
>>>>>> > > > >>
>>>>>> > > > >>> Hi Hongshun,
>>>>>> > > > >>>
>>>>>> > > > >>> I think the convention for such optional features in Source is via
>>>>>> > > > >>> mix-in interfaces. So instead of adding a method to the SourceReader, maybe
>>>>>> > > > >>> we should introduce an interface SupportSplitReassignmentOnRecovery with
>>>>>> > > > >>> this method. If a Source implementation implements that interface, then the
>>>>>> > > > >>> SourceOperator will check the desired behavior and act accordingly.
>>>>>> > > > >>>
>>>>>> > > > >>> Thanks,
>>>>>> > > > >>>
>>>>>> > > > >>> Jiangjie (Becket) Qin
>>>>>> > > > >>>
>>>>>> > > > >>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <[email protected]> wrote:
>>>>>> > > > >>>
>>>>>> > > > >>>> Hi devs,
>>>>>> > > > >>>>
>>>>>> > > > >>>> Would anyone like to discuss this FLIP? I'd appreciate your feedback
>>>>>> > > > >>>> and suggestions.
>>>>>> > > > >>>>
>>>>>> > > > >>>> Best,
>>>>>> > > > >>>> Hongshun
>>>>>> > > > >>>>
>>>>>> > > > >>>>
>>>>>> > > > >>>> On Aug 13, 2025, at 14:23, Hongshun Wang <[email protected]> wrote:
>>>>>> > > > >>>>
>>>>>> > > > >>>> Hi Becket,
>>>>>> > > > >>>>
>>>>>> > > > >>>> Thank you for your detailed feedback. The new contract makes good sense
>>>>>> > > > >>>> to me and effectively addresses the issues I encountered at the beginning
>>>>>> > > > >>>> of the design.
>>>>>> > > > >>>>
>>>>>> > > > >>>> That said, I recommend not reporting splits by default, primarily for
>>>>>> > > > >>>> compatibility and practical reasons:
>>>>>> > > > >>>>
>>>>>> > > > >>>> >  For these reasons, we do not expect the Split objects to be huge,
>>>>>> > > > >>>> and we are not trying to design for huge Split objects either as they will
>>>>>> > > > >>>> have problems even today.
>>>>>> > > > >>>>
>>>>>> > > > >>>>    1. Not all existing connectors match this rule.
>>>>>> > > > >>>>    For example, in the MySQL CDC connector, a binlog split may contain
>>>>>> > > > >>>>    hundreds (or even more) snapshot split completion records. This state is
>>>>>> > > > >>>>    large and is currently transmitted incrementally through multiple
>>>>>> > > > >>>>    BinlogSplitMetaEvent messages. Since the binlog reader operates
>>>>>> > > > >>>>    with single parallelism, reporting the full split state on recovery
>>>>>> > > > >>>>    could be inefficient or even infeasible.
>>>>>> > > > >>>>    For such sources, it would be better to provide a mechanism to skip
>>>>>> > > > >>>>    split reporting during restart until they redesign and reduce the
>>>>>> > > > >>>>    split size.
>>>>>> > > > >>>>    2. Not all enumerators maintain unassigned splits in state.
>>>>>> > > > >>>>    Some SplitEnumerator implementations (such as the Kafka connector) do
>>>>>> > > > >>>>    not track or persistently manage unassigned splits. Requiring them to
>>>>>> > > > >>>>    handle re-registration would add unnecessary complexity. Even though we
>>>>>> > > > >>>>    may implement this in the Kafka connector, the Kafka connector is
>>>>>> > > > >>>>    currently decoupled from the Flink version, so we also need to make sure
>>>>>> > > > >>>>    older versions remain compatible.
>>>>>> > > > >>>>
>>>>>> > > > >>>> ------------------------------
>>>>>> > > > >>>>
>>>>>> > > > >>>> To address these concerns, I propose introducing a new method: boolean
>>>>>> > > > >>>> SourceReader#shouldReassignSplitsOnRecovery() with a default
>>>>>> > > > >>>> implementation returning false. This allows source readers to opt in
>>>>>> > > > >>>> to split reassignment only when necessary. Since the new contract already
>>>>>> > > > >>>> places the responsibility for split assignment on the enumerator, not
>>>>>> > > > >>>> reporting splits by default is a safe and clean default behavior.
>>>>>> > > > >>>>
>>>>>> > > > >>>>
>>>>>> > > > >>>> ------------------------------
>>>>>> > > > >>>>
>>>>>> > > > >>>>
>>>>>> > > > >>>> I’ve updated the implementation and the FLIP accordingly[1]. It is quite a
>>>>>> > > > >>>> big change. In particular, for the Kafka connector, we can now use a
>>>>>> > > > >>>> pluggable SplitPartitioner to support different split assignment
>>>>>> > > > >>>> strategies (e.g., default, round-robin).
>>>>>> > > > >>>>
>>>>>> > > > >>>>
>>>>>> > > > >>>> Could you please review it when you have a chance?
>>>>>> > > > >>>>
>>>>>> > > > >>>>
>>>>>> > > > >>>> Best,
>>>>>> > > > >>>>
>>>>>> > > > >>>> Hongshun
>>>>>> > > > >>>>
>>>>>> > > > >>>>
>>>>>> > > > >>>> [1]
>>>>>> > > > >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>> > > > >>>>
>>>>>> > > > >>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <[email protected]> wrote:
>>>>>> > > > >>>>
>>>>>> > > > >>>>> Hi Hongshun,
>>>>>> > > > >>>>>
>>>>>> > > > >>>>> I am not too concerned about the transmission cost, because the full
>>>>>> > > > >>>>> split transmission has to happen in the initial assignment phase already.
>>>>>> > > > >>>>> And in the future, we probably want to also introduce some kind of workload
>>>>>> > > > >>>>> balance across source readers, e.g. based on the per-split throughput or
>>>>>> > > > >>>>> the per-source-reader workload in heterogeneous clusters. For these
>>>>>> > > > >>>>> reasons, we do not expect the Split objects to be huge, and we are not
>>>>>> > > > >>>>> trying to design for huge Split objects either as they will have problems
>>>>>> > > > >>>>> even today.
>>>>>> > > > >>>>>
>>>>>> > > > >>>>> Good point on the potential split loss, please see the reply below:
>>>>>> > > > >>>>>
>>>>>> > > > >>>>> Scenario 2:
>>>>>> > > > >>>>>
>>>>>> > > > >>>>>
>>>>>> > > > >>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4)
>>>>>> > > > >>>>>> upon restart.
>>>>>> > > > >>>>>> 2. Before the enumerator receives all reports and performs
>>>>>> > > > >>>>>> reassignment, a checkpoint is triggered.
>>>>>> > > > >>>>>> 3. Since no splits have been reassigned yet, both readers have empty
>>>>>> > > > >>>>>> states.
>>>>>> > > > >>>>>> 4. When restarting from this checkpoint, all four splits are lost.
>>>>>> > > > >>>>>
>>>>>> > > > >>>>> The reader registration happens in the SourceOperator.open(), which
>>>>>> > > > >>>>> means the task is still in the initializing state, therefore the checkpoint
>>>>>> > > > >>>>> should not be triggered until the enumerator receives all the split reports.
>>>>>> > > > >>>>>
>>>>>> > > > >>>>> There is a nuance here. Today, the RPC call from the TM to the JM is
>>>>>> > > > >>>>> async. So it is possible that the SourceOperator.open() has returned, but
>>>>>> > > > >>>>> the enumerator has not received the split reports. However, because the
>>>>>> > > > >>>>> task status update RPC call goes to the same channel as the split reports
>>>>>> > > > >>>>> call, the task status RPC call will happen after the split reports call
>>>>>> > > > >>>>> on the JM side. Therefore, on the JM side, the SourceCoordinator will
>>>>>> > > > >>>>> always first receive the split reports, then receive the checkpoint request.
>>>>>> > > > >>>>> This "happen before" relationship is kind of important to guarantee
>>>>>> > > > >>>>> the consistent state between enumerator and readers.
>>>>>> > > > >>>>>
>>>>>> > > > >>>>> Scenario 1:
>>>>>> > > > >>>>>
>>>>>> > > > >>>>>
>>>>>> > > > >>>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and
>>>>>> > > > >>>>>> Reader B reports (3 and 4).
>>>>>> > > > >>>>>> 2. The enumerator receives these reports but only reassigns splits 1
>>>>>> > > > >>>>>> and 2, not 3 and 4.
>>>>>> > > > >>>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2
>>>>>> > > > >>>>>> are recorded in the reader states; splits 3 and 4 are not persisted.
>>>>>> > > > >>>>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4
>>>>>> > > > >>>>>> will be permanently lost.
>>>>>> > > > >>>>>
>>>>>> > > > >>>>> This scenario is possible. One solution is to let the enumerator
>>>>>> > > > >>>>> implementation handle this. That means if the enumerator relies on the
>>>>>> > > > >>>>> initial split reports from the source readers, it should maintain these
>>>>>> > > > >>>>> reports by itself. In the above example, the enumerator will need to
>>>>>> > > > >>>>> remember that 3 and 4 are not assigned and put them into its own state.
>>>>>> > > > >>>>> The current contract is that anything assigned to the SourceReaders
>>>>>> > > > >>>>> is completely owned by the SourceReaders. Enumerators can remember the
>>>>>> > > > >>>>> assignments but cannot change them, even when the source reader recovers /
>>>>>> > > > >>>>> restarts.
>>>>>> > > > >>>>> With this FLIP, the contract becomes that the source readers will
>>>>>> > > > >>>>> return the ownership of the splits to the enumerator. So the enumerator is
>>>>>> > > > >>>>> responsible for maintaining these splits, until they are assigned to a
>>>>>> > > > >>>>> source reader again.
>>>>>> > > > >>>>>
>>>>>> > > > >>>>> There are other cases where there may be conflicting information between
>>>>>> > > > >>>>> reader and enumerator. For example, consider the following sequence:
>>>>>> > > > >>>>> 1. reader A reports splits (1 and 2) upon restart.
>>>>>> > > > >>>>> 2. enumerator receives the report and assigns both 1 and 2 to reader B.
>>>>>> > > > >>>>> 3. reader A fails before checkpointing. And this is a partial
>>>>>> > > > >>>>> failure, so only reader A restarts.
>>>>>> > > > >>>>> 4. When reader A recovers, it will again report splits (1 and 2) to
>>>>>> > > > >>>>> the enumerator.
>>>>>> > > > >>>>> 5. The enumerator should ignore this report because it has
>>>>>> > > > >>>>> assigned splits (1 and 2) to reader B.
>>>>>> > > > >>>>>
>>>>>> > > > >>>>> So with the new contract, the enumerator should be the source of truth
>>>>>> > > > >>>>> for split ownership.
>>>>>> > > > >>>>>
>>>>>> > > > >>>>> Thanks,
>>>>>> > > > >>>>>
>>>>>> > > > >>>>> Jiangjie (Becket) Qin
>>>>>> > > > >>>>>
>>>>>> > > > >>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <[email protected]> wrote:
>>>>>> > > > >>>>>
>>>>>> > > > >>>>>> Hi Becket,
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>> I did consider this approach at the beginning (and it was also
>>>>>> > > > >>>>>> mentioned in this FLIP), since it would allow more flexibility in
>>>>>> > > > >>>>>> reassigning all splits. However, there are a few potential issues.
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>> 1. High Transmission Cost
>>>>>> > > > >>>>>> If we pass the full split objects (rather than just split IDs), the
>>>>>> > > > >>>>>> data size could be significant, leading to high overhead during
>>>>>> > > > >>>>>> transmission, especially when many splits are involved.
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>> 2. Risk of Split Loss
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>> The risk of split loss exists unless we have a mechanism to ensure that
>>>>>> > > > >>>>>> a checkpoint can only be taken after all the splits are reassigned.
>>>>>> > > > >>>>>> There are scenarios where splits could be lost due to inconsistent
>>>>>> > > > >>>>>> state handling during recovery:
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>> Scenario 1:
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>>    1. Upon restart, Reader A reports assigned splits (1 and 2), and
>>>>>> > > > >>>>>>    Reader B reports (3 and 4).
>>>>>> > > > >>>>>>    2. The enumerator receives these reports but only reassigns
>>>>>> > > > >>>>>>    splits 1 and 2, not 3 and 4.
>>>>>> > > > >>>>>>    3. A checkpoint or savepoint is then triggered. Only splits 1 and
>>>>>> > > > >>>>>>    2 are recorded in the reader states; splits 3 and 4 are not persisted.
>>>>>> > > > >>>>>>    4. If the job is later restarted from this checkpoint, splits 3
>>>>>> > > > >>>>>>    and 4 will be permanently lost.
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>> Scenario 2:
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>>    1. Reader A reports splits (1 and 2), and Reader B reports (3 and
>>>>>> > > > >>>>>>    4) upon restart.
>>>>>> > > > >>>>>>    2. Before the enumerator receives all reports and performs
>>>>>> > > > >>>>>>    reassignment, a checkpoint is triggered.
>>>>>> > > > >>>>>>    3. Since no splits have been reassigned yet, both readers have
>>>>>> > > > >>>>>>    empty states.
>>>>>> > > > >>>>>>    4. When restarting from this checkpoint, all four splits are lost.
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>> Let me know if you have thoughts on how we might mitigate these risks!
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>> Best
>>>>>> > > > >>>>>> Hongshun
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <[email protected]> wrote:
>>>>>> > > > >>>>>>
>>>>>> > > > >>>>>>> Hi Hongshun,
>>>>>> > > > >>>>>>>
>>>>>> > > > >>>>>>> The steps sound reasonable to me in general. In terms of the updated
>>>>>> > > > >>>>>>> FLIP wiki, it would be good to see if we can keep the protocol simple. One
>>>>>> > > > >>>>>>> alternative way to achieve this behavior is the following:
>>>>>> > > > >>>>>>>
>>>>>> > > > >>>>>>> 1. Upon SourceOperator startup, the SourceOperator sends
>>>>>> > > > >>>>>>> ReaderRegistrationEvent with the currently assigned splits to the
>>>>>> > > > >>>>>>> enumerator. It does not add these splits to the SourceReader.
>>>>>> > > > >>>>>>> 2. The enumerator will always use
>>>>>> > > > >>>>>>> SplitEnumeratorContext.assignSplits() to assign the splits (not via the
>>>>>> > > > >>>>>>> response of the ReaderRegistrationEvent; this allows async split assignment
>>>>>> > > > >>>>>>> in case the enumerator wants to wait until all the readers are registered).
>>>>>> > > > >>>>>>> 3. The SourceOperator will only call SourceReader.addSplits() when
>>>>>> > > > >>>>>>> it receives the AddSplitEvent from the enumerator.
>>>>>> > > > >>>>>>>
>>>>>> > > > >>>>>>> This protocol has a few benefits:
>>>>>> > > > >>>>>>> 1. it basically allows arbitrary split reassignment upon restart
>>>>>> > > > >>>>>>> 2. simplicity: there is only one way to assign splits.
>>>>>> > > > >>>>>>>
>>>>>> > > > >>>>>>> So we only need one interface change:
>>>>>> > > > >>>>>>> - add the initially assigned splits to ReaderInfo so the Enumerator
>>>>>> > > > >>>>>>> can access it.
>>>>>> > > > >>>>>>> and one behavior change:
>>>>>> > > > >>>>>>> - The SourceOperator should stop assigning splits to
>>>>>> the from
>>>>>> > state
>>>>>> > > > >>>>>>> restoration, but
>>>>>> > [message truncated...]
>>>>>> >
>>>>>>
>>>>>
>>>
>
