Hi Hongshun,

Thanks for updating the FLIP. A quick question: why do we need to change
the behavior of addSplitsBack()? Should it remain the same?

Regarding the case of restart with changed subscription. I think the only
correct behavior is removing obsolete splits without any warning /
exception. It is OK to add an info level logging if we want to. It is a
clear intention if the user has explicitly changed subscription and
restarted the job. There is no need to add a config to double confirm.

Regards,

Jiangjie (Becket) Qin

On Thu, Oct 9, 2025 at 7:28 PM Hongshun Wang <[email protected]>
wrote:

> Hi Leonard,
>
> If the SplitEnumerator received all splits after a restart, it becomes
> straightforward to clear and un-assign the unmatched splits(checking
> whether matches the source options). However, a key question arises: *should
> automatically discard obsolete splits, or explicitly notify the user via an
> exception?*
>
> We provided a option `scan.partition-unsubscribe.strategy`:
> 1. If Strict, throws an exception when encountering removed splits.
> 2. If Lenient, automatically removes obsolete splits silently.
>
> What Do you think?
>
> Best,
> Hongshun
>
> On Thu, Oct 9, 2025 at 9:37 PM Leonard Xu <[email protected]> wrote:
>
>> Thanks hongshun for the updating and pretty detailed analysis for edge
>> cases,  the updated FLIP looks good to me now.
>>
>> Only last implementation details about scenario in motivation section:
>>
>> *Restart with Changed subscription: During restart, if source options
>> remove a topic or table. The splits which have already assigned can not be
>> removed.*
>>
>> Could you clarify how we resolve this in Kafka connector ?
>>
>> Best,
>> Leonard
>>
>>
>>
>> 2025 10月 9 19:48,Hongshun Wang <[email protected]> 写道:
>>
>> Hi devs,
>> If there are no further suggestions, I will start the voting tomorrow。
>>
>> Best,
>> Hongshun
>>
>> On Fri, Sep 26, 2025 at 7:48 PM Hongshun Wang <[email protected]>
>> wrote:
>>
>>> Hi Becket and Leonard,
>>>
>>> I have updated the content of this FLIP. The key point is that:
>>>
>>> When the split enumerator receives a split, *these splits must have
>>> already existed in pendingSplitAssignment or assignedSplitments*.
>>>
>>>    - If the split is in pendingSplitAssignments, ignore it.
>>>    - If the split is in assignedSplitAssignments but has a different
>>>    taskId, ignore it (this indicates it was already assigned to another
>>>    task).
>>>    - If the split is in assignedSplitAssignments and shares the same
>>>    taskId, move the assignment from assignedSplitments to 
>>> pendingSplitAssignment
>>>    to re-assign again.
>>>
>>>
>>> For better understanding why use these strategies. I added some examples
>>> and pictures to show it.
>>>
>>> Would you like to help me check whether there are still some problems?
>>>
>>> Best
>>> Hongshun
>>>
>>>
>>>
>>> On Fri, Sep 26, 2025 at 5:08 PM Leonard Xu <[email protected]> wrote:
>>>
>>>> Thanks Becket and Hongshun for the insightful discussion.
>>>>
>>>> The underlying implementation and communication mechanisms of Flink
>>>> Source indeed involve many intricate details, we discussed the issue of
>>>> splits re-assignment in specific scenarios, but fortunately, the final
>>>> decision turned out to be pretty clear.
>>>>
>>>>  +1 to Becket’s proposal to keeps the framework cleaner and more
>>>> flexible.
>>>> +1 to Hongshun’s point to provide comprehensive guidance for connector
>>>> developers.
>>>>
>>>>
>>>> Best,
>>>> Leonard
>>>>
>>>>
>>>>
>>>> 2025 9月 26 16:30,Hongshun Wang <[email protected]> 写道:
>>>>
>>>> Hi Becket,
>>>>
>>>> I Got it. You’re suggesting we should not handle this in the source
>>>> framework but instead let the split enumerator manage these three 
>>>> scenarios.
>>>>
>>>> Let me explain why I originally favored handling it in the framework:
>>>> I'm concerned that connector developers might overlook certain edge cases
>>>> (after all, we even payed extensive discussions to fully clarify the logic)
>>>>
>>>> However, your point keeps the framework cleaner and more flexible.
>>>> Thus, I will take it.
>>>>
>>>> Perhaps, in this FLIP, we should focus on providing comprehensive
>>>> guidance for connector developers: explain how to implement a split
>>>> enumerator, including the underlying challenges and their solutions.
>>>>
>>>>
>>>> Additionally, we can use the Kafka connector as a reference
>>>> implementation to demonstrate the practical steps. This way, developers who
>>>> want to implement similar connectors can directly reference this example.
>>>>
>>>>
>>>> Best,
>>>> Hongshun
>>>>
>>>>
>>>>
>>>> On Fri, Sep 26, 2025 at 1:27 PM Becket Qin <[email protected]>
>>>> wrote:
>>>>
>>>>> It would be good to not expose runtime details to the source
>>>>> implementation if possible.
>>>>>
>>>>> Today, the split enumerator implementations are expected to track the
>>>>> split assignment.
>>>>>
>>>>> Assuming the split enumerator implementation keeps a split assignment
>>>>> map, that means the enumerator should already know whether a split is
>>>>> assigned or unassigned. So it can handle the three scenarios you 
>>>>> mentioned.
>>>>>
>>>>> The split is reported by a reader during a global restoration.
>>>>>>
>>>>> The split enumerator should have just been restored / created. If the
>>>>> enumerator expects a full reassignment of splits up on global recovery,
>>>>> there should be no assigned splits to that reader in the split assignment
>>>>> mapping.
>>>>>
>>>>> The split is reported by a reader during a partial failure recovery.
>>>>>>
>>>>> In this case, when SplitEnumerator.addReader() is invoked, the split
>>>>> assignment map in the enumerator implementation should already have some
>>>>> split assignments for the reader. Therefore it is a partial failover. If
>>>>> the source supports split reassignment on recovery, the enumerator can
>>>>> assign splits that are different from the reported assignment of that
>>>>> reader in the SplitEnumeratorContext, or it can also assign the same
>>>>> splits. In any case, the enumerator knows that this is a partial recovery
>>>>> because the assignment map is non-empty.
>>>>>
>>>>> The split is not reported by a reader, but is assigned after the last
>>>>>> successful checkpoint and was never acknowledged.
>>>>>
>>>>> This is actually one of the step in the partial failure recover.
>>>>> SplitEnumerator.addSplitsBack() will be called first before
>>>>> SplitReader.addReader() is called for the recovered reader. When the
>>>>> SplitEnumerator.addSplitsBack() is invoked, it is for sure a partial
>>>>> recovery. And the enumerator should remove these splits from the split
>>>>> assignment map as if they were never assigned.
>>>>>
>>>>> I think this should work, right?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> On Thu, Sep 25, 2025 at 8:34 PM Hongshun Wang <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Becket and Leonard,
>>>>>>
>>>>>> Thanks for your advice.
>>>>>>
>>>>>> > put all the reader information in the SplitEnumerator context
>>>>>> I have a concern: the current registeredReaders in*
>>>>>> SourceCoordinatorContext will be removed after subtaskResetor execution 
>>>>>> on
>>>>>> failure*.However, this approach has merit.
>>>>>>
>>>>>> One more situation I found my previous design does not cover:
>>>>>>
>>>>>>    1. Initial state: Reader A reports splits (1, 2).
>>>>>>    2. Enumerator action: Assigns split 1 to Reader A, and split 2 to
>>>>>>    Reader B.
>>>>>>    3. Failure scenario: Reader A fails before checkpointing. Since
>>>>>>    this is a partial failure, only Reader A restarts.
>>>>>>    4. Recovery issue: Upon recovery, Reader A re-reports split (1).
>>>>>>
>>>>>> In my previous design, the enumerator will ignore Reader A's
>>>>>> re-registration which will cause data loss.
>>>>>>
>>>>>> Thus, when the enumerator receives a split, the split may originate
>>>>>> from three scenarios:
>>>>>>
>>>>>>    1. The split is reported by a reader during a global restoration.
>>>>>>    2. The split is reported by a reader during a partial failure
>>>>>>    recovery.
>>>>>>    3. The split is not reported by a reader, but is assigned after
>>>>>>    the last successful checkpoint and was never acknowledged.
>>>>>>
>>>>>> In the first scenario (global restore), the split should
>>>>>> be re-distributed. For the latter two scenarios (partial failover and
>>>>>> post-checkpoint assignment), we need to reassign the split to
>>>>>> its originally assigned subtask.
>>>>>>
>>>>>> By implementing a method in the SplitEnumerator context to track each
>>>>>> assigned split's status, the system can correctly identify and resolve
>>>>>> split ownership in all three scenarios.*What about adding a
>>>>>> `SplitRecoveryType splitRecoveryType(Split split)` in
>>>>>> SplitEnumeratorContext.* SplitRecoveryTypeis a enum including
>>>>>> `UNASSIGNED`、`GLOBAL_RESTORE`、`PARTIAL_FAILOVER` and
>>>>>> `POST_CHECKPOINT_ASSIGNMENT`.
>>>>>>
>>>>>> What do you think? Are there any details or scenarios I haven't
>>>>>> considered? Looking forward to your advice.
>>>>>>
>>>>>> Best,
>>>>>> Hongshun
>>>>>>
>>>>>> On Thu, Sep 11, 2025 at 12:41 AM Becket Qin <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for the explanation, Hongshun.
>>>>>>>
>>>>>>> Current pattern of handling new reader registration following:
>>>>>>> 1. put all the reader information in the SplitEnumerator context
>>>>>>> 2. notify the enumerator about the new reader registration.
>>>>>>> 3. Let the split enumerator get whatever information it wants from
>>>>>>> the
>>>>>>> context and do its job.
>>>>>>> This pattern decouples the information passing and the reader
>>>>>>> registration
>>>>>>> notification. This makes the API extensible - we can add more
>>>>>>> information
>>>>>>> (e.g. reported assigned splits in our case) about the reader to the
>>>>>>> context
>>>>>>> without introducing new methods.
>>>>>>>
>>>>>>> Introducing a new method of addSplitBackOnRecovery() is redundant to
>>>>>>> the
>>>>>>> above pattern. Do we really need it?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Jiangjie (Becket) Qin
>>>>>>>
>>>>>>> On Mon, Sep 8, 2025 at 8:18 PM Hongshun Wang <
>>>>>>> [email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>> > Hi Becket,
>>>>>>> >
>>>>>>> > > I am curious what would the enumerator do differently for the
>>>>>>> splits
>>>>>>> > added via addSplitsBackOnRecovery() V.S. addSplitsBack()?
>>>>>>> >
>>>>>>> >  In this FLIP, there are two distinct scenarios in which the
>>>>>>> enumerator
>>>>>>> > receives splits being added back:
>>>>>>> > 1.  Job-level restore: The job is restored,  splits from reader’s
>>>>>>> state are
>>>>>>> > reported by ReaderRegistrationEvent.
>>>>>>> > 2.  Reader-level restart: a reader is started but not the whole
>>>>>>> job,
>>>>>>> >  splits assigned to it after the last successful checkpoint. This
>>>>>>> is what
>>>>>>> > addSplitsBack used to do.
>>>>>>> >
>>>>>>> >
>>>>>>> > In these two situations, the enumerator will choose different
>>>>>>> strategies.
>>>>>>> > 1. Job-level restore: the splits should be redistributed across
>>>>>>> readers
>>>>>>> > according to the current partitioner strategy.
>>>>>>> > 2. Reader-level restart: the splits should be reassigned directly
>>>>>>> back to
>>>>>>> > the same reader they were originally assigned to, preserving
>>>>>>> locality and
>>>>>>> > avoiding unnecessary redistribution
>>>>>>> >
>>>>>>> > Therefore, the enumerator must clearly distinguish between these
>>>>>>> two
>>>>>>> > scenarios.I used to deprecate the former
>>>>>>> addSplitsBack(List<SplitT>
>>>>>>> > splits, int subtaskId) but add a new addSplitsBack(List<SplitT>
>>>>>>> > splits, int subtaskId,
>>>>>>> > boolean reportedByReader).
>>>>>>> > Leonard suggest to use another method addSplitsBackOnRecovery but
>>>>>>> not
>>>>>>> > influenced  currently addSplitsBack.
>>>>>>> >
>>>>>>> > Best
>>>>>>> > Hongshun
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> > On 2025/09/08 17:20:31 Becket Qin wrote:
>>>>>>> > > Hi Leonard,
>>>>>>> > >
>>>>>>> > >
>>>>>>> > > > Could we introduce a new method like addSplitsBackOnRecovery
>>>>>>> with
>>>>>>> > default
>>>>>>> > > > implementation. In this way, we can provide better backward
>>>>>>> > compatibility
>>>>>>> > > > and also makes it easier for developers to understand.
>>>>>>> > >
>>>>>>> > >
>>>>>>> > > I am curious what would the enumerator do differently for the
>>>>>>> splits
>>>>>>> > added
>>>>>>> > > via addSplitsBackOnRecovery() V.S. addSplitsBack()?  Today,
>>>>>>> > addSplitsBack()
>>>>>>> > > is also only called upon recovery. So the new method seems
>>>>>>> confusing. One
>>>>>>> > > thing worth clarifying is if the Source implements
>>>>>>> > > SupportSplitReassignmentOnRecovery, upon recovery, should the
>>>>>>> splits
>>>>>>> > > reported by the readers also be added back to the
>>>>>>> SplitEnumerator via the
>>>>>>> > > addSplitsBack() call? Or should the SplitEnumerator explicitly
>>>>>>> query the
>>>>>>> > > registered reader information via the SplitEnumeratorContext to
>>>>>>> get the
>>>>>>> > > originally assigned splits when addReader() is invoked? I was
>>>>>>> assuming
>>>>>>> > the
>>>>>>> > > latter in the beginning, so the behavior of addSplitsBack()
>>>>>>> remains
>>>>>>> > > unchanged, but I am not opposed in doing the former.
>>>>>>> > >
>>>>>>> > > Also, can you elaborate on the backwards compatibility issue you
>>>>>>> see if
>>>>>>> > we
>>>>>>> > > do not have a separate addSplitsBackOnRecovery() method? Even
>>>>>>> without
>>>>>>> > this
>>>>>>> > > new method, the behavior remains exactly the same unless the end
>>>>>>> users
>>>>>>> > > implement the mix-in interface of
>>>>>>> "SupportSplitReassignmentOnRecovery",
>>>>>>> > > right?
>>>>>>> > >
>>>>>>> > > Thanks,
>>>>>>> > >
>>>>>>> > > Jiangjie (Becket) Qin
>>>>>>> > >
>>>>>>> > > On Mon, Sep 8, 2025 at 1:48 AM Hongshun Wang <[email protected]>
>>>>>>> > > wrote:
>>>>>>> > >
>>>>>>> > > > Hi devs,
>>>>>>> > > >
>>>>>>> > > > It has been quite some time since this FLIP[1] was first
>>>>>>> proposed.
>>>>>>> > Thank
>>>>>>> > > > you for your valuable feedback—based on your suggestions, the
>>>>>>> FLIP has
>>>>>>> > > > undergone several rounds of revisions.
>>>>>>> > > >
>>>>>>> > > > Any more advice is welcome and appreciated. If there are no
>>>>>>> further
>>>>>>> > > > concerns, I plan to start the vote tomorrow.
>>>>>>> > > >
>>>>>>> > > > Best
>>>>>>> > > > Hongshun
>>>>>>> > > >
>>>>>>> > > > [1]
>>>>>>> > > >
>>>>>>> >
>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480
>>>>>>> > > >
>>>>>>> > > > On Mon, Sep 8, 2025 at 4:42 PM Hongshun Wang <[email protected]>
>>>>>>> > > > wrote:
>>>>>>> > > >
>>>>>>> > > > > Hi Leonard,
>>>>>>> > > > > Thanks for your advice.  It makes sense and I have modified
>>>>>>> it.
>>>>>>> > > > >
>>>>>>> > > > > Best,
>>>>>>> > > > > Hongshun
>>>>>>> > > > >
>>>>>>> > > > > On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu <[email protected]>
>>>>>>> wrote:
>>>>>>> > > > >
>>>>>>> > > > >> Thanks Hongshun and Becket for the deep discussion.
>>>>>>> > > > >>
>>>>>>> > > > >> I only have one comment for one API design:
>>>>>>> > > > >>
>>>>>>> > > > >> > Deprecate the old addSplitsBack  method, use a
>>>>>>> addSplitsBack with
>>>>>>> > > > >> param isReportedByReader instead. Because, The enumerator
>>>>>>> can apply
>>>>>>> > > > >> different reassignment policies based on the context.
>>>>>>> > > > >>
>>>>>>> > > > >> Could we introduce a new method like
>>>>>>> *addSplitsBackOnRecovery*  with
>>>>>>> > > > default
>>>>>>> > > > >> implementation. In this way, we can provide better backward
>>>>>>> > > > >> compatibility and also makes it easier for developers to
>>>>>>> understand.
>>>>>>> > > > >>
>>>>>>> > > > >> Best,
>>>>>>> > > > >> Leonard
>>>>>>> > > > >>
>>>>>>> > > > >>
>>>>>>> > > > >>
>>>>>>> > > > >> 2025 9月 3 20:26,Hongshun Wang <[email protected]> 写道:
>>>>>>> > > > >>
>>>>>>> > > > >> Hi Becket,
>>>>>>> > > > >>
>>>>>>> > > > >> I think that's a great idea!  I have added the
>>>>>>> > > > >> SupportSplitReassignmentOnRecovery interface in this FLIP.
>>>>>>> If a
>>>>>>> > Source
>>>>>>> > > > >> implements this interface indicates that the source
>>>>>>> operator needs
>>>>>>> > to
>>>>>>> > > > >> report splits to the enumerator and receive reassignment.[1]
>>>>>>> > > > >>
>>>>>>> > > > >> Best,
>>>>>>> > > > >> Hongshun
>>>>>>> > > > >>
>>>>>>> > > > >> [1]
>>>>>>> > > > >>
>>>>>>> > > >
>>>>>>> >
>>>>>>> >
>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>> > > > >>
>>>>>>> > > > >> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <
>>>>>>> [email protected]>
>>>>>>> > > > wrote:
>>>>>>> > > > >>
>>>>>>> > > > >>> Hi Hongshun,
>>>>>>> > > > >>>
>>>>>>> > > > >>> I think the convention for such optional features in
>>>>>>> Source is via
>>>>>>> > > > >>> mix-in interfaces. So instead of adding a method to the
>>>>>>> > SourceReader,
>>>>>>> > > > maybe
>>>>>>> > > > >>> we should introduce an interface
>>>>>>> SupportSplitReassingmentOnRecovery
>>>>>>> > > > with
>>>>>>> > > > >>> this method. If a Source implementation implements that
>>>>>>> interface,
>>>>>>> > > > then the
>>>>>>> > > > >>> SourceOperator will check the desired behavior and act
>>>>>>> accordingly.
>>>>>>> > > > >>>
>>>>>>> > > > >>> Thanks,
>>>>>>> > > > >>>
>>>>>>> > > > >>> Jiangjie (Becket) Qin
>>>>>>> > > > >>>
>>>>>>> > > > >>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <
>>>>>>> > [email protected]
>>>>>>> > > > >
>>>>>>> > > > >>> wrote:
>>>>>>> > > > >>>
>>>>>>> > > > >>>> Hi de vs,
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> Would anyone like to discuss this FLIP? I'd appreciate
>>>>>>> your
>>>>>>> > feedback
>>>>>>> > > > >>>> and suggestions.
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> Best,
>>>>>>> > > > >>>> Hongshun
>>>>>>> > > > >>>>
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> 2025年8月13日 14:23,Hongshun Wang <[email protected]> 写道:
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> Hi Becket,
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> Thank you for your detailed feedback. The new contract
>>>>>>> makes good
>>>>>>> > > > sense
>>>>>>> > > > >>>> to me and effectively addresses the issues I encountered
>>>>>>> at the
>>>>>>> > > > beginning
>>>>>>> > > > >>>> of the design.
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> That said, I recommend not reporting splits by default,
>>>>>>> primarily
>>>>>>> > for
>>>>>>> > > > >>>> compatibility and practical reasons:
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> >  For these reasons, we do not expect the Split objects
>>>>>>> to be
>>>>>>> > huge,
>>>>>>> > > > >>>> and we are not trying to design for huge Split objects
>>>>>>> either as
>>>>>>> > they
>>>>>>> > > > will
>>>>>>> > > > >>>> have problems even today.
>>>>>>> > > > >>>>
>>>>>>> > > > >>>>    1.
>>>>>>> > > > >>>>
>>>>>>> > > > >>>>    Not all existing connector match this rule
>>>>>>> > > > >>>>    For example, in mysql cdc connector, a binlog split
>>>>>>> may contain
>>>>>>> > > > >>>>    hundreds (or even more) snapshot split completion
>>>>>>> records. This
>>>>>>> > > > state is
>>>>>>> > > > >>>>    large and is currently transmitted incrementally
>>>>>>> through
>>>>>>> > multiple
>>>>>>> > > > >>>>    BinlogSplitMetaEvent messages. Since the binlog reader
>>>>>>> operates
>>>>>>> > > > >>>>    with single parallelism, reporting the full split
>>>>>>> state on
>>>>>>> > recovery
>>>>>>> > > > >>>>    could be inefficient or even infeasible.
>>>>>>> > > > >>>>    For such sources, it would be better to provide a
>>>>>>> mechanism to
>>>>>>> > skip
>>>>>>> > > > >>>>    split reporting during restart until they redesign and
>>>>>>> reduce
>>>>>>> > the
>>>>>>> > > > >>>>    split size.
>>>>>>> > > > >>>>    2.
>>>>>>> > > > >>>>
>>>>>>> > > > >>>>    Not all enumerators maintain unassigned splits in
>>>>>>> state.
>>>>>>> > > > >>>>    Some SplitEnumerator(such as kafka connector)
>>>>>>> implementations
>>>>>>> > do
>>>>>>> > > > >>>>    not track or persistently manage unassigned splits.
>>>>>>> Requiring
>>>>>>> > them
>>>>>>> > > > to
>>>>>>> > > > >>>>    handle re-registration would add unnecessary
>>>>>>> complexity. Even
>>>>>>> > > > though we
>>>>>>> > > > >>>>    maybe implements in kafka connector, currently, kafka
>>>>>>> connector
>>>>>>> > is
>>>>>>> > > > decouple
>>>>>>> > > > >>>>    with flink version, we also need to make sure the
>>>>>>> elder version
>>>>>>> > is
>>>>>>> > > > >>>>    compatible.
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> ------------------------------
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> To address these concerns, I propose introducing a new
>>>>>>> method:
>>>>>>> > boolean
>>>>>>> > > > >>>> SourceReader#shouldReassignSplitsOnRecovery() with a
>>>>>>> default
>>>>>>> > > > >>>> implementation returning false. This allows source
>>>>>>> readers to opt
>>>>>>> > in
>>>>>>> > > > >>>> to split reassignment only when necessary. Since the new
>>>>>>> contract
>>>>>>> > > > already
>>>>>>> > > > >>>> places the responsibility for split assignment on the
>>>>>>> enumerator,
>>>>>>> > not
>>>>>>> > > > >>>> reporting splits by default is a safe and clean default
>>>>>>> behavior.
>>>>>>> > > > >>>>
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> ------------------------------
>>>>>>> > > > >>>>
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> I’ve updated the implementation and the FIP
>>>>>>> accordingly[1]. It
>>>>>>> > quite a
>>>>>>> > > > >>>> big change. In particular, for the Kafka connector, we
>>>>>>> can now use
>>>>>>> > a
>>>>>>> > > > >>>> pluggable SplitPartitioner to support different split
>>>>>>> assignment
>>>>>>> > > > >>>> strategies (e.g., default, round-robin).
>>>>>>> > > > >>>>
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> Could you please review it when you have a chance?
>>>>>>> > > > >>>>
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> Best,
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> Hongshun
>>>>>>> > > > >>>>
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> [1]
>>>>>>> > > > >>>>
>>>>>>> > > >
>>>>>>> >
>>>>>>> >
>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>> > > > >>>>
>>>>>>> > > > >>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <
>>>>>>> [email protected]>
>>>>>>> > > > wrote:
>>>>>>> > > > >>>>
>>>>>>> > > > >>>>> Hi Hongshun,
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> I am not too concerned about the transmission cost.
>>>>>>> Because the
>>>>>>> > full
>>>>>>> > > > >>>>> split transmission has to happen in the initial
>>>>>>> assignment phase
>>>>>>> > > > already.
>>>>>>> > > > >>>>> And in the future, we probably want to also introduce
>>>>>>> some kind
>>>>>>> > of
>>>>>>> > > > workload
>>>>>>> > > > >>>>> balance across source readers, e.g. based on the
>>>>>>> per-split
>>>>>>> > > > throughput or
>>>>>>> > > > >>>>> the per-source-reader workload in heterogeneous
>>>>>>> clusters. For
>>>>>>> > these
>>>>>>> > > > >>>>> reasons, we do not expect the Split objects to be huge,
>>>>>>> and we
>>>>>>> > are
>>>>>>> > > > not
>>>>>>> > > > >>>>> trying to design for huge Split objects either as they
>>>>>>> will have
>>>>>>> > > > problems
>>>>>>> > > > >>>>> even today.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> Good point on the potential split loss, please see the
>>>>>>> reply
>>>>>>> > below:
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> Scenario 2:
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>>> 1. Reader A reports splits (1 and 2), and Reader B
>>>>>>> reports (3
>>>>>>> > and 4)
>>>>>>> > > > >>>>>> upon restart.
>>>>>>> > > > >>>>>> 2. Before the enumerator receives all reports and
>>>>>>> performs
>>>>>>> > > > >>>>>> reassignment, a checkpoint is triggered.
>>>>>>> > > > >>>>>> 3. Since no splits have been reassigned yet, both
>>>>>>> readers have
>>>>>>> > empty
>>>>>>> > > > >>>>>> states.
>>>>>>> > > > >>>>>> 4. When restarting from this checkpoint, all four
>>>>>>> splits are
>>>>>>> > lost.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> The reader registration happens in the
>>>>>>> SourceOperator.open(),
>>>>>>> > which
>>>>>>> > > > >>>>> means the task is still in the initializing state,
>>>>>>> therefore the
>>>>>>> > > > checkpoint
>>>>>>> > > > >>>>> should not be triggered until the enumerator receives
>>>>>>> all the
>>>>>>> > split
>>>>>>> > > > reports.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> There is a nuance here. Today, the RPC call from the TM
>>>>>>> to the JM
>>>>>>> > is
>>>>>>> > > > >>>>> async. So it is possible that the SourceOpertor.open()
>>>>>>> has
>>>>>>> > returned,
>>>>>>> > > > but
>>>>>>> > > > >>>>> the enumerator has not received the split reports.
>>>>>>> However,
>>>>>>> > because
>>>>>>> > > > the
>>>>>>> > > > >>>>> task status update RPC call goes to the same channel as
>>>>>>> the split
>>>>>>> > > > reports
>>>>>>> > > > >>>>> call, so the task status RPC call will happen after the
>>>>>>> split
>>>>>>> > > > reports call
>>>>>>> > > > >>>>> on the JM side. Therefore, on the JM side, the
>>>>>>> SourceCoordinator
>>>>>>> > will
>>>>>>> > > > >>>>> always first receive the split reports, then receive the
>>>>>>> > checkpoint
>>>>>>> > > > request.
>>>>>>> > > > >>>>> This "happen before" relationship is kind of important to
>>>>>>> > guarantee
>>>>>>> > > > >>>>> the consistent state between enumerator and readers.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> Scenario 1:
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>>> 1. Upon restart, Reader A reports assigned splits (1
>>>>>>> and 2), and
>>>>>>> > > > >>>>>> Reader B reports (3 and 4).
>>>>>>> > > > >>>>>> 2. The enumerator receives these reports but only
>>>>>>> reassigns
>>>>>>> > splits 1
>>>>>>> > > > >>>>>> and 2 — not 3 and 4.
>>>>>>> > > > >>>>>> 3. A checkpoint or savepoint is then triggered. Only
>>>>>>> splits 1
>>>>>>> > and 2
>>>>>>> > > > >>>>>> are recorded in the reader states; splits 3 and 4 are
>>>>>>> not
>>>>>>> > persisted.
>>>>>>> > > > >>>>>> 4. If the job is later restarted from this checkpoint,
>>>>>>> splits 3
>>>>>>> > and
>>>>>>> > > > 4
>>>>>>> > > > >>>>>> will be permanently lost.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> This scenario is possible. One solution is to let the
>>>>>>> enumerator
>>>>>>> > > > >>>>> implementation handle this. That means if the enumerator
>>>>>>> relies
>>>>>>> > on
>>>>>>> > > > the
>>>>>>> > > > >>>>> initial split reports from the source readers, it should
>>>>>>> maintain
>>>>>>> > > > these
>>>>>>> > > > >>>>> reports by itself. In the above example, the enumerator
>>>>>>> will need
>>>>>>> > to
>>>>>>> > > > >>>>> remember that 3 and 4 are not assigned and put it into
>>>>>>> its own
>>>>>>> > state.
>>>>>>> > > > >>>>> The current contract is that anything assigned to the
>>>>>>> > SourceReaders
>>>>>>> > > > >>>>> are completely owned by the SourceReaders. Enumerators
>>>>>>> can
>>>>>>> > remember
>>>>>>> > > > the
>>>>>>> > > > >>>>> assignments but cannot change them, even when the source
>>>>>>> reader
>>>>>>> > > > recovers /
>>>>>>> > > > >>>>> restarts.
>>>>>>> > > > >>>>> With this FLIP, the contract becomes that the source
>>>>>>> readers will
>>>>>>> > > > >>>>> return the ownership of the splits to the enumerator. So
>>>>>>> the
>>>>>>> > > > enumerator is
>>>>>>> > > > >>>>> responsible for maintaining these splits, until they are
>>>>>>> assigned
>>>>>>> > to
>>>>>>> > > > a
>>>>>>> > > > >>>>> source reader again.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> There are other cases where there may be conflict
>>>>>>> information
>>>>>>> > between
>>>>>>> > > > >>>>> reader and enumerator. For example, consider the
>>>>>>> following
>>>>>>> > sequence:
>>>>>>> > > > >>>>> 1. reader A reports splits (1 and 2) up on restart.
>>>>>>> > > > >>>>> 2. enumerator receives the report and assigns both 1 and
>>>>>>> 2 to
>>>>>>> > reader
>>>>>>> > > > B.
>>>>>>> > > > >>>>> 3. reader A failed before checkpointing. And this is a
>>>>>>> partial
>>>>>>> > > > >>>>> failure, so only reader A restarts.
>>>>>>> > > > >>>>> 4. When reader A recovers, it will again report splits
>>>>>>> (1 and 2)
>>>>>>> > to
>>>>>>> > > > >>>>> the enumerator.
>>>>>>> > > > >>>>> 5. The enumerator should ignore this report because it
>>>>>>> has
>>>>>>> > > > >>>>> assigned splits (1 and 2) to reader B.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> So with the new contract, the enumerator should be the
>>>>>>> source of
>>>>>>> > > > truth
>>>>>>> > > > >>>>> for split ownership.
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> Thanks,
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> Jiangjie (Becket) Qin
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <
>>>>>>> > > > [email protected]>
>>>>>>> > > > >>>>> wrote:
>>>>>>> > > > >>>>>
>>>>>>> > > > >>>>>> Hi Becket,
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> I did consider this approach at the beginning (and it
>>>>>>> was also
>>>>>>> > > > >>>>>> mentioned in this FLIP), since it would allow more
>>>>>>> flexibility
>>>>>>> > in
>>>>>>> > > > >>>>>> reassigning all splits. However, there are a few
>>>>>>> potential
>>>>>>> > issues.
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> 1. High Transmission Cost
>>>>>>> > > > >>>>>> If we pass the full split objects (rather than just
>>>>>>> split IDs),
>>>>>>> > the
>>>>>>> > > > >>>>>> data size could be significant, leading to high
>>>>>>> overhead during
>>>>>>> > > > >>>>>> transmission — especially when many splits are involved.
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> 2. Risk of Split Loss
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> Risk of split loss exists unless we have a mechanism to
>>>>>>> make
>>>>>>> > sure
>>>>>>> > > > >>>>>> only can checkpoint after all the splits are reassigned.
>>>>>>> > > > >>>>>> There are scenarios where splits could be lost due to
>>>>>>> > inconsistent
>>>>>>> > > > >>>>>> state handling during recovery:
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> Scenario 1:
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>>    1. Upon restart, Reader A reports assigned splits (1
>>>>>>> and 2),
>>>>>>> > and
>>>>>>> > > > >>>>>>    Reader B reports (3 and 4).
>>>>>>> > > > >>>>>>    2. The enumerator receives these reports but only
>>>>>>> reassigns
>>>>>>> > > > >>>>>>    splits 1 and 2 — not 3 and 4.
>>>>>>> > > > >>>>>>    3. A checkpoint or savepoint is then triggered. Only
>>>>>>> splits 1
>>>>>>> > and
>>>>>>> > > > >>>>>>    2 are recorded in the reader states; splits 3 and 4
>>>>>>> are not
>>>>>>> > > > persisted.
>>>>>>> > > > >>>>>>    4. If the job is later restarted from this
>>>>>>> checkpoint, splits
>>>>>>> > 3
>>>>>>> > > > >>>>>>    and 4 will be permanently lost.
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> Scenario 2:
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>>    1. Reader A reports splits (1 and 2), and Reader B
>>>>>>> reports (3
>>>>>>> > and
>>>>>>> > > > >>>>>>    4) upon restart.
>>>>>>> > > > >>>>>>    2. Before the enumerator receives all reports and
>>>>>>> performs
>>>>>>> > > > >>>>>>    reassignment, a checkpoint is triggered.
>>>>>>> > > > >>>>>>    3. Since no splits have been reassigned yet, both
>>>>>>> readers
>>>>>>> > have
>>>>>>> > > > >>>>>>    empty states.
>>>>>>> > > > >>>>>>    4. When restarting from this checkpoint, all four
>>>>>>> splits are
>>>>>>> > > > lost.
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> Let me know if you have thoughts on how we might
>>>>>>> mitigate these
>>>>>>> > > > risks!
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> Best
>>>>>>> > > > >>>>>> Hongshun
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <
>>>>>>> [email protected]>
>>>>>>> > > > >>>>>> wrote:
>>>>>>> > > > >>>>>>
>>>>>>> > > > >>>>>>> Hi Hongshun,
>>>>>>> > > > >>>>>>>
>>>>>>> > > > >>>>>>> The steps sound reasonable to me in general. In terms
>>>>>>> of the
>>>>>>> > > > updated
>>>>>>> > > > >>>>>>> FLIP wiki, it would be good to see if we can keep the
>>>>>>> protocol
>>>>>>> > > > simple. One
>>>>>>> > > > >>>>>>> alternative way to achieve this behavior is following:
>>>>>>> > > > >>>>>>>
>>>>>>> > > > >>>>>>> 1. Upon SourceOperator startup, the SourceOperator
>>>>>>> sends
>>>>>>> > > > >>>>>>> ReaderRegistrationEvent with the currently assigned
>>>>>>> splits to
>>>>>>> > the
>>>>>>> > > > >>>>>>> enumerator. It does not add these splits to the
>>>>>>> SourceReader.
>>>>>>> > > > >>>>>>> 2. The enumerator will always use the
>>>>>>> > > > >>>>>>> SourceEnumeratorContext.assignSplits() to assign the
>>>>>>> splits.
>>>>>>> > (not
>>>>>>> > > > via the
>>>>>>> > > > >>>>>>> response of the SourceRegistrationEvent, this allows
>>>>>>> async
>>>>>>> > split
>>>>>>> > > > assignment
>>>>>>> > > > >>>>>>> in case the enumerator wants to wait until all the
>>>>>>> readers are
>>>>>>> > > > registered)
>>>>>>> > > > >>>>>>> 3. The SourceOperator will only call
>>>>>>> SourceReader.addSplits()
>>>>>>> > when
>>>>>>> > > > >>>>>>> it receives the AddSplitEvent from the enumerator.
>>>>>>> > > > >>>>>>>
>>>>>>> > > > >>>>>>> This protocol has a few benefits:
>>>>>>> > > > >>>>>>> 1. it basically allows arbitrary split reassignment
>>>>>>> upon
>>>>>>> > restart
>>>>>>> > > > >>>>>>> 2. simplicity: there is only one way to assign splits.
>>>>>>> > > > >>>>>>>
>>>>>>> > > > >>>>>>> So we only need one interface change:
>>>>>>> > > > >>>>>>> - add the initially assigned splits to ReaderInfo so
>>>>>>> the
>>>>>>> > Enumerator
>>>>>>> > > > >>>>>>> can access it.
>>>>>>> > > > >>>>>>> and one behavior change:
>>>>>>> > > > >>>>>>> - The SourceOperator should stop assigning splits to
>>>>>>> the from
>>>>>>> > state
>>>>>>> > > > >>>>>>> restoration, but
>>>>>>> > [message truncated...]
>>>>>>> >
>>>>>>>
>>>>>>
>>>>
>>

Reply via email to