Hi Hongshun,

Thanks for the proposal. The current Kafka split assignment algorithm does
seem to have issues. (I cannot recall why it was implemented this way at
that time...).

Two quick comments:
1. It seems that we don't need the method `SourceReader.getAssignedSplits()`.
The assigned splits are available in the SourceCoordinator upon state
restoration and can be put into the ReaderRegistrationEvent.
2. Instead of adding the method `SplitEnumerator.addReader(int subtaskId,
List<SplitT> assignedSplits)`, we could add a new field, `InitialSplitAssignment`,
to the ReaderInfo. By design, the SplitEnumerator can get the reader
information at any time from `SplitEnumeratorContext.registeredReaders()`.
This also spares the Enumerator implementation from having to remember the
initially assigned splits if it wants to wait until all the readers are
registered. It also allows future additions to the reader information.
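To make point 2 more concrete, here is a minimal Java sketch. `ReaderInfo`, `EnumeratorContext`, `MapContext` and `WaitingEnumerator` below are hypothetical, simplified stand-ins written for illustration only. They are not the actual Flink interfaces, and the `initialSplitAssignment` record component simply models the proposed `InitialSplitAssignment` field. The idea it shows is that the enumerator keeps no bookkeeping of its own: once every reader has registered, it reads the restored assignments from `registeredReaders()`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for the types discussed in this thread.
// These are NOT the actual Flink interfaces.
public class InitialAssignmentSketch {

    // ReaderInfo extended with the proposed initial split assignment field.
    record ReaderInfo<SplitT>(int subtaskId, String location, List<SplitT> initialSplitAssignment) {}

    // Minimal view of SplitEnumeratorContext: only what this sketch needs.
    interface EnumeratorContext<SplitT> {
        Map<Integer, ReaderInfo<SplitT>> registeredReaders();
        int currentParallelism();
    }

    // Simple in-memory context used for the demo below.
    static class MapContext<SplitT> implements EnumeratorContext<SplitT> {
        private final Map<Integer, ReaderInfo<SplitT>> readers = new HashMap<>();
        private final int parallelism;

        MapContext(int parallelism) { this.parallelism = parallelism; }

        // In the proposal, this information would come from the ReaderRegistrationEvent.
        void register(ReaderInfo<SplitT> info) { readers.put(info.subtaskId(), info); }

        @Override public Map<Integer, ReaderInfo<SplitT>> registeredReaders() { return readers; }
        @Override public int currentParallelism() { return parallelism; }
    }

    // Enumerator that waits until every reader has registered, then reads the
    // restored assignments from the context instead of tracking them itself.
    static class WaitingEnumerator<SplitT> {
        private final EnumeratorContext<SplitT> context;

        WaitingEnumerator(EnumeratorContext<SplitT> context) { this.context = context; }

        // Returns subtaskId -> number of initially assigned splits once all
        // readers are registered, or null while still waiting.
        Map<Integer, Integer> addReader(int subtaskId) {
            Map<Integer, ReaderInfo<SplitT>> readers = context.registeredReaders();
            if (readers.size() < context.currentParallelism()) {
                return null; // some readers have not registered yet
            }
            Map<Integer, Integer> load = new HashMap<>();
            for (ReaderInfo<SplitT> info : readers.values()) {
                load.put(info.subtaskId(), info.initialSplitAssignment().size());
            }
            return load;
        }
    }

    public static void main(String[] args) {
        MapContext<String> ctx = new MapContext<>(2);
        WaitingEnumerator<String> enumerator = new WaitingEnumerator<>(ctx);

        ctx.register(new ReaderInfo<>(0, "host-a", List.of("topic-0", "topic-1")));
        System.out.println(enumerator.addReader(0));
        ctx.register(new ReaderInfo<>(1, "host-b", List.of()));
        System.out.println(enumerator.addReader(1));
    }
}
```

Running `main` prints `null` for the first registration (reader 1 is still missing) and then `{0=2, 1=0}`.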

Thanks,

Jiangjie (Becket) Qin



On Mon, Aug 4, 2025 at 8:39 PM Hongshun Wang <loserwang1...@gmail.com>
wrote:

> Could anyone familiar with the Kafka connector help review this FLIP? I am
> looking forward to your reply.
>
> Best
> Hongshun
>
> On Thu, Jul 24, 2025 at 8:13 PM Leonard Xu <xbjt...@gmail.com> wrote:
>
>> Thanks Hongshun for driving this work.
>>
>>
>> We have also been suffering from this issue when restoring Kafka jobs in
>> production. The current design is a nice tradeoff and takes the new Source
>> implementation details into account. +1 from my side.
>>
>>
>> Best,
>> Leonard
>>
>>
>>
>> > On Jul 19, 2025, at 18:59, Hongshun Wang <loserwang1...@gmail.com> wrote:
>> >
>> > Hi devs,
>> >
>> > I'd like to initiate a discussion about [FLIP-537: Enumerator with Global
>> > Split Assignment Distribution for Balanced Split Assignment] [1], which
>> > addresses critical limitations in our current Kafka connector split
>> > distribution mechanism.
>> >
>> > As documented in [FLINK-31762] [2], several scenarios currently lead to
>> > uneven Kafka split distribution, causing reader delays and performance
>> > bottlenecks. The core issue stems from the enumerator's lack of visibility
>> > into post-assignment split distribution.
>> >
>> > This FLIP does two things:
>> > 1. ReaderRegistrationEvent enhancement: the SourceOperator should send a
>> > ReaderRegistrationEvent with the assigned splits' metadata after startup
>> > to ensure state consistency.
>> > 2. An implementation in the Kafka connector that resolves imbalanced
>> > splits and adds state awareness during recovery (the enumerator will
>> > always choose the least-assigned subtask; the reasoning is explained in
>> > the FLIP).
>> >
>> > Any additional questions regarding this FLIP? Looking forward to hearing
>> > from you.
>> >
>> > Best
>> > Hongshun
>> >
>> >
>> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>> > [2] https://issues.apache.org/jira/browse/FLINK-31762
>>
>>
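Point 2 of the quoted FLIP summary says the enumerator always picks the least-assigned subtask. The hypothetical Java sketch below illustrates that rule under simplifying assumptions: splits are plain strings, `LeastAssignedSketch` and `assign` are illustrative names rather than Kafka connector code, and ties go to the lowest subtask id. It also shows why the restored assignments matter: without knowing that subtask 0 already owns three splits, the enumerator could not steer new splits away from it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of "always assign to the least-assigned subtask".
// Split and subtask types are simplified; this is not the Kafka connector code.
public class LeastAssignedSketch {

    // Assigns each new split to the subtask currently holding the fewest splits.
    // Ties go to the lowest subtask id. Returns subtaskId -> newly assigned splits.
    static Map<Integer, List<String>> assign(Map<Integer, Integer> currentLoad, List<String> newSplits) {
        if (currentLoad.isEmpty()) {
            throw new IllegalArgumentException("no subtasks to assign to");
        }
        // Copy so the caller's map is not mutated; TreeMap iterates in subtask-id order.
        TreeMap<Integer, Integer> load = new TreeMap<>(currentLoad);
        Map<Integer, List<String>> result = new TreeMap<>();
        for (String split : newSplits) {
            int target = -1;
            int min = Integer.MAX_VALUE;
            for (Map.Entry<Integer, Integer> e : load.entrySet()) {
                if (e.getValue() < min) { // strict '<' keeps the lowest id on ties
                    min = e.getValue();
                    target = e.getKey();
                }
            }
            load.merge(target, 1, Integer::sum);
            result.computeIfAbsent(target, k -> new ArrayList<>()).add(split);
        }
        return result;
    }

    public static void main(String[] args) {
        // After restoration, subtask 0 already owns 3 splits; subtasks 1 and 2 own none.
        Map<Integer, Integer> restored = Map.of(0, 3, 1, 0, 2, 0);
        System.out.println(assign(restored, List.of("t-3", "t-4", "t-5")));
    }
}
```

Running `main` prints `{1=[t-3, t-5], 2=[t-4]}`: the new splits go to the idle subtasks instead of the one restored with three splits.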
