Hi Becket,

Got it! I have updated the FLIP again.

Best,
Hongshun

On Wed, Oct 15, 2025 at 1:18 PM Becket Qin <[email protected]> wrote:

> Hi Hongshun,
>
> Thanks for updating the FLIP. It looks much cleaner. Some minor comments:
>
> 1. Rename the `splitsOnRecovery` in ReaderInfo to
> `reportedSplitsOnRegistration`.
> 2. The SourceOperator should always send the ReaderRegistrationEvent with
> the `reportedSplitsOnRegistration` list. But it will not add the splits to
> readers if `SupportSplitReassignmentOnRecovery` is implemented.
> 3. "*Thus, If a connector want to use this FLIP-537, enumerator must keep
> a unAssignedSplit in state*" - this is up to the
> enumerator implementation to decide. It is not a must have for all the
> enumerator impl.
> 4. The RoundRobin algorithm in KafkaSource is not deterministic. Why not
> just get all the splits and do a round robin based on the numReaders? E.g.
> Sort all the splits, and assign the splits to reader 0, 1, 2, 3...N, 0, 1,
> 2, 3... N...
>
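A minimal sketch of the sorted round robin described in point 4, written against plain collections rather than the real KafkaSource code. The class name `SortedRoundRobinAssigner` and the use of string split IDs are assumptions made only for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: a hypothetical helper, not part of Flink or the Kafka connector. */
final class SortedRoundRobinAssigner {

    /**
     * Sorts the split IDs, then assigns the i-th split to reader (i % numReaders).
     * The result depends only on the set of splits and numReaders, not on call order.
     */
    static Map<Integer, List<String>> assign(Collection<String> splitIds, int numReaders) {
        if (numReaders <= 0) {
            throw new IllegalArgumentException("numReaders must be positive");
        }
        List<String> sorted = new ArrayList<>(splitIds);
        Collections.sort(sorted);

        Map<Integer, List<String>> assignment = new TreeMap<>();
        for (int reader = 0; reader < numReaders; reader++) {
            assignment.put(reader, new ArrayList<>());
        }
        for (int i = 0; i < sorted.size(); i++) {
            assignment.get(i % numReaders).add(sorted.get(i));
        }
        return assignment;
    }
}
```

Because the splits are sorted first, two calls with the same splits in a different order and the same `numReaders` should produce the same map.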
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Oct 14, 2025 at 7:28 PM Hongshun Wang <[email protected]>
> wrote:
>
>> Hi devs,
>>
>> If there is no other problem, I will start a vote later.
>>
>> Best,
>> Hongshun
>>
>> On Mon, Oct 13, 2025 at 4:17 PM Hongshun Wang <[email protected]>
>> wrote:
>>
>>> Hi Becket and Leonard,
>>>
>>> It seems that adding `splitsOnRecovery` to `ReaderInfo` makes the split
>>> enumerator simpler and cleaner.
>>>
>>> I have modified this FLIP again. Please have a look and let me know what
>>> you think.
>>>
>>> Best,
>>> Hongshun
>>>
>>> On Mon, Oct 13, 2025 at 10:48 AM Hongshun Wang <[email protected]>
>>> wrote:
>>>
>>>> Hi Becket,
>>>> Thanks for your explanation.
>>>>
>>>> > For the same three input above, the assignment should be consistently
>>>> the same.
>>>>
>>>> That is exactly what troubles me. For assignment algorithms such as
>>>> hash, it does behave the same. But what if we use round-robin? Even with
>>>> the same reader information, the same split may be assigned to different
>>>> readers. This is the example I listed before:
>>>>
>>>>    1. *Initial state:* parallelism of 2, 2 splits.
>>>>    2. *Enumerator action:* Split 1 → Task 1, Split 2 → Task 2.
>>>>    3. *Failure scenario:* After Split 2 is assigned to Task 2 but
>>>>    before the next checkpoint succeeds, Task 1 restarts.
>>>>    4. *Recovery issue:* Split 2 is re-added to the enumerator. The
>>>>    round-robin strategy assigns Split 2 to Task 1. Task 1 now has 2
>>>>    splits and Task 2 has 0 → imbalanced distribution.
>>>>
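The imbalance in the example above can be reproduced with a cursor-based round robin. This is only a sketch under assumptions: `CursorRoundRobin` is a hypothetical class, readers are indexed from 0 (so reader 0 stands for Task 1 and reader 1 for Task 2), and it does not reflect the actual KafkaSource implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: a history-dependent round robin, not the KafkaSource code. */
final class CursorRoundRobin {
    private final int numReaders;
    // The cursor advances on every assignment, so the target reader
    // depends on how many assignments happened before.
    private int cursor = 0;
    private final Map<Integer, List<String>> byReader = new TreeMap<>();

    CursorRoundRobin(int numReaders) {
        this.numReaders = numReaders;
        for (int reader = 0; reader < numReaders; reader++) {
            byReader.put(reader, new ArrayList<>());
        }
    }

    /** Assigns the split to the reader under the cursor and returns that reader's index. */
    int assign(String split) {
        int reader = cursor;
        cursor = (cursor + 1) % numReaders;
        byReader.get(reader).add(split);
        return reader;
    }

    /** Models a split being handed back to the enumerator from the given reader. */
    void giveBack(String split, int reader) {
        byReader.get(reader).remove(split);
    }

    List<String> splitsOf(int reader) {
        return byReader.get(reader);
    }
}
```

With two readers, assigning split 1 and split 2, handing split 2 back, and assigning it again places both splits on reader 0, which is the imbalanced outcome described in step 4.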
>>>>
>>>> > Please let me know if you think a meeting would be more efficient.
>>>> Yes, I’d like to reach an agreement as soon as possible. If you’re
>>>> available, we could schedule a meeting with Leonard as well.
>>>>
>>>> Best,
>>>> Hongshun
>>>>
>>>> On Sat, Oct 11, 2025 at 3:59 PM Becket Qin <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Hongshun,
>>>>>
>>>>> I am confused. First of all, regardless of what the assignment
>>>>> algorithm is, using SplitEnumeratorContext to return the splits only gives
>>>>> more information than using addSplitsBack(). So there should be no
>>>>> regression.
>>>>>
>>>>> Secondly, at this point, the SplitEnumerator should only take the
>>>>> following three inputs to generate the global split assignment:
>>>>> 1. the *reader information (num readers, locations, etc.)*
>>>>> 2. *all the splits to assign*
>>>>> 3. the *configured assignment algorithm*
>>>>> Preferably, for the same three inputs, the assignment should be
>>>>> consistently the same. I don't see why it should care about why a new
>>>>> reader is added, whether due to partial failover, global failover, or job
>>>>> restart.
>>>>>
>>>>> If you want to do global redistribution on global failover and
>>>>> restart, but honor the existing assignment for partial failover, the
>>>>> enumerator will just do the following:
>>>>> 1. Generate a new global assignment (global redistribution) in start(),
>>>>> because start() will only be invoked on global failover or restart. That
>>>>> means all the readers are also new, with empty assignments.
>>>>> 2. After the global assignment is generated, it should be honored for
>>>>> the whole life cycle. There might be many reader registrations, again for
>>>>> different reasons, but the reason does not matter:
>>>>>     - reader registration after this job restart
>>>>>     - reader registration after this global failover
>>>>>     - reader registration due to partial failover, which may or may not
>>>>> have an addSplitsBack() call.
>>>>>     Regardless of the reason, the split enumerator will just enforce
>>>>> the global assignment it has already generated, i.e. without split
>>>>> redistribution.
>>>>>
>>>>> Wouldn't that give the behavior you want? I feel the discussion is
>>>>> going in circles. Please let me know if you think a meeting would be
>>>>> more efficient.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> On Fri, Oct 10, 2025 at 7:58 PM Hongshun Wang <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Becket,
>>>>>>
>>>>>> > Ignore a returned split if it has been assigned to a different
>>>>>> reader, otherwise put it back to unassigned splits / pending splits. Then
>>>>>> the enumerator assigns new splits to the newly added reader, which may
>>>>>> use the previous assignment as a reference. This should work regardless
>>>>>> of whether it is a global failover, partial failover, restart, etc. There
>>>>>> is no need for the SplitEnumerator to distinguish what failover scenario
>>>>>> it is.
>>>>>>
>>>>>> In this case, it seems that global failover and partial failover
>>>>>> share the same distribution strategy if the split has not been assigned
>>>>>> to a different reader. However, splits need to be redistributed on global
>>>>>> failover (this is why we need this FLIP), but not on partial failover. I
>>>>>> have no idea how we can distinguish them.
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Best,
>>>>>> Hongshun
>>>>>>
>>>>>> On Sat, Oct 11, 2025 at 12:54 AM Becket Qin <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Hongshun,
>>>>>>>
>>>>>>> The problem we are trying to solve here is to give the splits back
>>>>>>> to the SplitEnumerator. There are only two types of splits to give back:
>>>>>>> 1) Splits whose assignment has been checkpointed. In this case, we
>>>>>>> rely on addReader() + SplitEnumeratorContext to give the splits back,
>>>>>>> which provides more information associated with those splits.
>>>>>>> 2) Splits whose assignment has not been checkpointed. In this case,
>>>>>>> we use addSplitsBack(). There is no reader info to give because the
>>>>>>> previous assignment did not take effect to begin with.
>>>>>>>
>>>>>>> From the SplitEnumerator implementation perspective, the contract is
>>>>>>> straightforward.
>>>>>>> 1. The SplitEnumerator is the source of truth for assignment.
>>>>>>> 2. When the enumerator receives the addSplitsBack() call, it always
>>>>>>> adds these splits back to unassigned splits / pending splits.
>>>>>>> 3. When the enumerator receives the addReader() call, that means the
>>>>>>> reader has no current assignment, and has returned its previous
>>>>>>> assignment based on the reader side info. The SplitEnumerator checks the
>>>>>>> SplitEnumeratorContext to retrieve the returned splits from that reader
>>>>>>> (i.e. previous assignment) and handles them according to its own source
>>>>>>> of truth knowledge of assignment: ignore a returned split if it has been
>>>>>>> assigned to a different reader, otherwise put it back to unassigned
>>>>>>> splits / pending splits. Then the enumerator assigns new splits to the
>>>>>>> newly added reader, which may use the previous assignment as a
>>>>>>> reference. This should work regardless of whether it is a global
>>>>>>> failover, partial failover, restart, etc. There is no need for the
>>>>>>> SplitEnumerator to distinguish what failover scenario it is.
>>>>>>>
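The contract above could be sketched roughly as follows. This is a simplified model using plain collections, not the Flink `SplitEnumerator` interface: `AssignmentTracker`, `onSplitsBack`, and `onReaderRegistered` are hypothetical names that only mirror the roles of addSplitsBack() and addReader(), and splits are represented by string IDs.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Illustrative only: models the enumerator as the source of truth for assignment. */
final class AssignmentTracker {
    // Source of truth: split ID -> reader that currently owns it.
    private final Map<String, Integer> assigned = new HashMap<>();
    // Splits waiting to be assigned.
    private final Set<String> pending = new TreeSet<>();

    void recordAssignment(String split, int reader) {
        pending.remove(split);
        assigned.put(split, reader);
    }

    /** Role of addSplitsBack(): the assignment never took effect, so always re-queue. */
    void onSplitsBack(Collection<String> splits) {
        for (String split : splits) {
            assigned.remove(split);
            pending.add(split);
        }
    }

    /** Role of addReader(): handle the splits the re-registered reader reported. */
    void onReaderRegistered(int reader, Collection<String> reportedSplits) {
        for (String split : reportedSplits) {
            Integer owner = assigned.get(split);
            if (owner != null && owner != reader) {
                continue; // already assigned to a different reader: ignore
            }
            assigned.remove(split);
            pending.add(split); // otherwise put it back to pending
        }
    }

    Set<String> pendingSplits() {
        return pending;
    }

    Integer ownerOf(String split) {
        return assigned.get(split);
    }
}
```

The sketch stops at re-queuing; how pending splits are then assigned to the newly added reader is left to the enumerator's own algorithm.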
>>>>>>> Would this work?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Jiangjie (Becket) Qin
>>>>>>>
>>>>>>> On Fri, Oct 10, 2025 at 1:28 AM Hongshun Wang <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Becket,
>>>>>>>>  > why do we need to change the behavior of addSplitsBack()? Should
>>>>>>>> it remain the same?
>>>>>>>>
>>>>>>>> How does the enumerator get the splits from ReaderRegistrationEvent
>>>>>>>> and then reassign it?
>>>>>>>>
>>>>>>>> You gave this advice before:
>>>>>>>> > 1. Put all the reader information in the SplitEnumerator
>>>>>>>> context. 2. Notify the enumerator about the new reader registration.
>>>>>>>> 3. Let the split enumerator get whatever information it wants from the
>>>>>>>> context and do its job.
>>>>>>>>
>>>>>>>> However, each time a source task fails over, the
>>>>>>>> `ConcurrentMap<Integer, ConcurrentMap<Integer, ReaderInfo>>
>>>>>>>> registeredReaders` will remove this reader's info. When the source task
>>>>>>>> is registered again, it will be added again. *Thus, registeredReaders
>>>>>>>> cannot know whether the reader was registered before.*
>>>>>>>>
>>>>>>>> Therefore, enumerator#addReader cannot distinguish the following three
>>>>>>>> situations:
>>>>>>>> 1. The reader is registered during a global restart. In this case,
>>>>>>>> redistribute the splits from the reader info (take all the splits from
>>>>>>>> ReaderInfo).
>>>>>>>> 2. The reader is registered during a partial failover (before the
>>>>>>>> first successful checkpoint). In this case, ignore the splits from the
>>>>>>>> reader info (leave all the splits in ReaderInfo alone).
>>>>>>>> 3. The reader is registered during a partial failover (after the first
>>>>>>>> successful checkpoint). In this case, we need to assign the splits to the
>>>>>>>> same reader again (take all the splits from ReaderInfo but assign them
>>>>>>>> to it again).
>>>>>>>> We still need the enumerator to distinguish them (using
>>>>>>>> pendingSplitAssignment & assignedSplitAssignment). However, it is
>>>>>>>> redundant to maintain split assignment information both in the
>>>>>>>> enumerator and the enumerator context.
>>>>>>>>
>>>>>>>> I think it will be simpler if we change the behavior of addSplitsBack.
>>>>>>>> Just let the enumerator handle these splits based on
>>>>>>>> pendingSplitAssignment & assignedSplitments.
>>>>>>>>
>>>>>>>> What do you think?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Hongshun
>>>>>>>>
>>>>>>>> On Fri, Oct 10, 2025 at 12:55 PM Becket Qin <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Hongshun,
>>>>>>>>>
>>>>>>>>> Thanks for updating the FLIP. A quick question: why do we need to
>>>>>>>>> change the behavior of addSplitsBack()? Should it remain the same?
>>>>>>>>>
>>>>>>>>> Regarding the case of restart with changed subscription, I think
>>>>>>>>> the only correct behavior is removing obsolete splits without any
>>>>>>>>> warning / exception. It is OK to add an info level logging if we want
>>>>>>>>> to. It is a clear intention if the user has explicitly changed
>>>>>>>>> subscription and restarted the job. There is no need to add a config to
>>>>>>>>> double confirm.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>>
>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>
>>>>>>>>> On Thu, Oct 9, 2025 at 7:28 PM Hongshun Wang <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Leonard,
>>>>>>>>>>
>>>>>>>>>> If the SplitEnumerator receives all splits after a restart, it
>>>>>>>>>> becomes straightforward to clear and un-assign the unmatched splits
>>>>>>>>>> (by checking whether they match the source options). However, a key
>>>>>>>>>> question arises: *should we automatically discard obsolete splits,
>>>>>>>>>> or explicitly notify the user via an exception?*
>>>>>>>>>>
>>>>>>>>>> We provide an option `scan.partition-unsubscribe.strategy`:
>>>>>>>>>> 1. If Strict, throw an exception when encountering removed
>>>>>>>>>> splits.
>>>>>>>>>> 2. If Lenient, remove obsolete splits silently.
>>>>>>>>>>
>>>>>>>>>> What do you think?
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Hongshun
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 9, 2025 at 9:37 PM Leonard Xu <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks Hongshun for the update and the pretty detailed analysis
>>>>>>>>>>> of edge cases. The updated FLIP looks good to me now.
>>>>>>>>>>>
>>>>>>>>>>> One last implementation detail about a scenario in the motivation
>>>>>>>>>>> section:
>>>>>>>>>>>
>>>>>>>>>>> *Restart with Changed subscription: During restart, if source
>>>>>>>>>>> options remove a topic or table. The splits which have already
>>>>>>>>>>> assigned can not be removed.*
>>>>>>>>>>>
>>>>>>>>>>> Could you clarify how we resolve this in the Kafka connector?
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Leonard
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Oct 9, 2025, at 19:48, Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi devs,
>>>>>>>>>>> If there are no further suggestions, I will start the voting
>>>>>>>>>>> tomorrow.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Hongshun
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Sep 26, 2025 at 7:48 PM Hongshun Wang <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Becket and Leonard,
>>>>>>>>>>>>
>>>>>>>>>>>> I have updated the content of this FLIP. The key point is that:
>>>>>>>>>>>>
>>>>>>>>>>>> When the split enumerator receives a split, *the split must
>>>>>>>>>>>> already exist in pendingSplitAssignment or assignedSplitments*.
>>>>>>>>>>>>
>>>>>>>>>>>>    - If the split is in pendingSplitAssignments, ignore it.
>>>>>>>>>>>>    - If the split is in assignedSplitAssignments but has a
>>>>>>>>>>>>    different taskId, ignore it (this indicates it was already
>>>>>>>>>>>>    assigned to another task).
>>>>>>>>>>>>    - If the split is in assignedSplitAssignments and shares
>>>>>>>>>>>>    the same taskId, move the assignment from assignedSplitments
>>>>>>>>>>>>    to pendingSplitAssignment so that it is assigned again.
>>>>>>>>>>>>
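A rough sketch of the three rules above, assuming both structures are maps from split ID to task ID. The class `ReportedSplitHandler`, the `Decision` enum, the map types, and the exception thrown for an unknown split are all assumptions made for illustration; the FLIP may define these differently.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: hypothetical names and types, not the FLIP's actual code. */
final class ReportedSplitHandler {
    enum Decision { IGNORE, REASSIGN }

    // Assumed shape: split ID -> task ID.
    final Map<String, Integer> pendingSplitAssignment = new HashMap<>();
    final Map<String, Integer> assignedSplitAssignment = new HashMap<>();

    Decision onSplitReported(String split, int taskId) {
        // Rule 1: already pending, nothing to do.
        if (pendingSplitAssignment.containsKey(split)) {
            return Decision.IGNORE;
        }
        Integer owner = assignedSplitAssignment.get(split);
        if (owner == null) {
            // The split is expected to be in one of the two maps;
            // failing loudly here is an assumption of this sketch.
            throw new IllegalStateException("Unknown split: " + split);
        }
        // Rule 2: assigned to another task, so the report is stale.
        if (owner != taskId) {
            return Decision.IGNORE;
        }
        // Rule 3: same task, move back to pending so it is assigned again.
        assignedSplitAssignment.remove(split);
        pendingSplitAssignment.put(split, taskId);
        return Decision.REASSIGN;
    }
}
```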
>>>>>>>>>>>>
>>>>>>>>>>>> To better explain why these strategies are used, I added some
>>>>>>>>>>>> examples and pictures.
>>>>>>>>>>>>
>>>>>>>>>>>> Could you help me check whether there are still any problems?
>>>>>>>>>>>>
>>>>>>>>>>>> Best
>>>>>>>>>>>> Hongshun
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Sep 26, 2025 at 5:08 PM Leonard Xu <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks Becket and Hongshun for the insightful discussion.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The underlying implementation and communication mechanisms of
>>>>>>>>>>>>> Flink Source indeed involve many intricate details. We discussed the
>>>>>>>>>>>>> issue of split re-assignment in specific scenarios, but fortunately,
>>>>>>>>>>>>> the final decision turned out to be pretty clear.
>>>>>>>>>>>>>
>>>>>>>>>>>>> +1 to Becket’s proposal, which keeps the framework cleaner and
>>>>>>>>>>>>> more flexible.
>>>>>>>>>>>>> +1 to Hongshun’s point to provide comprehensive guidance for
>>>>>>>>>>>>> connector developers.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Leonard
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sep 26, 2025, at 16:30, Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Becket,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Got it. You’re suggesting we should not handle this in the
>>>>>>>>>>>>> source framework but instead let the split enumerator manage these
>>>>>>>>>>>>> three scenarios.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Let me explain why I originally favored handling it in the
>>>>>>>>>>>>> framework: I'm concerned that connector developers might overlook
>>>>>>>>>>>>> certain edge cases (after all, even we needed extensive discussions
>>>>>>>>>>>>> to fully clarify the logic).
>>>>>>>>>>>>>
>>>>>>>>>>>>> However, your point keeps the framework cleaner and more
>>>>>>>>>>>>> flexible. Thus, I will take it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Perhaps, in this FLIP, we should focus on providing
>>>>>>>>>>>>> comprehensive guidance for connector developers: explain how
>>>>>>>>>>>>> to implement a split enumerator, including the underlying
>>>>>>>>>>>>> challenges and their solutions.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Additionally, we can use the Kafka connector as a reference
>>>>>>>>>>>>> implementation to demonstrate the practical steps. This way,
>>>>>>>>>>>>> developers who want to implement similar connectors can directly
>>>>>>>>>>>>> reference this example.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Hongshun
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Sep 26, 2025 at 1:27 PM Becket Qin <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> It would be good to not expose runtime details to the source
>>>>>>>>>>>>>> implementation if possible.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Today, the split enumerator implementations are expected to
>>>>>>>>>>>>>> track the split assignment.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Assuming the split enumerator implementation keeps a split
>>>>>>>>>>>>>> assignment map, that means the enumerator should already know 
>>>>>>>>>>>>>> whether a
>>>>>>>>>>>>>> split is assigned or unassigned. So it can handle the three 
>>>>>>>>>>>>>> scenarios you
>>>>>>>>>>>>>> mentioned.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The split is reported by a reader during a global restoration.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The split enumerator should have just been restored /
>>>>>>>>>>>>>> created. If the enumerator expects a full reassignment of splits
>>>>>>>>>>>>>> upon global recovery, there should be no assigned splits to that
>>>>>>>>>>>>>> reader in the split assignment mapping.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The split is reported by a reader during a partial failure
>>>>>>>>>>>>>>> recovery.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In this case, when SplitEnumerator.addReader() is invoked,
>>>>>>>>>>>>>> the split assignment map in the enumerator implementation should
>>>>>>>>>>>>>> already have some split assignments for the reader. Therefore it is
>>>>>>>>>>>>>> a partial failover. If the source supports split reassignment on
>>>>>>>>>>>>>> recovery, the enumerator can assign splits that are different from
>>>>>>>>>>>>>> the reported assignment of that reader in the SplitEnumeratorContext,
>>>>>>>>>>>>>> or it can also assign the same splits. In any case, the enumerator
>>>>>>>>>>>>>> knows that this is a partial recovery because the assignment map is
>>>>>>>>>>>>>> non-empty.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The split is not reported by a reader, but is assigned after
>>>>>>>>>>>>>>> the last successful checkpoint and was never acknowledged.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is actually one of the steps in partial failure
>>>>>>>>>>>>>> recovery. SplitEnumerator.addSplitsBack() will be called first,
>>>>>>>>>>>>>> before SplitEnumerator.addReader() is called for the recovered
>>>>>>>>>>>>>> reader. When SplitEnumerator.addSplitsBack() is invoked, it is for
>>>>>>>>>>>>>> sure a partial recovery, and the enumerator should remove these
>>>>>>>>>>>>>> splits from the split assignment map as if they were never assigned.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think this should work, right?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Sep 25, 2025 at 8:34 PM Hongshun Wang <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Becket and Leonard,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for your advice.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> > put all the reader information in the SplitEnumerator
>>>>>>>>>>>>>>> context
>>>>>>>>>>>>>>> I have a concern: the current registeredReaders in
>>>>>>>>>>>>>>> *SourceCoordinatorContext will be removed after subtaskResetor
>>>>>>>>>>>>>>> execution on failure*. However, this approach has merit.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> One more situation I found my previous design does not cover:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>    1. Initial state: Reader A reports splits (1, 2).
>>>>>>>>>>>>>>>    2. Enumerator action: Assigns split 1 to Reader A, and
>>>>>>>>>>>>>>>    split 2 to Reader B.
>>>>>>>>>>>>>>>    3. Failure scenario: Reader A fails before
>>>>>>>>>>>>>>>    checkpointing. Since this is a partial failure, only Reader A
>>>>>>>>>>>>>>>    restarts.
>>>>>>>>>>>>>>>    4. Recovery issue: Upon recovery, Reader A re-reports
>>>>>>>>>>>>>>>    split (1).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In my previous design, the enumerator will ignore Reader A's
>>>>>>>>>>>>>>> re-registration, which will cause data loss.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thus, when the enumerator receives a split, the split may
>>>>>>>>>>>>>>> originate from three scenarios:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>    1. The split is reported by a reader during a global
>>>>>>>>>>>>>>>    restoration.
>>>>>>>>>>>>>>>    2. The split is reported by a reader during a partial
>>>>>>>>>>>>>>>    failure recovery.
>>>>>>>>>>>>>>>    3. The split is not reported by a reader, but is
>>>>>>>>>>>>>>>    assigned after the last successful checkpoint and was never
>>>>>>>>>>>>>>>    acknowledged.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In the first scenario (global restore), the split should
>>>>>>>>>>>>>>> be re-distributed. For the latter two scenarios (partial failover
>>>>>>>>>>>>>>> and post-checkpoint assignment), we need to reassign the split to
>>>>>>>>>>>>>>> its originally assigned subtask.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> By implementing a method in the SplitEnumerator context to
>>>>>>>>>>>>>>> track each assigned split's status, the system can correctly
>>>>>>>>>>>>>>> identify and resolve split ownership in all three scenarios. *What
>>>>>>>>>>>>>>> about adding a `SplitRecoveryType splitRecoveryType(Split split)`
>>>>>>>>>>>>>>> method to SplitEnumeratorContext?* SplitRecoveryType is an enum
>>>>>>>>>>>>>>> including `UNASSIGNED`, `GLOBAL_RESTORE`, `PARTIAL_FAILOVER` and
>>>>>>>>>>>>>>> `POST_CHECKPOINT_ASSIGNMENT`.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What do you think? Are there any details or scenarios I
>>>>>>>>>>>>>>> haven't considered? Looking forward to your advice.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Hongshun
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Sep 11, 2025 at 12:41 AM Becket Qin <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for the explanation, Hongshun.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The current pattern of handling new reader registration is as
>>>>>>>>>>>>>>>> follows:
>>>>>>>>>>>>>>>> 1. Put all the reader information in the SplitEnumerator
>>>>>>>>>>>>>>>> context.
>>>>>>>>>>>>>>>> 2. Notify the enumerator about the new reader registration.
>>>>>>>>>>>>>>>> 3. Let the split enumerator get whatever information it
>>>>>>>>>>>>>>>> wants from the context and do its job.
>>>>>>>>>>>>>>>> This pattern decouples the information passing from the
>>>>>>>>>>>>>>>> reader registration notification. This makes the API
>>>>>>>>>>>>>>>> extensible: we can add more information (e.g. reported
>>>>>>>>>>>>>>>> assigned splits in our case) about the reader to the context
>>>>>>>>>>>>>>>> without introducing new methods.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Introducing a new method of addSplitsBackOnRecovery() is
>>>>>>>>>>>>>>>> redundant to the above pattern. Do we really need it?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Sep 8, 2025 at 8:18 PM Hongshun Wang <
>>>>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> > Hi Becket,
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> > > I am curious what would the enumerator do differently
>>>>>>>>>>>>>>>> for the splits
>>>>>>>>>>>>>>>> > added via addSplitsBackOnRecovery() V.S. addSplitsBack()?
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> > In this FLIP, there are two distinct scenarios in which
>>>>>>>>>>>>>>>> > the enumerator receives splits being added back:
>>>>>>>>>>>>>>>> > 1. Job-level restore: the job is restored, and splits from
>>>>>>>>>>>>>>>> > the readers' state are reported by ReaderRegistrationEvent.
>>>>>>>>>>>>>>>> > 2. Reader-level restart: a reader is restarted but not the
>>>>>>>>>>>>>>>> > whole job, and the splits assigned to it after the last
>>>>>>>>>>>>>>>> > successful checkpoint are added back. This is what
>>>>>>>>>>>>>>>> > addSplitsBack used to do.
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> > In these two situations, the enumerator will choose
>>>>>>>>>>>>>>>> > different strategies.
>>>>>>>>>>>>>>>> > 1. Job-level restore: the splits should be redistributed
>>>>>>>>>>>>>>>> > across readers according to the current partitioner strategy.
>>>>>>>>>>>>>>>> > 2. Reader-level restart: the splits should be reassigned
>>>>>>>>>>>>>>>> > directly back to the same reader they were originally assigned
>>>>>>>>>>>>>>>> > to, preserving locality and avoiding unnecessary redistribution.
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> > Therefore, the enumerator must clearly distinguish between
>>>>>>>>>>>>>>>> > these two scenarios. I originally deprecated the former
>>>>>>>>>>>>>>>> > addSplitsBack(List<SplitT> splits, int subtaskId) and added a
>>>>>>>>>>>>>>>> > new addSplitsBack(List<SplitT> splits, int subtaskId,
>>>>>>>>>>>>>>>> > boolean reportedByReader).
>>>>>>>>>>>>>>>> > Leonard suggested using another method,
>>>>>>>>>>>>>>>> > addSplitsBackOnRecovery, without affecting the current
>>>>>>>>>>>>>>>> > addSplitsBack.
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> > Best
>>>>>>>>>>>>>>>> > Hongshun
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> > On 2025/09/08 17:20:31 Becket Qin wrote:
>>>>>>>>>>>>>>>> > > Hi Leonard,
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > > Could we introduce a new method like
>>>>>>>>>>>>>>>> addSplitsBackOnRecovery  with
>>>>>>>>>>>>>>>> > default
>>>>>>>>>>>>>>>> > > > implementation. In this way, we can provide better
>>>>>>>>>>>>>>>> backward
>>>>>>>>>>>>>>>> > compatibility
>>>>>>>>>>>>>>>> > > > and also makes it easier for developers to understand.
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > I am curious what would the enumerator do differently
>>>>>>>>>>>>>>>> for the splits
>>>>>>>>>>>>>>>> > added
>>>>>>>>>>>>>>>> > > via addSplitsBackOnRecovery() V.S. addSplitsBack()?
>>>>>>>>>>>>>>>> Today,
>>>>>>>>>>>>>>>> > addSplitsBack()
>>>>>>>>>>>>>>>> > > is also only called upon recovery. So the new method
>>>>>>>>>>>>>>>> seems confusing. One
>>>>>>>>>>>>>>>> > > thing worth clarifying is if the Source implements
>>>>>>>>>>>>>>>> > > SupportSplitReassignmentOnRecovery, upon recovery,
>>>>>>>>>>>>>>>> should the splits
>>>>>>>>>>>>>>>> > > reported by the readers also be added back to the
>>>>>>>>>>>>>>>> SplitEnumerator via the
>>>>>>>>>>>>>>>> > > addSplitsBack() call? Or should the SplitEnumerator
>>>>>>>>>>>>>>>> explicitly query the
>>>>>>>>>>>>>>>> > > registered reader information via the
>>>>>>>>>>>>>>>> SplitEnumeratorContext to get the
>>>>>>>>>>>>>>>> > > originally assigned splits when addReader() is invoked?
>>>>>>>>>>>>>>>> I was assuming
>>>>>>>>>>>>>>>> > the
>>>>>>>>>>>>>>>> > > latter in the beginning, so the behavior of
>>>>>>>>>>>>>>>> addSplitsBack() remains
>>>>>>>>>>>>>>>> > > unchanged, but I am not opposed in doing the former.
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > Also, can you elaborate on the backwards compatibility
>>>>>>>>>>>>>>>> issue you see if
>>>>>>>>>>>>>>>> > we
>>>>>>>>>>>>>>>> > > do not have a separate addSplitsBackOnRecovery()
>>>>>>>>>>>>>>>> method? Even without
>>>>>>>>>>>>>>>> > this
>>>>>>>>>>>>>>>> > > new method, the behavior remains exactly the same
>>>>>>>>>>>>>>>> unless the end users
>>>>>>>>>>>>>>>> > > implement the mix-in interface of
>>>>>>>>>>>>>>>> "SupportSplitReassignmentOnRecovery",
>>>>>>>>>>>>>>>> > > right?
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > Thanks,
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > Jiangjie (Becket) Qin
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > On Mon, Sep 8, 2025 at 1:48 AM Hongshun Wang <
>>>>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>>>>>> > > wrote:
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > > Hi devs,
>>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>>> > > > It has been quite some time since this FLIP[1] was
>>>>>>>>>>>>>>>> first proposed.
>>>>>>>>>>>>>>>> > Thank
>>>>>>>>>>>>>>>> > > > you for your valuable feedback—based on your
>>>>>>>>>>>>>>>> suggestions, the FLIP has
>>>>>>>>>>>>>>>> > > > undergone several rounds of revisions.
>>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>>> > > > Any more advice is welcome and appreciated. If there
>>>>>>>>>>>>>>>> are no further
>>>>>>>>>>>>>>>> > > > concerns, I plan to start the vote tomorrow.
>>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>>> > > > Best
>>>>>>>>>>>>>>>> > > > Hongshun
>>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>>> > > > [1]
>>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480
>>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>>> > > > On Mon, Sep 8, 2025 at 4:42 PM Hongshun Wang <
>>>>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>>>>>> > > > wrote:
>>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>>> > > > > Hi Leonard,
>>>>>>>>>>>>>>>> > > > > Thanks for your advice.  It makes sense and I have
>>>>>>>>>>>>>>>> modified it.
>>>>>>>>>>>>>>>> > > > >
>>>>>>>>>>>>>>>> > > > > Best,
>>>>>>>>>>>>>>>> > > > > Hongshun
>>>>>>>>>>>>>>>> > > > >
>>>>>>>>>>>>>>>> > > > > On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>> > > > >
>>>>>>>>>>>>>>>> > > > >> Thanks Hongshun and Becket for the deep discussion.
>>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>>> > > > >> I only have one comment for one API design:
>>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>>> > > > >> > Deprecate the old addSplitsBack  method, use a
>>>>>>>>>>>>>>>> addSplitsBack with
>>>>>>>>>>>>>>>> > > > >> param isReportedByReader instead. Because, The
>>>>>>>>>>>>>>>> enumerator can apply
>>>>>>>>>>>>>>>> > > > >> different reassignment policies based on the
>>>>>>>>>>>>>>>> context.
>>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>>> > > > >> Could we introduce a new method like
>>>>>>>>>>>>>>>> > > > >> *addSplitsBackOnRecovery* with a default implementation?
>>>>>>>>>>>>>>>> > > > >> In this way, we can provide better backward
>>>>>>>>>>>>>>>> > > > >> compatibility and also make it easier for
>>>>>>>>>>>>>>>> > > > >> developers to understand.
>>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>>> > > > >> Best,
>>>>>>>>>>>>>>>> > > > >> Leonard
>>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>>> > > > >> On Sep 3, 2025, at 20:26, Hongshun Wang <[email protected]>
>>>>>>>>>>>>>>>> > > > >> wrote:
>>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>>> > > > >> Hi Becket,
>>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>>> > > > >> I think that's a great idea! I have added the
>>>>>>>>>>>>>>>> > > > >> SupportSplitReassignmentOnRecovery interface in
>>>>>>>>>>>>>>>> > > > >> this FLIP. If a Source implements this interface, it
>>>>>>>>>>>>>>>> > > > >> indicates that the source operator needs to
>>>>>>>>>>>>>>>> > > > >> report splits to the enumerator and receive
>>>>>>>>>>>>>>>> > > > >> reassignment. [1]
>>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>>> > > > >> Best,
>>>>>>>>>>>>>>>> > > > >> Hongshun
>>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>>> > > > >> [1]
>>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>>> > > > >> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <
>>>>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>>>>>> > > > wrote:
>>>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>>>> > > > >>> Hi Hongshun,
>>>>>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>>>>>> > > > >>> I think the convention for such optional features
>>>>>>>>>>>>>>>> in Source is via
>>>>>>>>>>>>>>>> > > > >>> mix-in interfaces. So instead of adding a method
>>>>>>>>>>>>>>>> to the
>>>>>>>>>>>>>>>> > SourceReader,
>>>>>>>>>>>>>>>> > > > maybe
>>>>>>>>>>>>>>>> > > > >>> we should introduce an interface
>>>>>>>>>>>>>>>> SupportSplitReassignmentOnRecovery
>>>>>>>>>>>>>>>> > > > with
>>>>>>>>>>>>>>>> > > > >>> this method. If a Source implementation
>>>>>>>>>>>>>>>> implements that interface,
>>>>>>>>>>>>>>>> > > > then the
>>>>>>>>>>>>>>>> > > > >>> SourceOperator will check the desired behavior
>>>>>>>>>>>>>>>> and act accordingly.
>>>>>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>>>>>> > > > >>> Thanks,
>>>>>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>>>>>> > > > >>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>>>>>> > > > >>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <
>>>>>>>>>>>>>>>> > [email protected]
>>>>>>>>>>>>>>>> > > > >
>>>>>>>>>>>>>>>> > > > >>> wrote:
>>>>>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>>>>>> > > > >>>> Hi devs,
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > > >>>> Would anyone like to discuss this FLIP? I'd
>>>>>>>>>>>>>>>> appreciate your
>>>>>>>>>>>>>>>> > feedback
>>>>>>>>>>>>>>>> > > > >>>> and suggestions.
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > > >>>> Best,
>>>>>>>>>>>>>>>> > > > >>>> Hongshun
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > > >>>> On Aug 13, 2025, at 14:23, Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > > >>>> Hi Becket,
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > > >>>> Thank you for your detailed feedback. The new
>>>>>>>>>>>>>>>> contract makes good
>>>>>>>>>>>>>>>> > > > sense
>>>>>>>>>>>>>>>> > > > >>>> to me and effectively addresses the issues I
>>>>>>>>>>>>>>>> encountered at the
>>>>>>>>>>>>>>>> > > > beginning
>>>>>>>>>>>>>>>> > > > >>>> of the design.
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > > >>>> That said, I recommend not reporting splits by
>>>>>>>>>>>>>>>> default, primarily
>>>>>>>>>>>>>>>> > for
>>>>>>>>>>>>>>>> > > > >>>> compatibility and practical reasons:
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > > >>>> >  For these reasons, we do not expect the Split
>>>>>>>>>>>>>>>> objects to be
>>>>>>>>>>>>>>>> > huge,
>>>>>>>>>>>>>>>> > > > >>>> and we are not trying to design for huge Split
>>>>>>>>>>>>>>>> objects either as
>>>>>>>>>>>>>>>> > they
>>>>>>>>>>>>>>>> > > > will
>>>>>>>>>>>>>>>> > > > >>>> have problems even today.
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > > >>>>    1. Not all existing connectors follow this rule.
>>>>>>>>>>>>>>>> > > > >>>>    For example, in the MySQL CDC connector, a binlog
>>>>>>>>>>>>>>>> > > > >>>>    split may contain hundreds (or even more) of
>>>>>>>>>>>>>>>> > > > >>>>    snapshot split completion records. This state is
>>>>>>>>>>>>>>>> > > > >>>>    large and is currently transmitted incrementally
>>>>>>>>>>>>>>>> > > > >>>>    through multiple BinlogSplitMetaEvent messages.
>>>>>>>>>>>>>>>> > > > >>>>    Since the binlog reader operates with a
>>>>>>>>>>>>>>>> > > > >>>>    parallelism of one, reporting the full split state
>>>>>>>>>>>>>>>> > > > >>>>    on recovery could be inefficient or even
>>>>>>>>>>>>>>>> > > > >>>>    infeasible. For such sources, it would be better
>>>>>>>>>>>>>>>> > > > >>>>    to provide a mechanism to skip split reporting
>>>>>>>>>>>>>>>> > > > >>>>    during restart until they are redesigned to reduce
>>>>>>>>>>>>>>>> > > > >>>>    the split size.
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > > >>>>    2. Not all enumerators maintain unassigned splits
>>>>>>>>>>>>>>>> > > > >>>>    in state.
>>>>>>>>>>>>>>>> > > > >>>>    Some SplitEnumerator implementations (such as the
>>>>>>>>>>>>>>>> > > > >>>>    one in the Kafka connector) do not track or
>>>>>>>>>>>>>>>> > > > >>>>    persist unassigned splits. Requiring them to
>>>>>>>>>>>>>>>> > > > >>>>    handle re-registration would add unnecessary
>>>>>>>>>>>>>>>> > > > >>>>    complexity. Even if we implement this in the Kafka
>>>>>>>>>>>>>>>> > > > >>>>    connector, the connector is currently decoupled
>>>>>>>>>>>>>>>> > > > >>>>    from the Flink version, so we also need to make
>>>>>>>>>>>>>>>> > > > >>>>    sure older versions remain compatible.
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > > >>>> ------------------------------
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > > >>>> To address these concerns, I propose introducing
>>>>>>>>>>>>>>>> a new method:
>>>>>>>>>>>>>>>> > boolean
>>>>>>>>>>>>>>>> > > > >>>> SourceReader#shouldReassignSplitsOnRecovery()
>>>>>>>>>>>>>>>> with a default
>>>>>>>>>>>>>>>> > > > >>>> implementation returning false. This allows
>>>>>>>>>>>>>>>> source readers to opt
>>>>>>>>>>>>>>>> > in
>>>>>>>>>>>>>>>> > > > >>>> to split reassignment only when necessary. Since
>>>>>>>>>>>>>>>> the new contract
>>>>>>>>>>>>>>>> > > > already
>>>>>>>>>>>>>>>> > > > >>>> places the responsibility for split assignment
>>>>>>>>>>>>>>>> on the enumerator,
>>>>>>>>>>>>>>>> > not
>>>>>>>>>>>>>>>> > > > >>>> reporting splits by default is a safe and clean
>>>>>>>>>>>>>>>> default behavior.
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > > >>>> ------------------------------
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > > >>>> I’ve updated the implementation and the FLIP
>>>>>>>>>>>>>>>> > > > >>>> accordingly [1]. It is quite a
>>>>>>>>>>>>>>>> > > > >>>> big change. In particular, for the Kafka
>>>>>>>>>>>>>>>> connector, we can now use
>>>>>>>>>>>>>>>> > a
>>>>>>>>>>>>>>>> > > > >>>> pluggable SplitPartitioner to support different
>>>>>>>>>>>>>>>> split assignment
>>>>>>>>>>>>>>>> > > > >>>> strategies (e.g., default, round-robin).
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > > >>>> Could you please review it when you have a
>>>>>>>>>>>>>>>> chance?
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > > >>>> Best,
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > > >>>> Hongshun
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > > >>>> [1]
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > > >>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <
>>>>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>>>>>> > > > wrote:
>>>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>>>> > > > >>>>> Hi Hongshun,
>>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>>> > > > >>>>> I am not too concerned about the transmission
>>>>>>>>>>>>>>>> cost. Because the
>>>>>>>>>>>>>>>> > full
>>>>>>>>>>>>>>>> > > > >>>>> split transmission has to happen in the initial
>>>>>>>>>>>>>>>> assignment phase
>>>>>>>>>>>>>>>> > > > already.
>>>>>>>>>>>>>>>> > > > >>>>> And in the future, we probably want to also
>>>>>>>>>>>>>>>> introduce some kind
>>>>>>>>>>>>>>>> > of
>>>>>>>>>>>>>>>> > > > workload
>>>>>>>>>>>>>>>> > > > >>>>> balance across source readers, e.g. based on
>>>>>>>>>>>>>>>> the per-split
>>>>>>>>>>>>>>>> > > > throughput or
>>>>>>>>>>>>>>>> > > > >>>>> the per-source-reader workload in heterogeneous
>>>>>>>>>>>>>>>> clusters. For
>>>>>>>>>>>>>>>> > these
>>>>>>>>>>>>>>>> > > > >>>>> reasons, we do not expect the Split objects to
>>>>>>>>>>>>>>>> be huge, and we
>>>>>>>>>>>>>>>> > are
>>>>>>>>>>>>>>>> > > > not
>>>>>>>>>>>>>>>> > > > >>>>> trying to design for huge Split objects either
>>>>>>>>>>>>>>>> as they will have
>>>>>>>>>>>>>>>> > > > problems
>>>>>>>>>>>>>>>> > > > >>>>> even today.
>>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>>> > > > >>>>> Good point on the potential split loss, please
>>>>>>>>>>>>>>>> see the reply
>>>>>>>>>>>>>>>> > below:
>>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>>> > > > >>>>> Scenario 2:
>>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>>> > > > >>>>>> 1. Reader A reports splits (1 and 2), and
>>>>>>>>>>>>>>>> Reader B reports (3
>>>>>>>>>>>>>>>> > and 4)
>>>>>>>>>>>>>>>> > > > >>>>>> upon restart.
>>>>>>>>>>>>>>>> > > > >>>>>> 2. Before the enumerator receives all reports
>>>>>>>>>>>>>>>> and performs
>>>>>>>>>>>>>>>> > > > >>>>>> reassignment, a checkpoint is triggered.
>>>>>>>>>>>>>>>> > > > >>>>>> 3. Since no splits have been reassigned yet,
>>>>>>>>>>>>>>>> both readers have
>>>>>>>>>>>>>>>> > empty
>>>>>>>>>>>>>>>> > > > >>>>>> states.
>>>>>>>>>>>>>>>> > > > >>>>>> 4. When restarting from this checkpoint, all
>>>>>>>>>>>>>>>> four splits are
>>>>>>>>>>>>>>>> > lost.
>>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>>> > > > >>>>> The reader registration happens in the
>>>>>>>>>>>>>>>> SourceOperator.open(),
>>>>>>>>>>>>>>>> > which
>>>>>>>>>>>>>>>> > > > >>>>> means the task is still in the initializing
>>>>>>>>>>>>>>>> state, therefore the
>>>>>>>>>>>>>>>> > > > checkpoint
>>>>>>>>>>>>>>>> > > > >>>>> should not be triggered until the enumerator
>>>>>>>>>>>>>>>> receives all the
>>>>>>>>>>>>>>>> > split
>>>>>>>>>>>>>>>> > > > reports.
>>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>>> > > > >>>>> There is a nuance here. Today, the RPC call from the
>>>>>>>>>>>>>>>> > > > >>>>> TM to the JM is async, so it is possible that
>>>>>>>>>>>>>>>> > > > >>>>> SourceOperator.open() has returned but the
>>>>>>>>>>>>>>>> > > > >>>>> enumerator has not yet received the split reports.
>>>>>>>>>>>>>>>> > > > >>>>> However, the task status update RPC call goes
>>>>>>>>>>>>>>>> > > > >>>>> through the same channel as the split reports call,
>>>>>>>>>>>>>>>> > > > >>>>> so the task status RPC call will happen after the
>>>>>>>>>>>>>>>> > > > >>>>> split reports call on the JM side. Therefore, on
>>>>>>>>>>>>>>>> > > > >>>>> the JM side, the SourceCoordinator will always
>>>>>>>>>>>>>>>> > > > >>>>> receive the split reports first and the checkpoint
>>>>>>>>>>>>>>>> > > > >>>>> request afterwards. This "happens-before"
>>>>>>>>>>>>>>>> > > > >>>>> relationship is important for guaranteeing
>>>>>>>>>>>>>>>> > > > >>>>> consistent state between the enumerator and the
>>>>>>>>>>>>>>>> > > > >>>>> readers.
>>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>>> > > > >>>>> Scenario 1:
>>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>>> > > > >>>>>> 1. Upon restart, Reader A reports assigned
>>>>>>>>>>>>>>>> splits (1 and 2), and
>>>>>>>>>>>>>>>> > > > >>>>>> Reader B reports (3 and 4).
>>>>>>>>>>>>>>>> > > > >>>>>> 2. The enumerator receives these reports but
>>>>>>>>>>>>>>>> only reassigns
>>>>>>>>>>>>>>>> > splits 1
>>>>>>>>>>>>>>>> > > > >>>>>> and 2 — not 3 and 4.
>>>>>>>>>>>>>>>> > > > >>>>>> 3. A checkpoint or savepoint is then
>>>>>>>>>>>>>>>> triggered. Only splits 1
>>>>>>>>>>>>>>>> > and 2
>>>>>>>>>>>>>>>> > > > >>>>>> are recorded in the reader states; splits 3
>>>>>>>>>>>>>>>> and 4 are not
>>>>>>>>>>>>>>>> > persisted.
>>>>>>>>>>>>>>>> > > > >>>>>> 4. If the job is later restarted from this
>>>>>>>>>>>>>>>> checkpoint, splits 3
>>>>>>>>>>>>>>>> > and
>>>>>>>>>>>>>>>> > > > 4
>>>>>>>>>>>>>>>> > > > >>>>>> will be permanently lost.
>>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>>> > > > >>>>> This scenario is possible. One solution is to
>>>>>>>>>>>>>>>> let the enumerator
>>>>>>>>>>>>>>>> > > > >>>>> implementation handle this. That means if the
>>>>>>>>>>>>>>>> enumerator relies
>>>>>>>>>>>>>>>> > on
>>>>>>>>>>>>>>>> > > > the
>>>>>>>>>>>>>>>> > > > >>>>> initial split reports from the source readers,
>>>>>>>>>>>>>>>> it should maintain
>>>>>>>>>>>>>>>> > > > these
>>>>>>>>>>>>>>>> > > > >>>>> reports by itself. In the above example, the
>>>>>>>>>>>>>>>> > > > >>>>> enumerator will need to remember that 3 and 4 are
>>>>>>>>>>>>>>>> > > > >>>>> not assigned and put them into its own state.
>>>>>>>>>>>>>>>> > > > >>>>> The current contract is that anything assigned to
>>>>>>>>>>>>>>>> > > > >>>>> the SourceReaders is completely owned by the
>>>>>>>>>>>>>>>> > > > >>>>> SourceReaders. Enumerators can remember the
>>>>>>>>>>>>>>>> > > > >>>>> assignments but cannot change them, even when the
>>>>>>>>>>>>>>>> > > > >>>>> source reader recovers / restarts.
>>>>>>>>>>>>>>>> > > > >>>>> With this FLIP, the contract becomes that the
>>>>>>>>>>>>>>>> source readers will
>>>>>>>>>>>>>>>> > > > >>>>> return the ownership of the splits to the
>>>>>>>>>>>>>>>> enumerator. So the
>>>>>>>>>>>>>>>> > > > enumerator is
>>>>>>>>>>>>>>>> > > > >>>>> responsible for maintaining these splits, until
>>>>>>>>>>>>>>>> they are assigned
>>>>>>>>>>>>>>>> > to
>>>>>>>>>>>>>>>> > > > a
>>>>>>>>>>>>>>>> > > > >>>>> source reader again.
>>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>>> > > > >>>>> There are other cases where there may be conflicting
>>>>>>>>>>>>>>>> > > > >>>>> information between the reader and the enumerator.
>>>>>>>>>>>>>>>> > > > >>>>> For example, consider the following sequence:
>>>>>>>>>>>>>>>> > > > >>>>> 1. Reader A reports splits (1 and 2) upon restart.
>>>>>>>>>>>>>>>> > > > >>>>> 2. The enumerator receives the report and assigns
>>>>>>>>>>>>>>>> > > > >>>>> both 1 and 2 to reader B.
>>>>>>>>>>>>>>>> > > > >>>>> 3. Reader A fails before checkpointing. This is a
>>>>>>>>>>>>>>>> > > > >>>>> partial failure, so only reader A restarts.
>>>>>>>>>>>>>>>> > > > >>>>> 4. When reader A recovers, it will again report
>>>>>>>>>>>>>>>> > > > >>>>> splits (1 and 2) to the enumerator.
>>>>>>>>>>>>>>>> > > > >>>>> 5. The enumerator should ignore this report because
>>>>>>>>>>>>>>>> > > > >>>>> it has assigned splits (1 and 2) to reader B.
>>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>>> > > > >>>>> So with the new contract, the enumerator should
>>>>>>>>>>>>>>>> be the source of
>>>>>>>>>>>>>>>> > > > truth
>>>>>>>>>>>>>>>> > > > >>>>> for split ownership.
>>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>>> > > > >>>>> Thanks,
>>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>>> > > > >>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>>> > > > >>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <
>>>>>>>>>>>>>>>> > > > [email protected]>
>>>>>>>>>>>>>>>> > > > >>>>> wrote:
>>>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>>>> > > > >>>>>> Hi Becket,
>>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>>> > > > >>>>>> I did consider this approach at the beginning
>>>>>>>>>>>>>>>> (and it was also
>>>>>>>>>>>>>>>> > > > >>>>>> mentioned in this FLIP), since it would allow
>>>>>>>>>>>>>>>> more flexibility
>>>>>>>>>>>>>>>> > in
>>>>>>>>>>>>>>>> > > > >>>>>> reassigning all splits. However, there are a
>>>>>>>>>>>>>>>> few potential
>>>>>>>>>>>>>>>> > issues.
>>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>>> > > > >>>>>> 1. High Transmission Cost
>>>>>>>>>>>>>>>> > > > >>>>>> If we pass the full split objects (rather than
>>>>>>>>>>>>>>>> just split IDs),
>>>>>>>>>>>>>>>> > the
>>>>>>>>>>>>>>>> > > > >>>>>> data size could be significant, leading to
>>>>>>>>>>>>>>>> high overhead during
>>>>>>>>>>>>>>>> > > > >>>>>> transmission — especially when many splits are
>>>>>>>>>>>>>>>> involved.
>>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>>> > > > >>>>>> 2. Risk of Split Loss
>>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>>> > > > >>>>>> Splits can be lost unless we have a mechanism to
>>>>>>>>>>>>>>>> > > > >>>>>> ensure that a checkpoint can only be taken after
>>>>>>>>>>>>>>>> > > > >>>>>> all the splits are reassigned. There are scenarios
>>>>>>>>>>>>>>>> > > > >>>>>> where splits could be lost due to inconsistent
>>>>>>>>>>>>>>>> > > > >>>>>> state handling during recovery:
>>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>>> > > > >>>>>> Scenario 1:
>>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>>> > > > >>>>>>    1. Upon restart, Reader A reports assigned
>>>>>>>>>>>>>>>> splits (1 and 2),
>>>>>>>>>>>>>>>> > and
>>>>>>>>>>>>>>>> > > > >>>>>>    Reader B reports (3 and 4).
>>>>>>>>>>>>>>>> > > > >>>>>>    2. The enumerator receives these reports
>>>>>>>>>>>>>>>> but only reassigns
>>>>>>>>>>>>>>>> > > > >>>>>>    splits 1 and 2 — not 3 and 4.
>>>>>>>>>>>>>>>> > > > >>>>>>    3. A checkpoint or savepoint is then
>>>>>>>>>>>>>>>> triggered. Only splits 1
>>>>>>>>>>>>>>>> > and
>>>>>>>>>>>>>>>> > > > >>>>>>    2 are recorded in the reader states; splits
>>>>>>>>>>>>>>>> 3 and 4 are not
>>>>>>>>>>>>>>>> > > > persisted.
>>>>>>>>>>>>>>>> > > > >>>>>>    4. If the job is later restarted from this
>>>>>>>>>>>>>>>> checkpoint, splits
>>>>>>>>>>>>>>>> > 3
>>>>>>>>>>>>>>>> > > > >>>>>>    and 4 will be permanently lost.
>>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>>> > > > >>>>>> Scenario 2:
>>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>>> > > > >>>>>>    1. Reader A reports splits (1 and 2), and
>>>>>>>>>>>>>>>> Reader B reports (3
>>>>>>>>>>>>>>>> > and
>>>>>>>>>>>>>>>> > > > >>>>>>    4) upon restart.
>>>>>>>>>>>>>>>> > > > >>>>>>    2. Before the enumerator receives all
>>>>>>>>>>>>>>>> reports and performs
>>>>>>>>>>>>>>>> > > > >>>>>>    reassignment, a checkpoint is triggered.
>>>>>>>>>>>>>>>> > > > >>>>>>    3. Since no splits have been reassigned
>>>>>>>>>>>>>>>> yet, both readers
>>>>>>>>>>>>>>>> > have
>>>>>>>>>>>>>>>> > > > >>>>>>    empty states.
>>>>>>>>>>>>>>>> > > > >>>>>>    4. When restarting from this checkpoint,
>>>>>>>>>>>>>>>> all four splits are
>>>>>>>>>>>>>>>> > > > lost.
>>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>>> > > > >>>>>> Let me know if you have thoughts on how we
>>>>>>>>>>>>>>>> might mitigate these
>>>>>>>>>>>>>>>> > > > risks!
>>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>>> > > > >>>>>> Best
>>>>>>>>>>>>>>>> > > > >>>>>> Hongshun
>>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>>> > > > >>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <
>>>>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>>>>>> > > > >>>>>> wrote:
>>>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>>>> > > > >>>>>>> Hi Hongshun,
>>>>>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>>>>>> > > > >>>>>>> The steps sound reasonable to me in general.
>>>>>>>>>>>>>>>> In terms of the
>>>>>>>>>>>>>>>> > > > updated
>>>>>>>>>>>>>>>> > > > >>>>>>> FLIP wiki, it would be good to see if we can
>>>>>>>>>>>>>>>> keep the protocol
>>>>>>>>>>>>>>>> > > > simple. One
>>>>>>>>>>>>>>>> > > > >>>>>>> alternative way to achieve this behavior is the
>>>>>>>>>>>>>>>> following:
>>>>>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>>>>>> > > > >>>>>>> 1. Upon SourceOperator startup, the
>>>>>>>>>>>>>>>> SourceOperator sends
>>>>>>>>>>>>>>>> > > > >>>>>>> ReaderRegistrationEvent with the currently
>>>>>>>>>>>>>>>> assigned splits to
>>>>>>>>>>>>>>>> > the
>>>>>>>>>>>>>>>> > > > >>>>>>> enumerator. It does not add these splits to
>>>>>>>>>>>>>>>> the SourceReader.
>>>>>>>>>>>>>>>> > > > >>>>>>> 2. The enumerator will always use
>>>>>>>>>>>>>>>> > > > >>>>>>> SplitEnumeratorContext.assignSplits() to assign the
>>>>>>>>>>>>>>>> > > > >>>>>>> splits (not via the response to the
>>>>>>>>>>>>>>>> > > > >>>>>>> ReaderRegistrationEvent; this allows async split
>>>>>>>>>>>>>>>> > > > >>>>>>> assignment in case the enumerator wants to wait
>>>>>>>>>>>>>>>> > > > >>>>>>> until all the readers are registered).
>>>>>>>>>>>>>>>> > > > >>>>>>> 3. The SourceOperator will only call
>>>>>>>>>>>>>>>> SourceReader.addSplits()
>>>>>>>>>>>>>>>> > when
>>>>>>>>>>>>>>>> > > > >>>>>>> it receives the AddSplitEvent from the
>>>>>>>>>>>>>>>> enumerator.
>>>>>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>>>>>> > > > >>>>>>> This protocol has a few benefits:
>>>>>>>>>>>>>>>> > > > >>>>>>> 1. it basically allows arbitrary split
>>>>>>>>>>>>>>>> reassignment upon
>>>>>>>>>>>>>>>> > restart
>>>>>>>>>>>>>>>> > > > >>>>>>> 2. simplicity: there is only one way to
>>>>>>>>>>>>>>>> assign splits.
>>>>>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>>>>>> > > > >>>>>>> So we only need one interface change:
>>>>>>>>>>>>>>>> > > > >>>>>>> - add the initially assigned splits to
>>>>>>>>>>>>>>>> ReaderInfo so the
>>>>>>>>>>>>>>>> > Enumerator
>>>>>>>>>>>>>>>> > > > >>>>>>> can access it.
>>>>>>>>>>>>>>>> > > > >>>>>>> and one behavior change:
>>>>>>>>>>>>>>>> > > > >>>>>>> - The SourceOperator should stop assigning
>>>>>>>>>>>>>>>> splits to the from
>>>>>>>>>>>>>>>> > state
>>>>>>>>>>>>>>>> > > > >>>>>>> restoration, but
>>>>>>>>>>>>>>>> > [message truncated...]
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>
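
The "enumerator is the source of truth for split ownership" contract discussed in the quoted thread can be illustrated with a small, self-contained sketch. This is not the FLIP's implementation and uses no Flink classes: SplitOwnershipLedger, onReaderReport, assign and snapshotUnassigned are hypothetical names, splits are plain string IDs, and readers are plain integer IDs. It only shows the two rules from the discussion: splits reported back but not yet reassigned must be kept in the enumerator's own state, and a report for a split that has already been given to another reader is ignored.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical enumerator-side bookkeeping; not part of Flink or of the FLIP. */
public class SplitOwnershipLedger {
    // splitId -> reader that currently owns the split
    private final Map<String, Integer> ownerBySplit = new HashMap<>();
    // splits returned by readers and not yet handed out again (sorted for determinism)
    private final TreeSet<String> unassigned = new TreeSet<>();

    /** Handles the splits a reader reports on registration; returns the ones taken back. */
    public List<String> onReaderReport(int readerId, Collection<String> reportedSplits) {
        List<String> accepted = new ArrayList<>();
        for (String split : reportedSplits) {
            Integer owner = ownerBySplit.get(split);
            if (owner != null && owner != readerId) {
                continue; // stale report: the split was already reassigned to another reader
            }
            ownerBySplit.remove(split);
            if (unassigned.add(split)) {
                accepted.add(split);
            }
        }
        return accepted;
    }

    /** Hands a split that the enumerator currently holds to a reader. */
    public void assign(String split, int readerId) {
        if (!unassigned.remove(split)) {
            throw new IllegalStateException("Split " + split + " is not held by the enumerator");
        }
        ownerBySplit.put(split, readerId);
    }

    /** Splits that would have to go into the enumerator checkpoint so they are not lost. */
    public List<String> snapshotUnassigned() {
        return new ArrayList<>(unassigned);
    }

    public static void main(String[] args) {
        SplitOwnershipLedger ledger = new SplitOwnershipLedger();
        ledger.onReaderReport(0, List.of("1", "2")); // reader A reports on restart
        ledger.onReaderReport(1, List.of("3", "4")); // reader B reports on restart
        ledger.assign("1", 1);                       // enumerator moves 1 and 2 to reader B
        ledger.assign("2", 1);
        System.out.println("kept in enumerator state: " + ledger.snapshotUnassigned());
        // Reader A restarts alone and reports 1 and 2 again; the report is ignored.
        System.out.println("accepted from reader A: " + ledger.onReaderReport(0, List.of("1", "2")));
    }
}
```

In a real enumerator the unassigned set would be part of the checkpointed enumerator state, which is what covers Scenario 1 from the thread (splits 3 and 4 reported but not yet reassigned when a checkpoint is taken).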
