Hi de vs,

Would anyone like to discuss this FLIP? I'd appreciate your feedback and 
suggestions.

Best,
Hongshun


> 2025年8月13日 14:23,Hongshun Wang <loserwang1...@gmail.com> 写道:
> 
> Hi Becket,
> Thank you for your detailed feedback. The new contract makes good sense to me 
> and effectively addresses the issues I encountered at the beginning of the 
> design.
> That said, I recommend not reporting splits by default, primarily for 
> compatibility and practical reasons:
> >  For these reasons, we do not expect the Split objects to be huge, and we 
> > are not trying to design for huge Split objects either as they will have 
> > problems even today.
> Not all existing connector match this rule
> For example, in mysql cdc connector, a binlog split may contain hundreds (or 
> even more) snapshot split completion records. This state is large and is 
> currently transmitted incrementally through multiple BinlogSplitMetaEvent 
> messages. Since the binlog reader operates with single parallelism, reporting 
> the full split state on recovery could be inefficient or even infeasible.
> For such sources, it would be better to provide a mechanism to skip split 
> reporting during restart until they redesign and reduce the split size.
> Not all enumerators maintain unassigned splits in state.
> Some SplitEnumerator(such as kafka connector) implementations do not track or 
> persistently manage unassigned splits. Requiring them to handle 
> re-registration would add unnecessary complexity. Even though we maybe 
> implements in kafka connector, currently, kafka connector is decouple with 
> flink version, we also need to make sure the elder version is compatible.
> To address these concerns, I propose introducing a new method: boolean 
> SourceReader#shouldReassignSplitsOnRecovery() with a default implementation 
> returning false. This allows source readers to opt in to split reassignment 
> only when necessary. Since the new contract already places the responsibility 
> for split assignment on the enumerator, not reporting splits by default is a 
> safe and clean default behavior.
> 
> 
> I’ve updated the implementation and the FIP accordingly[1]. It quite a big 
> change. In particular, for the Kafka connector, we can now use a pluggable 
> SplitPartitioner to support different split assignment strategies (e.g., 
> default, round-robin).
> 
> Could you please review it when you have a chance?
> 
> Best,
> Hongshun
> 
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
> 
> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <becket....@gmail.com 
> <mailto:becket....@gmail.com>> wrote:
>> Hi Hongshun,
>> 
>> I am not too concerned about the transmission cost. Because the full split 
>> transmission has to happen in the initial assignment phase already. And in 
>> the future, we probably want to also introduce some kind of workload balance 
>> across source readers, e.g. based on the per-split throughput or the 
>> per-source-reader workload in heterogeneous clusters. For these reasons, we 
>> do not expect the Split objects to be huge, and we are not trying to design 
>> for huge Split objects either as they will have problems even today.
>> 
>> Good point on the potential split loss, please see the reply below:
>> 
>>> Scenario 2:
>>> 
>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon 
>>> restart.
>>> 2. Before the enumerator receives all reports and performs reassignment, a 
>>> checkpoint is triggered.
>>> 3. Since no splits have been reassigned yet, both readers have empty states.
>>> 4. When restarting from this checkpoint, all four splits are lost.
>> The reader registration happens in the SourceOperator.open(), which means 
>> the task is still in the initializing state, therefore the checkpoint should 
>> not be triggered until the enumerator receives all the split reports.
>> 
>> There is a nuance here. Today, the RPC call from the TM to the JM is async. 
>> So it is possible that the SourceOpertor.open() has returned, but the 
>> enumerator has not received the split reports. However, because the task 
>> status update RPC call goes to the same channel as the split reports call, 
>> so the task status RPC call will happen after the split reports call on the 
>> JM side. Therefore, on the JM side, the SourceCoordinator will always first 
>> receive the split reports, then receive the checkpoint request.
>> This "happen before" relationship is kind of important to guarantee the 
>> consistent state between enumerator and readers.
>> 
>>> Scenario 1:
>>> 
>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader B 
>>> reports (3 and 4).
>>> 2. The enumerator receives these reports but only reassigns splits 1 and 2 
>>> — not 3 and 4.
>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are 
>>> recorded in the reader states; splits 3 and 4 are not persisted.
>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4 will 
>>> be permanently lost.
>> This scenario is possible. One solution is to let the enumerator 
>> implementation handle this. That means if the enumerator relies on the 
>> initial split reports from the source readers, it should maintain these 
>> reports by itself. In the above example, the enumerator will need to 
>> remember that 3 and 4 are not assigned and put it into its own state.
>> The current contract is that anything assigned to the SourceReaders are 
>> completely owned by the SourceReaders. Enumerators can remember the 
>> assignments but cannot change them, even when the source reader recovers / 
>> restarts.
>> With this FLIP, the contract becomes that the source readers will return the 
>> ownership of the splits to the enumerator. So the enumerator is responsible 
>> for maintaining these splits, until they are assigned to a source reader 
>> again.
>> 
>> There are other cases where there may be conflict information between reader 
>> and enumerator. For example, consider the following sequence:
>> 1. reader A reports splits (1 and 2) up on restart.
>> 2. enumerator receives the report and assigns both 1 and 2 to reader B.
>> 3. reader A failed before checkpointing. And this is a partial failure, so 
>> only reader A restarts.
>> 4. When reader A recovers, it will again report splits (1 and 2) to the 
>> enumerator. 
>> 5. The enumerator should ignore this report because it has assigned splits 
>> (1 and 2) to reader B.
>> 
>> So with the new contract, the enumerator should be the source of truth for 
>> split ownership.
>> 
>> Thanks,
>> 
>> Jiangjie (Becket) Qin
>> 
>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <loserwang1...@gmail.com 
>> <mailto:loserwang1...@gmail.com>> wrote:
>>> Hi Becket,
>>> 
>>> I did consider this approach at the beginning (and it was also mentioned in 
>>> this FLIP), since it would allow more flexibility in reassigning all 
>>> splits. However, there are a few potential issues.
>>> 1. High Transmission Cost
>>> If we pass the full split objects (rather than just split IDs), the data 
>>> size could be significant, leading to high overhead during transmission — 
>>> especially when many splits are involved.
>>> 2. Risk of Split Loss
>>> Risk of split loss exists unless we have a mechanism to make sure only can 
>>> checkpoint after all the splits are reassigned.
>>> There are scenarios where splits could be lost due to inconsistent state 
>>> handling during recovery:
>>> 
>>> Scenario 1:
>>> 
>>> Upon restart, Reader A reports assigned splits (1 and 2), and Reader B 
>>> reports (3 and 4).
>>> The enumerator receives these reports but only reassigns splits 1 and 2 — 
>>> not 3 and 4.
>>> A checkpoint or savepoint is then triggered. Only splits 1 and 2 are 
>>> recorded in the reader states; splits 3 and 4 are not persisted.
>>> If the job is later restarted from this checkpoint, splits 3 and 4 will be 
>>> permanently lost.
>>> 
>>> Scenario 2: 
>>> Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon 
>>> restart.
>>> Before the enumerator receives all reports and performs reassignment, a 
>>> checkpoint is triggered.
>>> Since no splits have been reassigned yet, both readers have empty states.
>>> When restarting from this checkpoint, all four splits are lost.
>>> 
>>> Let me know if you have thoughts on how we might mitigate these risks!
>>> 
>>> Best
>>> Hongshun
>>> 
>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <becket....@gmail.com 
>>> <mailto:becket....@gmail.com>> wrote:
>>>> Hi Hongshun,
>>>> 
>>>> The steps sound reasonable to me in general. In terms of the updated FLIP 
>>>> wiki, it would be good to see if we can keep the protocol simple. One 
>>>> alternative way to achieve this behavior is following:
>>>> 
>>>> 1. Upon SourceOperator startup, the SourceOperator sends 
>>>> ReaderRegistrationEvent with the currently assigned splits to the 
>>>> enumerator. It does not add these splits to the SourceReader.
>>>> 2. The enumerator will always use the 
>>>> SourceEnumeratorContext.assignSplits() to assign the splits. (not via the 
>>>> response of the SourceRegistrationEvent, this allows async split 
>>>> assignment in case the enumerator wants to wait until all the readers are 
>>>> registered)
>>>> 3. The SourceOperator will only call SourceReader.addSplits() when it 
>>>> receives the AddSplitEvent from the enumerator.
>>>> 
>>>> This protocol has a few benefits: 
>>>> 1. it basically allows arbitrary split reassignment upon restart
>>>> 2. simplicity: there is only one way to assign splits.
>>>> 
>>>> So we only need one interface change:
>>>> - add the initially assigned splits to ReaderInfo so the Enumerator can 
>>>> access it.
>>>> and one behavior change:
>>>> - The SourceOperator should stop assigning splits to the from state 
>>>> restoration, but only do that when it receives AddSplitsEvent from the 
>>>> enumerator.
>>>> 
>>>> The enumerator story is also simple:
>>>> 1. Receive some kind of notification (new partition, new reader, etc)
>>>> 2. look at the reader information (in the enumerator context or 
>>>> self-maintained state)
>>>> 3. assign splits via the enumerator context.
>>>> 
>>>> Thanks,
>>>> 
>>>> Jiangjie (Becket) Qin
>>>> 
>>>> On Thu, Aug 7, 2025 at 1:31 AM Hongshun Wang <loserwang1...@gmail.com 
>>>> <mailto:loserwang1...@gmail.com>> wrote:
>>>>> Hi Becket,
>>>>> Thanks for your advice — I’ve quickly learned a lot about the reader’s 
>>>>> design principle. It’s really interesting!
>>>>> 
>>>>> > One principle we want to follow is that the enumerator should be the 
>>>>> > brain doing the splits assignment, while the source readers read from 
>>>>> > the assigned splits. So we want to avoid the case where the 
>>>>> > SourceReader ignores the split assignment. 
>>>>> 
>>>>> It appears that MySQL CDC currently bypasses this principle by 
>>>>> proactively removing unused splits directly in the SourceReader. This may 
>>>>> be due to the lack of built-in framework support for such cleanup, 
>>>>> forcing connectors to handle it manually. However, this responsibility 
>>>>> ideally belongs in the framework.
>>>>> 
>>>>> With this FLIP, we propose a redesigned mechanism that centralizes split 
>>>>> cleanup logic in the SplitEnumerator, allowing connectors like MySQL CDC 
>>>>> to eventually adopt it( @leneord, CC).
>>>>> 
>>>>> To achieve this, we must carefully manage state consistency during 
>>>>> startup and recovery. The proposed approach is as follows:
>>>>> Reader Registration with Deferred Assignment
>>>>> When a reader starts (SourceOperator#open), it sends a 
>>>>> ReaderRegistrationEvent to the SplitEnumerator, including its previously 
>>>>> assigned splits (restored from state). However, these splits are not yet 
>>>>> assigned to the reader. The SourceOperator is placed in a PENDING state.
>>>>> Prevent State Pollution During Registration
>>>>> While in the PENDING state, SourceOperator#snapshotState will not update 
>>>>> the operator state. This prevents empty or outdated reader state (e.g., 
>>>>> with removed splits) from polluting the checkpoint.
>>>>> Enumerator Performs Split Cleanup and Acknowledges
>>>>> Upon receiving the ReaderRegistrationEvent, the SplitEnumerator removes 
>>>>> any splits that are no longer valid (e.g., due to removed topics or 
>>>>> tables) and returns the list of remaining valid split IDs to the reader 
>>>>> via a ReaderRegistrationACKEvent.
>>>>> For backward compatibility, the default behavior is to return all split 
>>>>> IDs (i.e., no filtering).
>>>>> Finalize Registration and Resume Normal Operation
>>>>> When the SourceOperator receives the ReaderRegistrationACKEvent, it 
>>>>> assigns the confirmed splits to the reader and transitions its state to 
>>>>> REGISTERED. From this point onward, SourceOperator#snapshotState can 
>>>>> safely update the operator state.
>>>>> 
>>>>> Best,
>>>>> Hongshun
>>>>> 
>>>>> On Thu, Aug 7, 2025 at 1:57 AM Becket Qin <becket....@gmail.com 
>>>>> <mailto:becket....@gmail.com>> wrote:
>>>>>>> SourceCoordinator doesn't store splits that have already been assigned 
>>>>>>> to readers, and SplitAssignmentTracker stores the splits only for this 
>>>>>>> checkpoint, which will be removed after checkpoint. Maybe you mean 
>>>>>>> SourceOperator?
>>>>>> Yes, I meant SourceOperator.
>>>>>> 
>>>>>>> At the beginning, I also thought about using it. However, there are two 
>>>>>>> situations:
>>>>>>> 1. During restart, if source options remove a topic or table: sometimes 
>>>>>>> connectors like MySQL CDC will remove unused splits after restart in 
>>>>>>> MySqlSourceReader#addSplits [1]. Kafka lacks this, so if the configured 
>>>>>>> topics change, removed topic's splits are still read. I also want to do 
>>>>>>> the same thing in Kafka.
>>>>>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be 
>>>>>>> removed after restart.
>>>>>>> In these cases, I have to get the assigned splits after 
>>>>>>> SourceReader#addSplits, rather than get them from SourceOperator 
>>>>>>> directly.
>>>>>> 
>>>>>> One principle we want to follow is that the enumerator should be the 
>>>>>> brain doing the splits assignment, while the source readers read from 
>>>>>> the assigned splits. So we want to avoid the case where the SourceReader 
>>>>>> ignores the split assignment. Given this principle, 
>>>>>> For case 1, if there is a subscription change, it might be better to 
>>>>>> hold back calling SourceReader.addSplits() until an assignment is 
>>>>>> confirmed by the Enumerator. In fact, this might be a good default 
>>>>>> behavior regardless of whether there is a subscription change.
>>>>>> For case 2: if a bounded split is finished, the 
>>>>>> SourceReader.snapshotState() will not contain that split. So upon 
>>>>>> restoration, those splits should not appear, right?
>>>>>> 
>>>>>> Thanks,
>>>>>> 
>>>>>> Jiangjie (Becket) Qin
>>>>>> 
>>>>>> On Wed, Aug 6, 2025 at 5:19 AM Hongshun Wang <loserwang1...@gmail.com 
>>>>>> <mailto:loserwang1...@gmail.com>> wrote:
>>>>>>> Hi Becket,
>>>>>>> 
>>>>>>> Thank you a lot for your advice, which helped me a lot.
>>>>>>> >  It seems that we don't need the method 
>>>>>>> > `SourceReader.getAssignedSplits()`. The assigned splits are available 
>>>>>>> > in the SourceCoordinator upon state restoration.
>>>>>>> 
>>>>>>>  SourceCoordinator doesn't store splits that have already been assigned 
>>>>>>> to readers, and SplitAssignmentTracker stores the splits only for this 
>>>>>>> checkpoint, which will be removed after checkpoint. Maybe you mean 
>>>>>>> SourceOperator?
>>>>>>> 
>>>>>>> At the beginning, I also thought about using it. However, there are two 
>>>>>>> situations:
>>>>>>> 1. During restart, if source options remove a topic or table: sometimes 
>>>>>>> connectors like MySQL CDC will remove unused splits after restart in 
>>>>>>> MySqlSourceReader#addSplits [1]. Kafka lacks this, so if the configured 
>>>>>>> topics change, removed topic's splits are still read. I also want to do 
>>>>>>> the same thing in Kafka.
>>>>>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be 
>>>>>>> removed after restart.
>>>>>>> In these cases, I have to get the assigned splits after 
>>>>>>> SourceReader#addSplits, rather than get them from SourceOperator 
>>>>>>> directly.
>>>>>>> 
>>>>>>> >  By design, the SplitEnumerator can get the reader information any 
>>>>>>> > time from the `SplitEnumeratorContext.registeredReaders()`.
>>>>>>> It looks good.
>>>>>>> 
>>>>>>> Thanks again.
>>>>>>> 
>>>>>>> Best,
>>>>>>> Hongshun
>>>>>>> 
>>>>>>> 
>>>>>>> [1] 
>>>>>>> https://github.com/apache/flink-cdc/blob/42f91a864e329c00959828fe0ca4f1e9e8e1de75/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java#L238
>>>>>>> 
>>>>>>> 
>>>>>>> On Tue, Aug 5, 2025 at 2:35 PM Becket Qin <becket....@gmail.com 
>>>>>>> <mailto:becket....@gmail.com>> wrote:
>>>>>>>> Hi Hongshun,
>>>>>>>> 
>>>>>>>> Thanks for the proposal. The current Kafka split assignment algorithm 
>>>>>>>> does seem to have issues. (I cannot recall why it was implemented this 
>>>>>>>> way at that time...).
>>>>>>>> 
>>>>>>>> Two quick comments:
>>>>>>>> 1. It seems that we don't need the method 
>>>>>>>> `SourceReader.getAssignedSplits()`. The assigned splits are available 
>>>>>>>> in the SourceCoordinator upon state restoration and can be put into 
>>>>>>>> the ReaderRegistrationEvent.
>>>>>>>> 2. Instead of adding the method `SplitEnumerator.addReader(int 
>>>>>>>> subtaskId, List<SplitT> assignedSplits)`, add a new field of 
>>>>>>>> `InitialSplitAssignment` to the ReaderInfo. By design, the 
>>>>>>>> SplitEnumerator can get the reader information any time from the 
>>>>>>>> `SplitEnumeratorContext.registeredReaders()`. This also avoids the 
>>>>>>>> Enumerator implementation to remember the initially assigned splits, 
>>>>>>>> if it wants to wait until all the readers are registered. This also 
>>>>>>>> allow future addition of reader information.
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> 
>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Mon, Aug 4, 2025 at 8:39 PM Hongshun Wang <loserwang1...@gmail.com 
>>>>>>>> <mailto:loserwang1...@gmail.com>> wrote:
>>>>>>>>> Anyone familiar with kafka connector can help review this FLIP? I am 
>>>>>>>>> looking forward for your reply.
>>>>>>>>> 
>>>>>>>>> Best
>>>>>>>>> Hongshun
>>>>>>>>> 
>>>>>>>>> On Thu, Jul 24, 2025 at 8:13 PM Leonard Xu <xbjt...@gmail.com 
>>>>>>>>> <mailto:xbjt...@gmail.com>> wrote:
>>>>>>>>>> Thanks Hongshun for driving this work.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> We also suffering the issue in production Kafka restoration usage, 
>>>>>>>>>> current design is a nice tradeoff and has considered the new Source 
>>>>>>>>>> implementation details, +1 from my side.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> Best,
>>>>>>>>>> Leonard
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> > 2025 7月 19 18:59,Hongshun Wang <loserwang1...@gmail.com 
>>>>>>>>>> > <mailto:loserwang1...@gmail.com>> 写道:
>>>>>>>>>> > 
>>>>>>>>>> > Hi devs,
>>>>>>>>>> > 
>>>>>>>>>> > I'd like to initiate a discussion about [FLIP-537: Enumerator with 
>>>>>>>>>> > Global
>>>>>>>>>> > Split Assignment Distribution for Balanced Split Assignment] [1], 
>>>>>>>>>> > which
>>>>>>>>>> > addresses critical limitations in our current Kafka connector split
>>>>>>>>>> > distribution mechanism.
>>>>>>>>>> > 
>>>>>>>>>> > As documented in [FLINK-31762] [2], several scenarios currently 
>>>>>>>>>> > lead to
>>>>>>>>>> > uneven Kafka split distribution, causing reader delays and 
>>>>>>>>>> > performance
>>>>>>>>>> > bottlenecks. The core issue stems from the enumerator's lack of 
>>>>>>>>>> > visibility
>>>>>>>>>> > into post-assignment split distribution.
>>>>>>>>>> > 
>>>>>>>>>> > This flip does two things:
>>>>>>>>>> > 1. ReaderRegistrationEvent Enhancement: SourceOperator should send
>>>>>>>>>> > ReaderRegistrationEvent with assigned splits metadata after 
>>>>>>>>>> > startup to
>>>>>>>>>> > ensure state consistency.
>>>>>>>>>> > 2. Implementation in the Kafka connector to resolve imbalanced 
>>>>>>>>>> > splits and
>>>>>>>>>> > state awareness during recovery (the enumerator will always choose 
>>>>>>>>>> > the
>>>>>>>>>> > least assigned subtask,and reason aslo as follows)
>>>>>>>>>> > 
>>>>>>>>>> > Any additional questions regarding this FLIP? Looking forward to 
>>>>>>>>>> > hearing
>>>>>>>>>> > from you.
>>>>>>>>>> > 
>>>>>>>>>> > Best
>>>>>>>>>> > Hongshun
>>>>>>>>>> > 
>>>>>>>>>> > 
>>>>>>>>>> > [1]
>>>>>>>>>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>>>>> > [2] https://issues.apache.org/jira/browse/FLINK-31762
>>>>>>>>>> 

Reply via email to