Thanks Becket and Hongshun for the insightful discussion.

The underlying implementation and communication mechanisms of the Flink
Source indeed involve many intricate details. We discussed the issue of
split re-assignment in specific scenarios, and fortunately the final
decision turned out to be pretty clear.

+1 to Becket’s proposal, which keeps the framework cleaner and more
flexible. +1 to Hongshun’s point about providing comprehensive guidance
for connector developers.


Best, 
Leonard



> On Sep 26, 2025, at 16:30, Hongshun Wang <loserwang1...@gmail.com> wrote:
> 
> Hi Becket,
> 
> I got it. You’re suggesting that we should not handle this in the source
> framework, but instead let the split enumerator manage these three
> scenarios.
> 
> Let me explain why I originally favored handling it in the framework: I'm
> concerned that connector developers might overlook certain edge cases
> (after all, even we needed extensive discussion to fully clarify the
> logic).
> 
> However, your approach keeps the framework cleaner and more flexible, so
> I will take it.
> 
> Perhaps, in this FLIP, we should focus on providing comprehensive guidance 
> for connector developers: explain how to implement a split enumerator, 
> including the underlying challenges and their solutions.
>  
> Additionally, we can use the Kafka connector as a reference implementation to 
> demonstrate the practical steps. This way, developers who want to implement 
> similar connectors can directly reference this example.
> 
> 
> Best,
> Hongshun
> 
> 
> 
> On Fri, Sep 26, 2025 at 1:27 PM Becket Qin <becket....@gmail.com> wrote:
>> It would be good to not expose runtime details to the source implementation 
>> if possible. 
>> 
>> Today, the split enumerator implementations are expected to track the split 
>> assignment.
>> 
>> Assuming the split enumerator implementation keeps a split assignment map, 
>> that means the enumerator should already know whether a split is assigned or 
>> unassigned. So it can handle the three scenarios you mentioned.
>> 
>>> The split is reported by a reader during a global restoration.
>> The split enumerator should have just been restored / created. If the
>> enumerator expects a full reassignment of splits upon global recovery,
>> there should be no assigned splits for that reader in the split
>> assignment mapping.
>> 
>>> The split is reported by a reader during a partial failure recovery.
>> In this case, when SplitEnumerator.addReader() is invoked, the split 
>> assignment map in the enumerator implementation should already have some 
>> split assignments for the reader. Therefore it is a partial failover. If the 
>> source supports split reassignment on recovery, the enumerator can assign 
>> splits that are different from the reported assignment of that reader in the 
>> SplitEnumeratorContext, or it can also assign the same splits. In any case, 
>> the enumerator knows that this is a partial recovery because the assignment 
>> map is non-empty.
>> 
>>> The split is not reported by a reader, but is assigned after the last 
>>> successful checkpoint and was never acknowledged.
>> This is actually one of the steps in a partial failure recovery.
>> SplitEnumerator.addSplitsBack() will be called before
>> SplitEnumerator.addReader() is called for the recovered reader. When
>> SplitEnumerator.addSplitsBack() is invoked, it is for sure a partial
>> recovery, and the enumerator should remove these splits from the split
>> assignment map as if they were never assigned.
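>> 
>> To make the bookkeeping concrete, a minimal sketch (MySplit, the maps,
>> and the assignPendingSplits helper are illustrative assumptions, not
>> part of the FLIP):
>> 
>>     // Inside a SplitEnumerator implementation:
>>     private final Map<Integer, Set<MySplit>> assignmentMap = new HashMap<>();
>>     private final Set<MySplit> pendingSplits = new HashSet<>();
>> 
>>     @Override
>>     public void addSplitsBack(List<MySplit> splits, int subtaskId) {
>>         // Partial recovery: treat the returned splits as never assigned.
>>         Set<MySplit> assigned = assignmentMap.get(subtaskId);
>>         if (assigned != null) {
>>             assigned.removeAll(splits);
>>         }
>>         pendingSplits.addAll(splits);
>>     }
>> 
>>     @Override
>>     public void addReader(int subtaskId) {
>>         Set<MySplit> assigned = assignmentMap.get(subtaskId);
>>         if (assigned == null || assigned.isEmpty()) {
>>             // Global restore or a brand-new reader: hand out fresh splits.
>>             assignPendingSplits(subtaskId);
>>         } else {
>>             // Partial failover: the reader already has splits in the map;
>>             // re-assign the same ones (or different ones, if the source
>>             // supports reassignment on recovery).
>>             context.assignSplits(new SplitsAssignment<>(
>>                 Map.of(subtaskId, new ArrayList<>(assigned))));
>>         }
>>     }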
>> 
>> I think this should work, right?
>> 
>> Thanks,
>> 
>> Jiangjie (Becket) Qin
>> 
>> On Thu, Sep 25, 2025 at 8:34 PM Hongshun Wang <loserwang1...@gmail.com> wrote:
>>> Hi Becket and Leonard,
>>> 
>>> Thanks for your advice. 
>>> 
>>> > put all the reader information in the SplitEnumerator context
>>> I have a concern: the current registeredReaders in
>>> SourceCoordinatorContext are removed when subtaskReset() is executed on
>>> failure. However, this approach has merit.
>>> 
>>> One more situation I found my previous design does not cover:
>>> - Initial state: Reader A reports splits (1, 2).
>>> - Enumerator action: assigns split 1 to Reader A, and split 2 to Reader B.
>>> - Failure scenario: Reader A fails before checkpointing. Since this is a
>>> partial failure, only Reader A restarts.
>>> - Recovery issue: upon recovery, Reader A re-reports split (1). In my
>>> previous design, the enumerator would ignore Reader A's re-registration,
>>> which would cause data loss.
>>> 
>>> Thus, when the enumerator receives a split, the split may originate from
>>> three scenarios:
>>> 1. The split is reported by a reader during a global restoration.
>>> 2. The split is reported by a reader during a partial failure recovery.
>>> 3. The split is not reported by a reader, but was assigned after the last
>>> successful checkpoint and was never acknowledged.
>>> In the first scenario (global restore), the split should be
>>> re-distributed. For the latter two scenarios (partial failover and
>>> post-checkpoint assignment), we need to reassign the split to its
>>> originally assigned subtask.
>>> 
>>> By implementing a method in the SplitEnumerator context to track each
>>> assigned split's status, the system can correctly identify and resolve
>>> split ownership in all three scenarios. What about adding a
>>> `SplitRecoveryType splitRecoveryType(Split split)` method to
>>> SplitEnumeratorContext? SplitRecoveryType is an enum including
>>> `UNASSIGNED`, `GLOBAL_RESTORE`, `PARTIAL_FAILOVER` and
>>> `POST_CHECKPOINT_ASSIGNMENT`.
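>>> 
>>> Roughly, the proposed addition would look like this (a sketch of the
>>> idea, not a final API):
>>> 
>>>     public enum SplitRecoveryType {
>>>         UNASSIGNED,
>>>         GLOBAL_RESTORE,
>>>         PARTIAL_FAILOVER,
>>>         POST_CHECKPOINT_ASSIGNMENT
>>>     }
>>> 
>>>     // Proposed addition to SplitEnumeratorContext:
>>>     SplitRecoveryType splitRecoveryType(SplitT split);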
>>> 
>>> What do you think? Are there any details or scenarios I haven't considered? 
>>> Looking forward to your advice.
>>> 
>>> Best,
>>> Hongshun
>>> 
>>> On Thu, Sep 11, 2025 at 12:41 AM Becket Qin <becket....@gmail.com> wrote:
>>>> Thanks for the explanation, Hongshun.
>>>> 
>>>> The current pattern of handling new reader registration is the following:
>>>> 1. put all the reader information in the SplitEnumerator context
>>>> 2. notify the enumerator about the new reader registration.
>>>> 3. Let the split enumerator get whatever information it wants from the
>>>> context and do its job.
>>>> This pattern decouples the information passing and the reader registration
>>>> notification. This makes the API extensible - we can add more information
>>>> (e.g. reported assigned splits in our case) about the reader to the context
>>>> without introducing new methods.
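>>>> 
>>>> As a sketch of that pattern with the existing API (the assignment logic
>>>> is illustrative):
>>>> 
>>>>     @Override
>>>>     public void addReader(int subtaskId) {
>>>>         // Step 3: pull whatever information we need from the context.
>>>>         ReaderInfo info = context.registeredReaders().get(subtaskId);
>>>>         // ... decide and assign splits based on the reader info ...
>>>>     }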
>>>> 
>>>> Introducing a new method addSplitsBackOnRecovery() is redundant given
>>>> the above pattern. Do we really need it?
>>>> 
>>>> Thanks,
>>>> 
>>>> Jiangjie (Becket) Qin
>>>> 
>>>> On Mon, Sep 8, 2025 at 8:18 PM Hongshun Wang <loserwang1...@gmail.com> wrote:
>>>> 
>>>> > Hi Becket,
>>>> >
>>>> > > I am curious what would the enumerator do differently for the splits
>>>> > added via addSplitsBackOnRecovery() V.S. addSplitsBack()?
>>>> >
>>>> > In this FLIP, there are two distinct scenarios in which the enumerator
>>>> > receives splits being added back:
>>>> > 1. Job-level restore: the whole job is restored, and splits from each
>>>> > reader's state are reported via ReaderRegistrationEvent.
>>>> > 2. Reader-level restart: a single reader is restarted, not the whole
>>>> > job, and the splits assigned to it after the last successful checkpoint
>>>> > are added back. This is what addSplitsBack used to do.
>>>> >
>>>> >
>>>> > In these two situations, the enumerator will choose different strategies.
>>>> > 1. Job-level restore: the splits should be redistributed across readers
>>>> > according to the current partitioner strategy.
>>>> > 2. Reader-level restart: the splits should be reassigned directly back to
>>>> > the same reader they were originally assigned to, preserving locality and
>>>> > avoiding unnecessary redistribution.
>>>> >
>>>> > Therefore, the enumerator must clearly distinguish between these two
>>>> > scenarios. I originally proposed deprecating the former
>>>> > addSplitsBack(List<SplitT> splits, int subtaskId) and adding a new
>>>> > addSplitsBack(List<SplitT> splits, int subtaskId, boolean reportedByReader).
>>>> > Leonard suggested using another method, addSplitsBackOnRecovery, that
>>>> > does not affect the current addSplitsBack.
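>>>> >
>>>> > For clarity, my earlier proposal was roughly (a sketch, not final API):
>>>> >
>>>> >     @Deprecated
>>>> >     void addSplitsBack(List<SplitT> splits, int subtaskId);
>>>> >
>>>> >     void addSplitsBack(List<SplitT> splits, int subtaskId, boolean reportedByReader);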
>>>> >
>>>> > Best
>>>> > Hongshun
>>>> >
>>>> >
>>>> >
>>>> > On 2025/09/08 17:20:31 Becket Qin wrote:
>>>> > > Hi Leonard,
>>>> > >
>>>> > >
>>>> > > > Could we introduce a new method like addSplitsBackOnRecovery with a
>>>> > > > default implementation? In this way, we can provide better backward
>>>> > > > compatibility and also make it easier for developers to understand.
>>>> > >
>>>> > >
>>>> > > I am curious what the enumerator would do differently for the splits
>>>> > > added via addSplitsBackOnRecovery() vs. addSplitsBack()? Today,
>>>> > > addSplitsBack() is also only called upon recovery, so the new method
>>>> > > seems confusing. One thing worth clarifying is: if the Source
>>>> > > implements SupportSplitReassignmentOnRecovery, upon recovery, should
>>>> > > the splits reported by the readers also be added back to the
>>>> > > SplitEnumerator via the addSplitsBack() call? Or should the
>>>> > > SplitEnumerator explicitly query the registered reader information via
>>>> > > the SplitEnumeratorContext to get the originally assigned splits when
>>>> > > addReader() is invoked? I was assuming the latter in the beginning, so
>>>> > > the behavior of addSplitsBack() remains unchanged, but I am not
>>>> > > opposed to doing the former.
>>>> > >
>>>> > > Also, can you elaborate on the backwards compatibility issue you see
>>>> > > if we do not have a separate addSplitsBackOnRecovery() method? Even
>>>> > > without this new method, the behavior remains exactly the same unless
>>>> > > the end users implement the mix-in interface of
>>>> > > "SupportSplitReassignmentOnRecovery", right?
>>>> > >
>>>> > > Thanks,
>>>> > >
>>>> > > Jiangjie (Becket) Qin
>>>> > >
>>>> > > On Mon, Sep 8, 2025 at 1:48 AM Hongshun Wang <lo...@gmail.com> wrote:
>>>> > >
>>>> > > > Hi devs,
>>>> > > >
>>>> > > > It has been quite some time since this FLIP [1] was first proposed.
>>>> > > > Thank you for your valuable feedback; based on your suggestions, the
>>>> > > > FLIP has undergone several rounds of revisions.
>>>> > > >
>>>> > > > Any more advice is welcome and appreciated. If there are no further
>>>> > > > concerns, I plan to start the vote tomorrow.
>>>> > > >
>>>> > > > Best
>>>> > > > Hongshun
>>>> > > >
>>>> > > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480
>>>> > > >
>>>> > > > On Mon, Sep 8, 2025 at 4:42 PM Hongshun Wang <lo...@gmail.com> wrote:
>>>> > > >
>>>> > > > > Hi Leonard,
>>>> > > > > Thanks for your advice.  It makes sense and I have modified it.
>>>> > > > >
>>>> > > > > Best,
>>>> > > > > Hongshun
>>>> > > > >
>>>> > > > >> On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu <xb...@gmail.com> wrote:
>>>> > > > >
>>>> > > > >> Thanks Hongshun and Becket for the deep discussion.
>>>> > > > >>
>>>> > > > >> I only have one comment for one API design:
>>>> > > > >>
>>>> > > > >> > Deprecate the old addSplitsBack method, use an addSplitsBack with
>>>> > > > >> > param isReportedByReader instead. Because the enumerator can apply
>>>> > > > >> > different reassignment policies based on the context.
>>>> > > > >>
>>>> > > > >> Could we introduce a new method like *addSplitsBackOnRecovery* with
>>>> > > > >> a default implementation? In this way, we can provide better
>>>> > > > >> backward compatibility and also make it easier for developers to
>>>> > > > >> understand.
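>>>> > > > >>
>>>> > > > >> A sketch of what I have in mind (the default body below is my
>>>> > > > >> assumption of the backward-compatible behavior, not a final API):
>>>> > > > >>
>>>> > > > >>     default void addSplitsBackOnRecovery(List<SplitT> splits, int subtaskId) {
>>>> > > > >>         // Existing sources keep today's behavior unless they override this.
>>>> > > > >>         addSplitsBack(splits, subtaskId);
>>>> > > > >>     }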
>>>> > > > >>
>>>> > > > >> Best,
>>>> > > > >> Leonard
>>>> > > > >>
>>>> > > > >>
>>>> > > > >>
>>>> > > > >> On Sep 3, 2025, at 20:26, Hongshun Wang <lo...@gmail.com> wrote:
>>>> > > > >>
>>>> > > > >> Hi Becket,
>>>> > > > >>
>>>> > > > >> I think that's a great idea! I have added the
>>>> > > > >> SupportSplitReassignmentOnRecovery interface in this FLIP. A Source
>>>> > > > >> implementing this interface indicates that the source operator
>>>> > > > >> needs to report splits to the enumerator and receive reassignment. [1]
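>>>> > > > >>
>>>> > > > >> As a sketch, it is just a marker (mix-in) interface that the
>>>> > > > >> SourceOperator checks (the instanceof check is illustrative):
>>>> > > > >>
>>>> > > > >>     public interface SupportSplitReassignmentOnRecovery {}
>>>> > > > >>
>>>> > > > >>     // In the SourceOperator:
>>>> > > > >>     boolean reportSplitsOnRecovery =
>>>> > > > >>         source instanceof SupportSplitReassignmentOnRecovery;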
>>>> > > > >>
>>>> > > > >> Best,
>>>> > > > >> Hongshun
>>>> > > > >>
>>>> > > > >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>> > > > >>
>>>> > > > >> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <be...@gmail.com> wrote:
>>>> > > > >>
>>>> > > > >>> Hi Hongshun,
>>>> > > > >>>
>>>> > > > >>> I think the convention for such optional features in Source is via
>>>> > > > >>> mix-in interfaces. So instead of adding a method to the
>>>> > > > >>> SourceReader, maybe we should introduce an interface
>>>> > > > >>> SupportSplitReassignmentOnRecovery with this method. If a Source
>>>> > > > >>> implementation implements that interface, then the SourceOperator
>>>> > > > >>> will check the desired behavior and act accordingly.
>>>> > > > >>>
>>>> > > > >>> Thanks,
>>>> > > > >>>
>>>> > > > >>> Jiangjie (Becket) Qin
>>>> > > > >>>
>>>> > > > >>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <loserwang1...@gmail.com> wrote:
>>>> > > > >>>
>>>> > > > >>>> Hi devs,
>>>> > > > >>>>
>>>> > > > >>>> Would anyone like to discuss this FLIP? I'd appreciate your
>>>> > > > >>>> feedback and suggestions.
>>>> > > > >>>>
>>>> > > > >>>> Best,
>>>> > > > >>>> Hongshun
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>> On Aug 13, 2025, at 14:23, Hongshun Wang <lo...@gmail.com> wrote:
>>>> > > > >>>>
>>>> > > > >>>> Hi Becket,
>>>> > > > >>>>
>>>> > > > >>>> Thank you for your detailed feedback. The new contract makes good
>>>> > > > >>>> sense to me and effectively addresses the issues I encountered at
>>>> > > > >>>> the beginning of the design.
>>>> > > > >>>>
>>>> > > > >>>> That said, I recommend not reporting splits by default, primarily
>>>> > > > >>>> for compatibility and practical reasons:
>>>> > > > >>>>
>>>> > > > >>>> > For these reasons, we do not expect the Split objects to be
>>>> > > > >>>> > huge, and we are not trying to design for huge Split objects
>>>> > > > >>>> > either as they will have problems even today.
>>>> > > > >>>>
>>>> > > > >>>> 1. Not all existing connectors match this rule.
>>>> > > > >>>>    For example, in the MySQL CDC connector, a binlog split may
>>>> > > > >>>>    contain hundreds (or even more) snapshot split completion
>>>> > > > >>>>    records. This state is large and is currently transmitted
>>>> > > > >>>>    incrementally through multiple BinlogSplitMetaEvent messages.
>>>> > > > >>>>    Since the binlog reader operates with single parallelism,
>>>> > > > >>>>    reporting the full split state on recovery could be inefficient
>>>> > > > >>>>    or even infeasible. For such sources, it would be better to
>>>> > > > >>>>    provide a mechanism to skip split reporting during restart
>>>> > > > >>>>    until they redesign and reduce the split size.
>>>> > > > >>>>
>>>> > > > >>>> 2. Not all enumerators maintain unassigned splits in state.
>>>> > > > >>>>    Some SplitEnumerator implementations (such as the Kafka
>>>> > > > >>>>    connector's) do not track or persistently manage unassigned
>>>> > > > >>>>    splits. Requiring them to handle re-registration would add
>>>> > > > >>>>    unnecessary complexity. Even if we implement this in the Kafka
>>>> > > > >>>>    connector, the Kafka connector is currently decoupled from the
>>>> > > > >>>>    Flink version, so we also need to make sure older versions stay
>>>> > > > >>>>    compatible.
>>>> > > > >>>>
>>>> > > > >>>> ------------------------------
>>>> > > > >>>>
>>>> > > > >>>> To address these concerns, I propose introducing a new method:
>>>> > > > >>>> boolean SourceReader#shouldReassignSplitsOnRecovery(), with a
>>>> > > > >>>> default implementation returning false. This allows source readers
>>>> > > > >>>> to opt in to split reassignment only when necessary. Since the new
>>>> > > > >>>> contract already places the responsibility for split assignment on
>>>> > > > >>>> the enumerator, not reporting splits by default is a safe and
>>>> > > > >>>> clean default behavior.
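>>>> > > > >>>>
>>>> > > > >>>> Concretely, something like this (a sketch of the proposed default):
>>>> > > > >>>>
>>>> > > > >>>>     // Proposed addition to SourceReader:
>>>> > > > >>>>     default boolean shouldReassignSplitsOnRecovery() {
>>>> > > > >>>>         // Opt-in: splits are reported on recovery only if a
>>>> > > > >>>>         // reader overrides this to return true.
>>>> > > > >>>>         return false;
>>>> > > > >>>>     }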
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>> ------------------------------
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>> I’ve updated the implementation and the FLIP accordingly [1]. It
>>>> > > > >>>> is quite a big change. In particular, for the Kafka connector, we
>>>> > > > >>>> can now use a pluggable SplitPartitioner to support different
>>>> > > > >>>> split assignment strategies (e.g., default, round-robin).
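>>>> > > > >>>>
>>>> > > > >>>> For illustration, the pluggable partitioner could be shaped
>>>> > > > >>>> roughly like this (a hypothetical sketch, not the exact interface
>>>> > > > >>>> in the FLIP):
>>>> > > > >>>>
>>>> > > > >>>>     public interface SplitPartitioner<SplitT> {
>>>> > > > >>>>         // Decide which subtask should own the given split, e.g.
>>>> > > > >>>>         // hash-based by default or round-robin for balance.
>>>> > > > >>>>         int assignOwner(SplitT split, int parallelism);
>>>> > > > >>>>     }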
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>> Could you please review it when you have a chance?
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>> Best,
>>>> > > > >>>>
>>>> > > > >>>> Hongshun
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>> > > > >>>>
>>>> > > > >>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <be...@gmail.com> wrote:
>>>> > > > >>>>
>>>> > > > >>>>> Hi Hongshun,
>>>> > > > >>>>>
>>>> > > > >>>>> I am not too concerned about the transmission cost, because the
>>>> > > > >>>>> full split transmission has to happen in the initial assignment
>>>> > > > >>>>> phase already. And in the future, we probably want to also
>>>> > > > >>>>> introduce some kind of workload balance across source readers,
>>>> > > > >>>>> e.g. based on the per-split throughput or the per-source-reader
>>>> > > > >>>>> workload in heterogeneous clusters. For these reasons, we do not
>>>> > > > >>>>> expect the Split objects to be huge, and we are not trying to
>>>> > > > >>>>> design for huge Split objects either, as they will have problems
>>>> > > > >>>>> even today.
>>>> > > > >>>>>
>>>> > > > >>>>> Good point on the potential split loss; please see the reply
>>>> > > > >>>>> below:
>>>> > > > >>>>>
>>>> > > > >>>>> Scenario 2:
>>>> > > > >>>>>
>>>> > > > >>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3
>>>> > > > >>>>>> and 4) upon restart.
>>>> > > > >>>>>> 2. Before the enumerator receives all reports and performs
>>>> > > > >>>>>> reassignment, a checkpoint is triggered.
>>>> > > > >>>>>> 3. Since no splits have been reassigned yet, both readers have
>>>> > > > >>>>>> empty states.
>>>> > > > >>>>>> 4. When restarting from this checkpoint, all four splits are
>>>> > > > >>>>>> lost.
>>>> > > > >>>>>
>>>> > > > >>>>> The reader registration happens in SourceOperator.open(), which
>>>> > > > >>>>> means the task is still in the initializing state; therefore the
>>>> > > > >>>>> checkpoint should not be triggered until the enumerator receives
>>>> > > > >>>>> all the split reports.
>>>> > > > >>>>>
>>>> > > > >>>>> There is a nuance here. Today, the RPC call from the TM to the
>>>> > > > >>>>> JM is async. So it is possible that SourceOperator.open() has
>>>> > > > >>>>> returned, but the enumerator has not received the split reports.
>>>> > > > >>>>> However, because the task status update RPC call goes through
>>>> > > > >>>>> the same channel as the split report call, the task status RPC
>>>> > > > >>>>> call will happen after the split report call on the JM side.
>>>> > > > >>>>> Therefore, on the JM side, the SourceCoordinator will always
>>>> > > > >>>>> first receive the split reports, then receive the checkpoint
>>>> > > > >>>>> request. This "happens before" relationship is important to
>>>> > > > >>>>> guarantee consistent state between the enumerator and readers.
>>>> > > > >>>>> the consistent state between enumerator and readers.
>>>> > > > >>>>>
>>>> > > > >>>>> Scenario 1:
>>>> > > > >>>>>
>>>> > > > >>>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2),
>>>> > > > >>>>>> and Reader B reports (3 and 4).
>>>> > > > >>>>>> 2. The enumerator receives these reports but only reassigns
>>>> > > > >>>>>> splits 1 and 2, not 3 and 4.
>>>> > > > >>>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1
>>>> > > > >>>>>> and 2 are recorded in the reader states; splits 3 and 4 are
>>>> > > > >>>>>> not persisted.
>>>> > > > >>>>>> 4. If the job is later restarted from this checkpoint, splits
>>>> > > > >>>>>> 3 and 4 will be permanently lost.
>>>> > > > >>>>>
>>>> > > > >>>>> This scenario is possible. One solution is to let the enumerator
>>>> > > > >>>>> implementation handle this. That means if the enumerator relies
>>>> > > > >>>>> on the initial split reports from the source readers, it should
>>>> > > > >>>>> maintain these reports by itself. In the above example, the
>>>> > > > >>>>> enumerator will need to remember that 3 and 4 are not assigned
>>>> > > > >>>>> and put them into its own state.
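>>>> > > > >>>>>
>>>> > > > >>>>> For instance (a sketch; MyCheckpoint and unassignedSplits stand
>>>> > > > >>>>> in for the enumerator's real checkpoint type and bookkeeping):
>>>> > > > >>>>>
>>>> > > > >>>>>     @Override
>>>> > > > >>>>>     public MyCheckpoint snapshotState(long checkpointId) throws Exception {
>>>> > > > >>>>>         // Persist splits that were reported back but not yet
>>>> > > > >>>>>         // reassigned, so they survive a restart from this checkpoint.
>>>> > > > >>>>>         return new MyCheckpoint(unassignedSplits);
>>>> > > > >>>>>     }
>>>> > > > >>>>>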
>>>> > > > >>>>> The current contract is that anything assigned to the
>>>> > > > >>>>> SourceReaders is completely owned by the SourceReaders.
>>>> > > > >>>>> Enumerators can remember the assignments but cannot change them,
>>>> > > > >>>>> even when the source reader recovers / restarts.
>>>> > > > >>>>> With this FLIP, the contract becomes that the source readers
>>>> > > > >>>>> will return the ownership of the splits to the enumerator. So
>>>> > > > >>>>> the enumerator is responsible for maintaining these splits until
>>>> > > > >>>>> they are assigned to a source reader again.
>>>> > > > >>>>>
>>>> > > > >>>>> There are other cases where there may be conflicting information
>>>> > > > >>>>> between reader and enumerator. For example, consider the
>>>> > > > >>>>> following sequence:
>>>> > > > >>>>> 1. Reader A reports splits (1 and 2) upon restart.
>>>> > > > >>>>> 2. The enumerator receives the report and assigns both 1 and 2
>>>> > > > >>>>> to reader B.
>>>> > > > >>>>> 3. Reader A fails before checkpointing. This is a partial
>>>> > > > >>>>> failure, so only reader A restarts.
>>>> > > > >>>>> 4. When reader A recovers, it will again report splits (1 and 2)
>>>> > > > >>>>> to the enumerator.
>>>> > > > >>>>> 5. The enumerator should ignore this report because it has
>>>> > > > >>>>> assigned splits (1 and 2) to reader B.
>>>> > > > >>>>>
>>>> > > > >>>>> So with the new contract, the enumerator should be the source of
>>>> > > > >>>>> truth for split ownership.
>>>> > > > >>>>>
>>>> > > > >>>>> Thanks,
>>>> > > > >>>>>
>>>> > > > >>>>> Jiangjie (Becket) Qin
>>>> > > > >>>>>
>>>> > > > >>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <loserwang1...@gmail.com> wrote:
>>>> > > > >>>>>
>>>> > > > >>>>>> Hi Becket,
>>>> > > > >>>>>>
>>>> > > > >>>>>>
>>>> > > > >>>>>> I did consider this approach at the beginning (and it was also
>>>> > > > >>>>>> mentioned in this FLIP), since it would allow more flexibility
>>>> > > > >>>>>> in reassigning all splits. However, there are a few potential
>>>> > > > >>>>>> issues.
>>>> > > > >>>>>>
>>>> > > > >>>>>> 1. High transmission cost
>>>> > > > >>>>>> If we pass the full split objects (rather than just split IDs),
>>>> > > > >>>>>> the data size could be significant, leading to high overhead
>>>> > > > >>>>>> during transmission, especially when many splits are involved.
>>>> > > > >>>>>>
>>>> > > > >>>>>> 2. Risk of split loss
>>>> > > > >>>>>> A risk of split loss exists unless we have a mechanism to
>>>> > > > >>>>>> ensure that a checkpoint can only be taken after all the splits
>>>> > > > >>>>>> are reassigned. There are scenarios where splits could be lost
>>>> > > > >>>>>> due to inconsistent state handling during recovery:
>>>> > > > >>>>>>
>>>> > > > >>>>>>
>>>> > > > >>>>>> Scenario 1:
>>>> > > > >>>>>>
>>>> > > > >>>>>>    1. Upon restart, Reader A reports assigned splits (1 and 2),
>>>> > > > >>>>>>    and Reader B reports (3 and 4).
>>>> > > > >>>>>>    2. The enumerator receives these reports but only reassigns
>>>> > > > >>>>>>    splits 1 and 2, not 3 and 4.
>>>> > > > >>>>>>    3. A checkpoint or savepoint is then triggered. Only splits
>>>> > > > >>>>>>    1 and 2 are recorded in the reader states; splits 3 and 4
>>>> > > > >>>>>>    are not persisted.
>>>> > > > >>>>>>    4. If the job is later restarted from this checkpoint,
>>>> > > > >>>>>>    splits 3 and 4 will be permanently lost.
>>>> > > > >>>>>>
>>>> > > > >>>>>>
>>>> > > > >>>>>> Scenario 2:
>>>> > > > >>>>>>
>>>> > > > >>>>>>    1. Reader A reports splits (1 and 2), and Reader B reports
>>>> > > > >>>>>>    (3 and 4) upon restart.
>>>> > > > >>>>>>    2. Before the enumerator receives all reports and performs
>>>> > > > >>>>>>    reassignment, a checkpoint is triggered.
>>>> > > > >>>>>>    3. Since no splits have been reassigned yet, both readers
>>>> > > > >>>>>>    have empty states.
>>>> > > > >>>>>>    4. When restarting from this checkpoint, all four splits are
>>>> > > > >>>>>>    lost.
>>>> > > > >>>>>>
>>>> > > > >>>>>>
>>>> > > > >>>>>> Let me know if you have thoughts on how we might mitigate these
>>>> > > > >>>>>> risks!
>>>> > > > >>>>>>
>>>> > > > >>>>>> Best
>>>> > > > >>>>>> Hongshun
>>>> > > > >>>>>>
>>>> > > > >>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <be...@gmail.com> wrote:
>>>> > > > >>>>>>
>>>> > > > >>>>>>> Hi Hongshun,
>>>> > > > >>>>>>>
>>>> > > > >>>>>>> The steps sound reasonable to me in general. In terms of the
>>>> > > > >>>>>>> updated FLIP wiki, it would be good to see if we can keep the
>>>> > > > >>>>>>> protocol simple. One alternative way to achieve this behavior
>>>> > > > >>>>>>> is the following:
>>>> > > > >>>>>>>
>>>> > > > >>>>>>> 1. Upon SourceOperator startup, the SourceOperator sends a
>>>> > > > >>>>>>> ReaderRegistrationEvent with the currently assigned splits to
>>>> > > > >>>>>>> the enumerator. It does not add these splits to the
>>>> > > > >>>>>>> SourceReader.
>>>> > > > >>>>>>> 2. The enumerator will always use
>>>> > > > >>>>>>> SplitEnumeratorContext.assignSplits() to assign the splits
>>>> > > > >>>>>>> (not via the response to the ReaderRegistrationEvent; this
>>>> > > > >>>>>>> allows async split assignment in case the enumerator wants to
>>>> > > > >>>>>>> wait until all the readers are registered).
>>>> > > > >>>>>>> 3. The SourceOperator will only call SourceReader.addSplits()
>>>> > > > >>>>>>> when it receives the AddSplitEvent from the enumerator.
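>>>> > > > >>>>>>>
>>>> > > > >>>>>>> In code, step 2 boils down to something like this on the
>>>> > > > >>>>>>> enumerator side (a sketch; newDistribution is whatever
>>>> > > > >>>>>>> Map<Integer, List<SplitT>> the enumerator computed):
>>>> > > > >>>>>>>
>>>> > > > >>>>>>>     context.assignSplits(new SplitsAssignment<>(newDistribution));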
>>>> > > > >>>>>>>
>>>> > > > >>>>>>> This protocol has a few benefits:
>>>> > > > >>>>>>> 1. It basically allows arbitrary split reassignment upon
>>>> > > > >>>>>>> restart.
>>>> > > > >>>>>>> 2. Simplicity: there is only one way to assign splits.
>>>> > > > >>>>>>>
>>>> > > > >>>>>>> So we only need one interface change:
>>>> > > > >>>>>>> - add the initially assigned splits to ReaderInfo so the
>>>> > > > >>>>>>> Enumerator can access it.
>>>> > > > >>>>>>> and one behavior change:
>>>> > > > >>>>>>> - The SourceOperator should stop assigning splits to the reader
>>>> > > > >>>>>>> from state restoration, but
>>>> > [message truncated...]
>>>> >
