Hi Leonard,
Thanks for your advice.  It makes sense and I have modified it.

Best,
Hongshun

On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu <xbjt...@gmail.com> wrote:

> Thanks Hongshun and Becket for the deep discussion.
>
> I only have one comment for one API design:
>
> > Deprecate the old addSplitsBack  method, use a addSplitsBack with param
> isReportedByReader instead. Because, The enumerator can apply different
> reassignment policies based on the context.
>
> Could we introduce a new method like *addSplitsBackOnRecovery*  with default
> implementation. In this way, we can provide better backward compatibility
> and also makes it easier for developers to understand.
>
> Best,
> Leonard
>
>
>
> 2025 9月 3 20:26,Hongshun Wang <loserwang1...@gmail.com> 写道:
>
> Hi Becket,
>
> I think that's a great idea!  I have added the
> SupportSplitReassignmentOnRecovery interface in this FLIP. If a Source
> implements this interface indicates that the source operator needs to
> report splits to the enumerator and receive reassignment.[1]
>
> Best,
> Hongshun
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>
> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <becket....@gmail.com> wrote:
>
>> Hi Hongshun,
>>
>> I think the convention for such optional features in Source is via mix-in
>> interfaces. So instead of adding a method to the SourceReader, maybe we
>> should introduce an interface SupportSplitReassingmentOnRecovery with this
>> method. If a Source implementation implements that interface, then the
>> SourceOperator will check the desired behavior and act accordingly.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <loserwang1...@gmail.com>
>> wrote:
>>
>>> Hi de vs,
>>>
>>> Would anyone like to discuss this FLIP? I'd appreciate your feedback and
>>> suggestions.
>>>
>>> Best,
>>> Hongshun
>>>
>>>
>>> 2025年8月13日 14:23,Hongshun Wang <loserwang1...@gmail.com> 写道:
>>>
>>> Hi Becket,
>>>
>>> Thank you for your detailed feedback. The new contract makes good sense
>>> to me and effectively addresses the issues I encountered at the beginning
>>> of the design.
>>>
>>> That said, I recommend not reporting splits by default, primarily for
>>> compatibility and practical reasons:
>>>
>>> >  For these reasons, we do not expect the Split objects to be huge,
>>> and we are not trying to design for huge Split objects either as they will
>>> have problems even today.
>>>
>>>    1.
>>>
>>>    Not all existing connector match this rule
>>>    For example, in mysql cdc connector, a binlog split may contain
>>>    hundreds (or even more) snapshot split completion records. This state is
>>>    large and is currently transmitted incrementally through multiple
>>>    BinlogSplitMetaEvent messages. Since the binlog reader operates with 
>>> single
>>>    parallelism, reporting the full split state on recovery could be
>>>    inefficient or even infeasible.
>>>    For such sources, it would be better to provide a mechanism to skip
>>>    split reporting during restart until they redesign and reduce the
>>>    split size.
>>>    2.
>>>
>>>    Not all enumerators maintain unassigned splits in state.
>>>    Some SplitEnumerator(such as kafka connector) implementations do not
>>>    track or persistently manage unassigned splits. Requiring them to handle
>>>    re-registration would add unnecessary complexity. Even though we maybe
>>>    implements in kafka connector, currently, kafka connector is decouple 
>>> with
>>>    flink version, we also need to make sure the elder version is compatible.
>>>
>>> ------------------------------
>>>
>>> To address these concerns, I propose introducing a new method: boolean
>>> SourceReader#shouldReassignSplitsOnRecovery() with a default
>>> implementation returning false. This allows source readers to opt in to
>>> split reassignment only when necessary. Since the new contract already
>>> places the responsibility for split assignment on the enumerator, not
>>> reporting splits by default is a safe and clean default behavior.
>>>
>>>
>>> ------------------------------
>>>
>>>
>>> I’ve updated the implementation and the FIP accordingly[1]. It quite a
>>> big change. In particular, for the Kafka connector, we can now use a
>>> pluggable SplitPartitioner to support different split assignment
>>> strategies (e.g., default, round-robin).
>>>
>>>
>>> Could you please review it when you have a chance?
>>>
>>>
>>> Best,
>>>
>>> Hongshun
>>>
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>
>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <becket....@gmail.com> wrote:
>>>
>>>> Hi Hongshun,
>>>>
>>>> I am not too concerned about the transmission cost. Because the full
>>>> split transmission has to happen in the initial assignment phase already.
>>>> And in the future, we probably want to also introduce some kind of workload
>>>> balance across source readers, e.g. based on the per-split throughput or
>>>> the per-source-reader workload in heterogeneous clusters. For these
>>>> reasons, we do not expect the Split objects to be huge, and we are not
>>>> trying to design for huge Split objects either as they will have problems
>>>> even today.
>>>>
>>>> Good point on the potential split loss, please see the reply below:
>>>>
>>>> Scenario 2:
>>>>
>>>>
>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4)
>>>>> upon restart.
>>>>> 2. Before the enumerator receives all reports and performs
>>>>> reassignment, a checkpoint is triggered.
>>>>> 3. Since no splits have been reassigned yet, both readers have empty
>>>>> states.
>>>>> 4. When restarting from this checkpoint, all four splits are lost.
>>>>
>>>> The reader registration happens in the SourceOperator.open(), which
>>>> means the task is still in the initializing state, therefore the checkpoint
>>>> should not be triggered until the enumerator receives all the split 
>>>> reports.
>>>>
>>>> There is a nuance here. Today, the RPC call from the TM to the JM is
>>>> async. So it is possible that the SourceOpertor.open() has returned, but
>>>> the enumerator has not received the split reports. However, because the
>>>> task status update RPC call goes to the same channel as the split reports
>>>> call, so the task status RPC call will happen after the split reports call
>>>> on the JM side. Therefore, on the JM side, the SourceCoordinator will
>>>> always first receive the split reports, then receive the checkpoint 
>>>> request.
>>>> This "happen before" relationship is kind of important to guarantee the
>>>> consistent state between enumerator and readers.
>>>>
>>>> Scenario 1:
>>>>
>>>>
>>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and
>>>>> Reader B reports (3 and 4).
>>>>> 2. The enumerator receives these reports but only reassigns splits 1
>>>>> and 2 — not 3 and 4.
>>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2
>>>>> are recorded in the reader states; splits 3 and 4 are not persisted.
>>>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4
>>>>> will be permanently lost.
>>>>
>>>> This scenario is possible. One solution is to let the enumerator
>>>> implementation handle this. That means if the enumerator relies on the
>>>> initial split reports from the source readers, it should maintain these
>>>> reports by itself. In the above example, the enumerator will need to
>>>> remember that 3 and 4 are not assigned and put it into its own state.
>>>> The current contract is that anything assigned to the SourceReaders
>>>> are completely owned by the SourceReaders. Enumerators can remember the
>>>> assignments but cannot change them, even when the source reader recovers /
>>>> restarts.
>>>> With this FLIP, the contract becomes that the source readers will
>>>> return the ownership of the splits to the enumerator. So the enumerator is
>>>> responsible for maintaining these splits, until they are assigned to a
>>>> source reader again.
>>>>
>>>> There are other cases where there may be conflict information between
>>>> reader and enumerator. For example, consider the following sequence:
>>>> 1. reader A reports splits (1 and 2) up on restart.
>>>> 2. enumerator receives the report and assigns both 1 and 2 to reader B.
>>>> 3. reader A failed before checkpointing. And this is a partial failure,
>>>> so only reader A restarts.
>>>> 4. When reader A recovers, it will again report splits (1 and 2) to the
>>>> enumerator.
>>>> 5. The enumerator should ignore this report because it has
>>>> assigned splits (1 and 2) to reader B.
>>>>
>>>> So with the new contract, the enumerator should be the source of truth
>>>> for split ownership.
>>>>
>>>> Thanks,
>>>>
>>>> Jiangjie (Becket) Qin
>>>>
>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <loserwang1...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Becket,
>>>>>
>>>>>
>>>>> I did consider this approach at the beginning (and it was also
>>>>> mentioned in this FLIP), since it would allow more flexibility in
>>>>> reassigning all splits. However, there are a few potential issues.
>>>>>
>>>>> 1. High Transmission Cost
>>>>> If we pass the full split objects (rather than just split IDs), the
>>>>> data size could be significant, leading to high overhead during
>>>>> transmission — especially when many splits are involved.
>>>>>
>>>>> 2. Risk of Split Loss
>>>>>
>>>>> Risk of split loss exists unless we have a mechanism to make sure only
>>>>> can checkpoint after all the splits are reassigned.
>>>>> There are scenarios where splits could be lost due to inconsistent
>>>>> state handling during recovery:
>>>>>
>>>>>
>>>>> Scenario 1:
>>>>>
>>>>>
>>>>>    1. Upon restart, Reader A reports assigned splits (1 and 2), and
>>>>>    Reader B reports (3 and 4).
>>>>>    2. The enumerator receives these reports but only reassigns splits
>>>>>    1 and 2 — not 3 and 4.
>>>>>    3. A checkpoint or savepoint is then triggered. Only splits 1 and
>>>>>    2 are recorded in the reader states; splits 3 and 4 are not persisted.
>>>>>    4. If the job is later restarted from this checkpoint, splits 3
>>>>>    and 4 will be permanently lost.
>>>>>
>>>>>
>>>>> Scenario 2:
>>>>>
>>>>>    1. Reader A reports splits (1 and 2), and Reader B reports (3 and
>>>>>    4) upon restart.
>>>>>    2. Before the enumerator receives all reports and performs
>>>>>    reassignment, a checkpoint is triggered.
>>>>>    3. Since no splits have been reassigned yet, both readers have
>>>>>    empty states.
>>>>>    4. When restarting from this checkpoint, all four splits are lost.
>>>>>
>>>>>
>>>>> Let me know if you have thoughts on how we might mitigate these risks!
>>>>>
>>>>> Best
>>>>> Hongshun
>>>>>
>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <becket....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Hongshun,
>>>>>>
>>>>>> The steps sound reasonable to me in general. In terms of the updated
>>>>>> FLIP wiki, it would be good to see if we can keep the protocol simple. 
>>>>>> One
>>>>>> alternative way to achieve this behavior is following:
>>>>>>
>>>>>> 1. Upon SourceOperator startup, the SourceOperator sends
>>>>>> ReaderRegistrationEvent with the currently assigned splits to the
>>>>>> enumerator. It does not add these splits to the SourceReader.
>>>>>> 2. The enumerator will always use the
>>>>>> SourceEnumeratorContext.assignSplits() to assign the splits. (not via the
>>>>>> response of the SourceRegistrationEvent, this allows async split 
>>>>>> assignment
>>>>>> in case the enumerator wants to wait until all the readers are 
>>>>>> registered)
>>>>>> 3. The SourceOperator will only call SourceReader.addSplits() when it
>>>>>> receives the AddSplitEvent from the enumerator.
>>>>>>
>>>>>> This protocol has a few benefits:
>>>>>> 1. it basically allows arbitrary split reassignment upon restart
>>>>>> 2. simplicity: there is only one way to assign splits.
>>>>>>
>>>>>> So we only need one interface change:
>>>>>> - add the initially assigned splits to ReaderInfo so the Enumerator
>>>>>> can access it.
>>>>>> and one behavior change:
>>>>>> - The SourceOperator should stop assigning splits to the from state
>>>>>> restoration, but only do that when it receives AddSplitsEvent from the
>>>>>> enumerator.
>>>>>>
>>>>>> The enumerator story is also simple:
>>>>>> 1. Receive some kind of notification (new partition, new reader, etc)
>>>>>> 2. look at the reader information (in the enumerator context or
>>>>>> self-maintained state)
>>>>>> 3. assign splits via the enumerator context.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jiangjie (Becket) Qin
>>>>>>
>>>>>> On Thu, Aug 7, 2025 at 1:31 AM Hongshun Wang <loserwang1...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Becket,
>>>>>>> Thanks for your advice — I’ve quickly learned a lot about the
>>>>>>> reader’s design principle. It’s really interesting!
>>>>>>>
>>>>>>> > One principle we want to follow is that the enumerator should be
>>>>>>> the brain doing the splits assignment, while the source readers read 
>>>>>>> from
>>>>>>> the assigned splits. So we want to avoid the case where the SourceReader
>>>>>>> ignores the split assignment.
>>>>>>>
>>>>>>> It appears that MySQL CDC currently bypasses this principle by
>>>>>>> proactively removing unused splits directly in the SourceReader. This 
>>>>>>> may
>>>>>>> be due to the lack of built-in framework support for such cleanup, 
>>>>>>> forcing
>>>>>>> connectors to handle it manually. However, this responsibility ideally
>>>>>>> belongs in the framework.
>>>>>>>
>>>>>>> With this FLIP, we propose a redesigned mechanism that centralizes
>>>>>>> split cleanup logic in the SplitEnumerator, allowing connectors like 
>>>>>>> MySQL
>>>>>>> CDC to eventually adopt it( @leneord, CC).
>>>>>>>
>>>>>>> To achieve this, we must carefully manage state consistency during
>>>>>>> startup and recovery. The proposed approach is as follows:
>>>>>>>
>>>>>>>    1. Reader Registration with Deferred Assignment
>>>>>>>    When a reader starts (SourceOperator#open), it sends a
>>>>>>>    ReaderRegistrationEvent to the SplitEnumerator, including its
>>>>>>>    previously assigned splits (restored from state). However, these 
>>>>>>> splits
>>>>>>>    are not yet assigned to the reader. The SourceOperator is placed
>>>>>>>    in a PENDING state.
>>>>>>>    2. Prevent State Pollution During Registration
>>>>>>>    While in the PENDING state, SourceOperator#snapshotState will
>>>>>>>    not update the operator state. This prevents empty or outdated 
>>>>>>> reader state
>>>>>>>    (e.g., with removed splits) from polluting the checkpoint.
>>>>>>>    3. Enumerator Performs Split Cleanup and Acknowledges
>>>>>>>    Upon receiving the ReaderRegistrationEvent, the SplitEnumerator 
>>>>>>> removes
>>>>>>>    any splits that are no longer valid (e.g., due to removed topics or 
>>>>>>> tables)
>>>>>>>    and returns the list of remaining valid split IDs to the reader via a
>>>>>>>    ReaderRegistrationACKEvent.
>>>>>>>    For backward compatibility, the default behavior is to return
>>>>>>>    all split IDs (i.e., no filtering).
>>>>>>>    4. Finalize Registration and Resume Normal Operation
>>>>>>>    When the SourceOperator receives the ReaderRegistrationACKEvent,
>>>>>>>    it assigns the confirmed splits to the reader and transitions its 
>>>>>>> state to
>>>>>>>    REGISTERED. From this point onward, SourceOperator#snapshotState can
>>>>>>>    safely update the operator state.
>>>>>>>
>>>>>>>
>>>>>>> Best,
>>>>>>> Hongshun
>>>>>>>
>>>>>>> On Thu, Aug 7, 2025 at 1:57 AM Becket Qin <becket....@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> SourceCoordinator doesn't store splits that have already been
>>>>>>>>> assigned to readers, and SplitAssignmentTracker stores the splits 
>>>>>>>>> only for
>>>>>>>>> this checkpoint, which will be removed after checkpoint. Maybe you 
>>>>>>>>> mean
>>>>>>>>> SourceOperator?
>>>>>>>>
>>>>>>>> Yes, I meant SourceOperator.
>>>>>>>>
>>>>>>>> At the beginning, I also thought about using it. However, there are
>>>>>>>>> two situations:
>>>>>>>>> 1. During restart, if source options remove a topic or table:
>>>>>>>>> sometimes connectors like MySQL CDC will remove unused splits after 
>>>>>>>>> restart
>>>>>>>>> in MySqlSourceReader#addSplits [1]. Kafka lacks this, so if the 
>>>>>>>>> configured
>>>>>>>>> topics change, removed topic's splits are still read. I also want to 
>>>>>>>>> do the
>>>>>>>>> same thing in Kafka.
>>>>>>>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be
>>>>>>>>> removed after restart.
>>>>>>>>> In these cases, I have to get the assigned splits after
>>>>>>>>> SourceReader#addSplits, rather than get them from SourceOperator
>>>>>>>>> directly.
>>>>>>>>
>>>>>>>>
>>>>>>>> One principle we want to follow is that the enumerator should be
>>>>>>>> the brain doing the splits assignment, while the source readers read 
>>>>>>>> from
>>>>>>>> the assigned splits. So we want to avoid the case where the 
>>>>>>>> SourceReader
>>>>>>>> ignores the split assignment. Given this principle,
>>>>>>>> For case 1, if there is a subscription change, it might be better
>>>>>>>> to hold back calling SourceReader.addSplits() until an assignment is
>>>>>>>> confirmed by the Enumerator. In fact, this might be a good default 
>>>>>>>> behavior
>>>>>>>> regardless of whether there is a subscription change.
>>>>>>>> For case 2: if a bounded split is finished, the
>>>>>>>> SourceReader.snapshotState() will not contain that split. So upon
>>>>>>>> restoration, those splits should not appear, right?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>
>>>>>>>> On Wed, Aug 6, 2025 at 5:19 AM Hongshun Wang <
>>>>>>>> loserwang1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Becket,
>>>>>>>>>
>>>>>>>>> Thank you a lot for your advice, which helped me a lot.
>>>>>>>>> >  It seems that we don't need the method `SourceReader.
>>>>>>>>> getAssignedSplits()`. The assigned splits are available in the
>>>>>>>>> SourceCoordinator upon state restoration.
>>>>>>>>>
>>>>>>>>>  SourceCoordinator doesn't store splits that have already been
>>>>>>>>> assigned to readers, and SplitAssignmentTracker stores the splits 
>>>>>>>>> only for
>>>>>>>>> this checkpoint, which will be removed after checkpoint. Maybe you 
>>>>>>>>> mean
>>>>>>>>> SourceOperator?
>>>>>>>>>
>>>>>>>>> At the beginning, I also thought about using it. However, there
>>>>>>>>> are two situations:
>>>>>>>>> 1. During restart, if source options remove a topic or table:
>>>>>>>>> sometimes connectors like MySQL CDC will remove unused splits after 
>>>>>>>>> restart
>>>>>>>>> in MySqlSourceReader#addSplits [1]. Kafka lacks this, so if the 
>>>>>>>>> configured
>>>>>>>>> topics change, removed topic's splits are still read. I also want to 
>>>>>>>>> do the
>>>>>>>>> same thing in Kafka.
>>>>>>>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be
>>>>>>>>> removed after restart.
>>>>>>>>> In these cases, I have to get the assigned splits after
>>>>>>>>> SourceReader#addSplits, rather than get them from SourceOperator
>>>>>>>>> directly.
>>>>>>>>>
>>>>>>>>> >  By design, the SplitEnumerator can get the reader information
>>>>>>>>> any time from the `SplitEnumeratorContext.registeredReaders()`.
>>>>>>>>> It looks good.
>>>>>>>>>
>>>>>>>>> Thanks again.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Hongshun
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/flink-cdc/blob/42f91a864e329c00959828fe0ca4f1e9e8e1de75/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java#L238
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Aug 5, 2025 at 2:35 PM Becket Qin <becket....@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Hongshun,
>>>>>>>>>>
>>>>>>>>>> Thanks for the proposal. The current Kafka split assignment
>>>>>>>>>> algorithm does seem to have issues. (I cannot recall why it was 
>>>>>>>>>> implemented
>>>>>>>>>> this way at that time...).
>>>>>>>>>>
>>>>>>>>>> Two quick comments:
>>>>>>>>>> 1. It seems that we don't need the method `SourceReader.
>>>>>>>>>> getAssignedSplits()`. The assigned splits are available in the
>>>>>>>>>> SourceCoordinator upon state restoration and can be put into the
>>>>>>>>>> ReaderRegistrationEvent.
>>>>>>>>>> 2. Instead of adding the method `SplitEnumerator.addReader(int
>>>>>>>>>> subtaskId, List<SplitT> assignedSplits)`, add a new field of
>>>>>>>>>> `InitialSplitAssignment` to the ReaderInfo. By design, the 
>>>>>>>>>> SplitEnumerator
>>>>>>>>>> can get the reader information any time from the
>>>>>>>>>> `SplitEnumeratorContext.registeredReaders()`. This also avoids the
>>>>>>>>>> Enumerator implementation to remember the initially assigned splits, 
>>>>>>>>>> if it
>>>>>>>>>> wants to wait until all the readers are registered. This also allow 
>>>>>>>>>> future
>>>>>>>>>> addition of reader information.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Aug 4, 2025 at 8:39 PM Hongshun Wang <
>>>>>>>>>> loserwang1...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Anyone familiar with kafka connector can help review this FLIP?
>>>>>>>>>>> I am looking forward for your reply.
>>>>>>>>>>>
>>>>>>>>>>> Best
>>>>>>>>>>> Hongshun
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 24, 2025 at 8:13 PM Leonard Xu <xbjt...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks Hongshun for driving this work.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> We also suffering the issue in production Kafka restoration
>>>>>>>>>>>> usage, current design is a nice tradeoff and has considered the 
>>>>>>>>>>>> new Source
>>>>>>>>>>>> implementation details, +1 from my side.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Leonard
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> > 2025 7月 19 18:59,Hongshun Wang <loserwang1...@gmail.com> 写道:
>>>>>>>>>>>> >
>>>>>>>>>>>> > Hi devs,
>>>>>>>>>>>> >
>>>>>>>>>>>> > I'd like to initiate a discussion about [FLIP-537: Enumerator
>>>>>>>>>>>> with Global
>>>>>>>>>>>> > Split Assignment Distribution for Balanced Split Assignment]
>>>>>>>>>>>> [1], which
>>>>>>>>>>>> > addresses critical limitations in our current Kafka connector
>>>>>>>>>>>> split
>>>>>>>>>>>> > distribution mechanism.
>>>>>>>>>>>> >
>>>>>>>>>>>> > As documented in [FLINK-31762] [2], several scenarios
>>>>>>>>>>>> currently lead to
>>>>>>>>>>>> > uneven Kafka split distribution, causing reader delays and
>>>>>>>>>>>> performance
>>>>>>>>>>>> > bottlenecks. The core issue stems from the enumerator's lack
>>>>>>>>>>>> of visibility
>>>>>>>>>>>> > into post-assignment split distribution.
>>>>>>>>>>>> >
>>>>>>>>>>>> > This flip does two things:
>>>>>>>>>>>> > 1. ReaderRegistrationEvent Enhancement: SourceOperator should
>>>>>>>>>>>> send
>>>>>>>>>>>> > ReaderRegistrationEvent with assigned splits metadata after
>>>>>>>>>>>> startup to
>>>>>>>>>>>> > ensure state consistency.
>>>>>>>>>>>> > 2. Implementation in the Kafka connector to resolve
>>>>>>>>>>>> imbalanced splits and
>>>>>>>>>>>> > state awareness during recovery (the enumerator will always
>>>>>>>>>>>> choose the
>>>>>>>>>>>> > least assigned subtask,and reason aslo as follows)
>>>>>>>>>>>> >
>>>>>>>>>>>> > Any additional questions regarding this FLIP? Looking forward
>>>>>>>>>>>> to hearing
>>>>>>>>>>>> > from you.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Best
>>>>>>>>>>>> > Hongshun
>>>>>>>>>>>> >
>>>>>>>>>>>> >
>>>>>>>>>>>> > [1]
>>>>>>>>>>>> >
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>>>>>>> > [2] https://issues.apache.org/jira/browse/FLINK-31762
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>
>

Reply via email to