Hi devs,

Would anyone like to discuss this FLIP? I'd appreciate your feedback and suggestions.
Best,
Hongshun

> On Aug 13, 2025, at 14:23, Hongshun Wang <loserwang1...@gmail.com> wrote:
>
> Hi Becket,
>
> Thank you for your detailed feedback. The new contract makes good sense to me
> and effectively addresses the issues I encountered at the beginning of the
> design.
>
> That said, I recommend not reporting splits by default, primarily for
> compatibility and practical reasons:
>
> > For these reasons, we do not expect the Split objects to be huge, and we
> > are not trying to design for huge Split objects either as they will have
> > problems even today.
>
> Not all existing connectors match this rule.
> For example, in the MySQL CDC connector, a binlog split may contain hundreds
> (or even more) of snapshot split completion records. This state is large and
> is currently transmitted incrementally through multiple BinlogSplitMetaEvent
> messages. Since the binlog reader operates with single parallelism, reporting
> the full split state on recovery could be inefficient or even infeasible.
> For such sources, it would be better to provide a mechanism to skip split
> reporting during restart until they are redesigned to reduce the split size.
>
> Not all enumerators maintain unassigned splits in state.
> Some SplitEnumerator implementations (such as the Kafka connector's) do not
> track or persistently manage unassigned splits. Requiring them to handle
> re-registration would add unnecessary complexity. Even if we implement this
> in the Kafka connector, the Kafka connector is currently decoupled from the
> Flink version, so we also need to make sure older versions remain compatible.
>
> To address these concerns, I propose introducing a new method: boolean
> SourceReader#shouldReassignSplitsOnRecovery() with a default implementation
> returning false. This allows source readers to opt in to split reassignment
> only when necessary. Since the new contract already places the responsibility
> for split assignment on the enumerator, not reporting splits by default is a
> safe and clean default behavior.
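To make the opt-in idea above concrete, here is a minimal sketch of how the restore step might branch on the proposed method. Only the method name shouldReassignSplitsOnRecovery() comes from the proposal; SketchReader, RecordingReader, RecoverySketch and the use of plain strings as splits are hypothetical stand-ins for illustration, not Flink classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the operator-side restore step; not Flink's SourceOperator.
class RecoverySketch {
    // Returns the splits that would be reported to the enumerator on registration.
    static List<String> restore(SketchReader reader, List<String> restoredSplits) {
        if (reader.shouldReassignSplitsOnRecovery()) {
            // Opt-in: hand the restored splits back and wait for a new assignment.
            return new ArrayList<>(restoredSplits);
        }
        // Default: keep today's behavior and add the restored splits directly.
        reader.addSplits(restoredSplits);
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        RecordingReader legacy = new RecordingReader(false);
        System.out.println("default reader reports: " + restore(legacy, Arrays.asList("s1", "s2")));
        RecordingReader optIn = new RecordingReader(true);
        System.out.println("opt-in reader reports: " + restore(optIn, Arrays.asList("s1", "s2")));
    }
}

// Hypothetical stand-in for SourceReader; only the proposed method is modeled.
interface SketchReader {
    void addSplits(List<String> splits);

    // Name taken from the proposal; returning false keeps existing readers unchanged.
    default boolean shouldReassignSplitsOnRecovery() {
        return false;
    }
}

// Test double that records which splits were added locally.
class RecordingReader implements SketchReader {
    final List<String> added = new ArrayList<>();
    private final boolean optIn;

    RecordingReader(boolean optIn) {
        this.optIn = optIn;
    }

    @Override
    public void addSplits(List<String> splits) {
        added.addAll(splits);
    }

    @Override
    public boolean shouldReassignSplitsOnRecovery() {
        return optIn;
    }
}
```

With the default in place, a reader that does not override the method keeps its restored splits and reports nothing, so connectors with large split state would not pay the reporting cost unless they opt in.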
>
> I've updated the implementation and the FLIP accordingly [1]. It is quite a
> big change. In particular, for the Kafka connector, we can now use a
> pluggable SplitPartitioner to support different split assignment strategies
> (e.g., default, round-robin).
>
> Could you please review it when you have a chance?
>
> Best,
> Hongshun
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>
> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <becket....@gmail.com> wrote:
>> Hi Hongshun,
>>
>> I am not too concerned about the transmission cost, because the full split
>> transmission already has to happen in the initial assignment phase. And in
>> the future, we probably want to also introduce some kind of workload
>> balancing across source readers, e.g. based on the per-split throughput or
>> the per-source-reader workload in heterogeneous clusters. For these reasons,
>> we do not expect the Split objects to be huge, and we are not trying to
>> design for huge Split objects either as they will have problems even today.
>>
>> Good point on the potential split loss, please see the reply below:
>>
>>> Scenario 2:
>>>
>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon
>>> restart.
>>> 2. Before the enumerator receives all reports and performs reassignment, a
>>> checkpoint is triggered.
>>> 3. Since no splits have been reassigned yet, both readers have empty states.
>>> 4. When restarting from this checkpoint, all four splits are lost.
>>
>> The reader registration happens in SourceOperator.open(), which means the
>> task is still in the initializing state. Therefore the checkpoint should
>> not be triggered until the enumerator receives all the split reports.
>>
>> There is a nuance here. Today, the RPC call from the TM to the JM is async.
>> So it is possible that SourceOperator.open() has returned, but the
>> enumerator has not received the split reports. However, because the task
>> status update RPC call goes through the same channel as the split reports
>> call, the task status RPC call will happen after the split reports call on
>> the JM side. Therefore, on the JM side, the SourceCoordinator will always
>> first receive the split reports, then receive the checkpoint request.
>> This "happens-before" relationship is important to guarantee consistent
>> state between the enumerator and the readers.
>>
>>> Scenario 1:
>>>
>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader B
>>> reports (3 and 4).
>>> 2. The enumerator receives these reports but only reassigns splits 1 and 2,
>>> not 3 and 4.
>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are
>>> recorded in the reader states; splits 3 and 4 are not persisted.
>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4 will
>>> be permanently lost.
>>
>> This scenario is possible. One solution is to let the enumerator
>> implementation handle this. That means if the enumerator relies on the
>> initial split reports from the source readers, it should maintain these
>> reports by itself. In the above example, the enumerator will need to
>> remember that 3 and 4 are not assigned and put them into its own state.
>> The current contract is that anything assigned to the SourceReaders is
>> completely owned by the SourceReaders. Enumerators can remember the
>> assignments but cannot change them, even when the source reader recovers /
>> restarts.
>> With this FLIP, the contract becomes that the source readers will return the
>> ownership of the splits to the enumerator. So the enumerator is responsible
>> for maintaining these splits until they are assigned to a source reader
>> again.
>>
>> There are other cases where there may be conflicting information between
>> reader and enumerator.
>> For example, consider the following sequence:
>> 1. Reader A reports splits (1 and 2) upon restart.
>> 2. The enumerator receives the report and assigns both 1 and 2 to reader B.
>> 3. Reader A fails before checkpointing. This is a partial failure, so
>> only reader A restarts.
>> 4. When reader A recovers, it will again report splits (1 and 2) to the
>> enumerator.
>> 5. The enumerator should ignore this report because it has assigned splits
>> (1 and 2) to reader B.
>>
>> So with the new contract, the enumerator should be the source of truth for
>> split ownership.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <loserwang1...@gmail.com> wrote:
>>> Hi Becket,
>>>
>>> I did consider this approach at the beginning (and it was also mentioned in
>>> this FLIP), since it would allow more flexibility in reassigning all
>>> splits. However, there are a few potential issues.
>>>
>>> 1. High Transmission Cost
>>> If we pass the full split objects (rather than just split IDs), the data
>>> size could be significant, leading to high overhead during transmission,
>>> especially when many splits are involved.
>>>
>>> 2. Risk of Split Loss
>>> Splits can be lost unless we have a mechanism to ensure that a checkpoint
>>> can only be taken after all the splits are reassigned.
>>> There are scenarios where splits could be lost due to inconsistent state
>>> handling during recovery:
>>>
>>> Scenario 1:
>>>
>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader B
>>> reports (3 and 4).
>>> 2. The enumerator receives these reports but only reassigns splits 1 and 2,
>>> not 3 and 4.
>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are
>>> recorded in the reader states; splits 3 and 4 are not persisted.
>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4 will
>>> be permanently lost.
>>>
>>> Scenario 2:
>>>
>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon
>>> restart.
>>> 2. Before the enumerator receives all reports and performs reassignment, a
>>> checkpoint is triggered.
>>> 3. Since no splits have been reassigned yet, both readers have empty states.
>>> 4. When restarting from this checkpoint, all four splits are lost.
>>>
>>> Let me know if you have thoughts on how we might mitigate these risks!
>>>
>>> Best
>>> Hongshun
>>>
>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <becket....@gmail.com> wrote:
>>>> Hi Hongshun,
>>>>
>>>> The steps sound reasonable to me in general. In terms of the updated FLIP
>>>> wiki, it would be good to see if we can keep the protocol simple. One
>>>> alternative way to achieve this behavior is the following:
>>>>
>>>> 1. Upon SourceOperator startup, the SourceOperator sends a
>>>> ReaderRegistrationEvent with the currently assigned splits to the
>>>> enumerator. It does not add these splits to the SourceReader.
>>>> 2. The enumerator will always use
>>>> SourceEnumeratorContext.assignSplits() to assign the splits (not the
>>>> response to the ReaderRegistrationEvent; this allows async split
>>>> assignment in case the enumerator wants to wait until all the readers are
>>>> registered).
>>>> 3. The SourceOperator will only call SourceReader.addSplits() when it
>>>> receives the AddSplitEvent from the enumerator.
>>>>
>>>> This protocol has a few benefits:
>>>> 1. Flexibility: it basically allows arbitrary split reassignment upon
>>>> restart.
>>>> 2. Simplicity: there is only one way to assign splits.
>>>>
>>>> So we only need one interface change:
>>>> - Add the initially assigned splits to ReaderInfo so the Enumerator can
>>>> access them.
>>>> and one behavior change:
>>>> - The SourceOperator should stop assigning splits to the reader from state
>>>> restoration, and only do that when it receives an AddSplitEvent from the
>>>> enumerator.
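The registration and assignment steps above can be sketched as a small, self-contained model. This is an illustration under stated assumptions, not Flink code: DeferredAssignmentEnumerator and onReaderRegistered are hypothetical names, splits are plain strings, subtask IDs are assumed to be 0..parallelism-1, and round-robin over the sorted split IDs is just one possible reassignment policy. The model covers only the initial startup, where the enumerator waits until every reader has registered before assigning anything.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the enumerator side; not Flink's SplitEnumerator API.
class DeferredAssignmentEnumerator {
    private final int parallelism;
    // Restored splits reported per subtask (stands in for the proposed ReaderInfo field).
    private final Map<Integer, List<String>> reportedSplits = new TreeMap<>();

    DeferredAssignmentEnumerator(int parallelism) {
        this.parallelism = parallelism;
    }

    // Called when a reader registers with its restored splits. Returns the
    // assignment to send out (subtask -> splits), or an empty map while some
    // readers have not registered yet.
    Map<Integer, List<String>> onReaderRegistered(int subtaskId, List<String> restoredSplits) {
        reportedSplits.put(subtaskId, new ArrayList<>(restoredSplits));
        if (reportedSplits.size() < parallelism) {
            return Collections.emptyMap();
        }
        // All readers are known: pool every reported split and redistribute.
        List<String> pool = new ArrayList<>();
        for (List<String> splits : reportedSplits.values()) {
            pool.addAll(splits);
        }
        Collections.sort(pool);
        Map<Integer, List<String>> assignment = new TreeMap<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            assignment.put(subtask, new ArrayList<>());
        }
        for (int i = 0; i < pool.size(); i++) {
            assignment.get(i % parallelism).add(pool.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        DeferredAssignmentEnumerator enumerator = new DeferredAssignmentEnumerator(2);
        System.out.println(enumerator.onReaderRegistered(0, Arrays.asList("a", "b", "c")));
        System.out.println(enumerator.onReaderRegistered(1, Collections.<String>emptyList()));
    }
}
```

In this model the reader side never adds restored splits on its own; it would only add what the returned assignment contains, which corresponds to the single assignment path described above.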
>>>>
>>>> The enumerator story is also simple:
>>>> 1. Receive some kind of notification (new partition, new reader, etc.).
>>>> 2. Look at the reader information (in the enumerator context or
>>>> self-maintained state).
>>>> 3. Assign splits via the enumerator context.
>>>>
>>>> Thanks,
>>>>
>>>> Jiangjie (Becket) Qin
>>>>
>>>> On Thu, Aug 7, 2025 at 1:31 AM Hongshun Wang <loserwang1...@gmail.com> wrote:
>>>>> Hi Becket,
>>>>>
>>>>> Thanks for your advice. I've quickly learned a lot about the reader's
>>>>> design principle. It's really interesting!
>>>>>
>>>>> > One principle we want to follow is that the enumerator should be the
>>>>> > brain doing the splits assignment, while the source readers read from
>>>>> > the assigned splits. So we want to avoid the case where the
>>>>> > SourceReader ignores the split assignment.
>>>>>
>>>>> It appears that MySQL CDC currently bypasses this principle by
>>>>> proactively removing unused splits directly in the SourceReader. This may
>>>>> be due to the lack of built-in framework support for such cleanup,
>>>>> forcing connectors to handle it manually. However, this responsibility
>>>>> ideally belongs in the framework.
>>>>>
>>>>> With this FLIP, we propose a redesigned mechanism that centralizes split
>>>>> cleanup logic in the SplitEnumerator, allowing connectors like MySQL CDC
>>>>> to eventually adopt it (CC @Leonard).
>>>>>
>>>>> To achieve this, we must carefully manage state consistency during
>>>>> startup and recovery. The proposed approach is as follows:
>>>>>
>>>>> Reader Registration with Deferred Assignment
>>>>> When a reader starts (SourceOperator#open), it sends a
>>>>> ReaderRegistrationEvent to the SplitEnumerator, including its previously
>>>>> assigned splits (restored from state). However, these splits are not yet
>>>>> assigned to the reader. The SourceOperator is placed in a PENDING state.
>>>>>
>>>>> Prevent State Pollution During Registration
>>>>> While in the PENDING state, SourceOperator#snapshotState will not update
>>>>> the operator state. This prevents empty or outdated reader state (e.g.,
>>>>> with removed splits) from polluting the checkpoint.
>>>>>
>>>>> Enumerator Performs Split Cleanup and Acknowledges
>>>>> Upon receiving the ReaderRegistrationEvent, the SplitEnumerator removes
>>>>> any splits that are no longer valid (e.g., due to removed topics or
>>>>> tables) and returns the list of remaining valid split IDs to the reader
>>>>> via a ReaderRegistrationACKEvent.
>>>>> For backward compatibility, the default behavior is to return all split
>>>>> IDs (i.e., no filtering).
>>>>>
>>>>> Finalize Registration and Resume Normal Operation
>>>>> When the SourceOperator receives the ReaderRegistrationACKEvent, it
>>>>> assigns the confirmed splits to the reader and transitions its state to
>>>>> REGISTERED. From this point onward, SourceOperator#snapshotState can
>>>>> safely update the operator state.
>>>>>
>>>>> Best,
>>>>> Hongshun
>>>>>
>>>>> On Thu, Aug 7, 2025 at 1:57 AM Becket Qin <becket....@gmail.com> wrote:
>>>>>>> SourceCoordinator doesn't store splits that have already been assigned
>>>>>>> to readers, and SplitAssignmentTracker stores the splits only for the
>>>>>>> current checkpoint and removes them after the checkpoint. Maybe you
>>>>>>> mean SourceOperator?
>>>>>>
>>>>>> Yes, I meant SourceOperator.
>>>>>>
>>>>>>> At the beginning, I also thought about using it. However, there are two
>>>>>>> situations:
>>>>>>> 1. During restart, if source options remove a topic or table: sometimes
>>>>>>> connectors like MySQL CDC will remove unused splits after restart in
>>>>>>> MySqlSourceReader#addSplits [1]. Kafka lacks this, so if the configured
>>>>>>> topics change, the removed topics' splits are still read. I also want
>>>>>>> to do the same thing in Kafka.
>>>>>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be
>>>>>>> removed after restart.
>>>>>>> In these cases, I have to get the assigned splits after
>>>>>>> SourceReader#addSplits, rather than get them from SourceOperator
>>>>>>> directly.
>>>>>>
>>>>>> One principle we want to follow is that the enumerator should be the
>>>>>> brain doing the splits assignment, while the source readers read from
>>>>>> the assigned splits. So we want to avoid the case where the SourceReader
>>>>>> ignores the split assignment. Given this principle:
>>>>>> For case 1, if there is a subscription change, it might be better to
>>>>>> hold back calling SourceReader.addSplits() until an assignment is
>>>>>> confirmed by the Enumerator. In fact, this might be a good default
>>>>>> behavior regardless of whether there is a subscription change.
>>>>>> For case 2, if a bounded split is finished, the
>>>>>> SourceReader.snapshotState() will not contain that split. So upon
>>>>>> restoration, those splits should not appear, right?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jiangjie (Becket) Qin
>>>>>>
>>>>>> On Wed, Aug 6, 2025 at 5:19 AM Hongshun Wang <loserwang1...@gmail.com> wrote:
>>>>>>> Hi Becket,
>>>>>>>
>>>>>>> Thank you very much for your advice; it helped me a lot.
>>>>>>>
>>>>>>> > It seems that we don't need the method
>>>>>>> > `SourceReader.getAssignedSplits()`. The assigned splits are available
>>>>>>> > in the SourceCoordinator upon state restoration.
>>>>>>>
>>>>>>> SourceCoordinator doesn't store splits that have already been assigned
>>>>>>> to readers, and SplitAssignmentTracker stores the splits only for the
>>>>>>> current checkpoint and removes them after the checkpoint. Maybe you
>>>>>>> mean SourceOperator?
>>>>>>>
>>>>>>> At the beginning, I also thought about using it. However, there are two
>>>>>>> situations:
>>>>>>> 1. During restart, if source options remove a topic or table: sometimes
>>>>>>> connectors like MySQL CDC will remove unused splits after restart in
>>>>>>> MySqlSourceReader#addSplits [1]. Kafka lacks this, so if the configured
>>>>>>> topics change, the removed topics' splits are still read. I also want
>>>>>>> to do the same thing in Kafka.
>>>>>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be
>>>>>>> removed after restart.
>>>>>>> In these cases, I have to get the assigned splits after
>>>>>>> SourceReader#addSplits, rather than get them from SourceOperator
>>>>>>> directly.
>>>>>>>
>>>>>>> > By design, the SplitEnumerator can get the reader information any
>>>>>>> > time from the `SplitEnumeratorContext.registeredReaders()`.
>>>>>>>
>>>>>>> It looks good.
>>>>>>>
>>>>>>> Thanks again.
>>>>>>>
>>>>>>> Best,
>>>>>>> Hongshun
>>>>>>>
>>>>>>> [1]
>>>>>>> https://github.com/apache/flink-cdc/blob/42f91a864e329c00959828fe0ca4f1e9e8e1de75/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java#L238
>>>>>>>
>>>>>>> On Tue, Aug 5, 2025 at 2:35 PM Becket Qin <becket....@gmail.com> wrote:
>>>>>>>> Hi Hongshun,
>>>>>>>>
>>>>>>>> Thanks for the proposal. The current Kafka split assignment algorithm
>>>>>>>> does seem to have issues. (I cannot recall why it was implemented this
>>>>>>>> way at that time...)
>>>>>>>>
>>>>>>>> Two quick comments:
>>>>>>>> 1. It seems that we don't need the method
>>>>>>>> `SourceReader.getAssignedSplits()`. The assigned splits are available
>>>>>>>> in the SourceCoordinator upon state restoration and can be put into
>>>>>>>> the ReaderRegistrationEvent.
>>>>>>>> 2. Instead of adding the method `SplitEnumerator.addReader(int
>>>>>>>> subtaskId, List<SplitT> assignedSplits)`, add a new field of
>>>>>>>> `InitialSplitAssignment` to the ReaderInfo.
>>>>>>>> By design, the SplitEnumerator can get the reader information any
>>>>>>>> time from the `SplitEnumeratorContext.registeredReaders()`. This also
>>>>>>>> spares the Enumerator implementation from remembering the initially
>>>>>>>> assigned splits if it wants to wait until all the readers are
>>>>>>>> registered. It also allows reader information to be extended in the
>>>>>>>> future.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>
>>>>>>>> On Mon, Aug 4, 2025 at 8:39 PM Hongshun Wang <loserwang1...@gmail.com> wrote:
>>>>>>>>> Could anyone familiar with the Kafka connector help review this
>>>>>>>>> FLIP? I am looking forward to your reply.
>>>>>>>>>
>>>>>>>>> Best
>>>>>>>>> Hongshun
>>>>>>>>>
>>>>>>>>> On Thu, Jul 24, 2025 at 8:13 PM Leonard Xu <xbjt...@gmail.com> wrote:
>>>>>>>>>> Thanks Hongshun for driving this work.
>>>>>>>>>>
>>>>>>>>>> We are also suffering from this issue when restoring Kafka jobs in
>>>>>>>>>> production. The current design is a nice tradeoff and takes the new
>>>>>>>>>> Source implementation details into account. +1 from my side.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Leonard
>>>>>>>>>>
>>>>>>>>>> > On Jul 19, 2025, at 18:59, Hongshun Wang <loserwang1...@gmail.com> wrote:
>>>>>>>>>> >
>>>>>>>>>> > Hi devs,
>>>>>>>>>> >
>>>>>>>>>> > I'd like to initiate a discussion about [FLIP-537: Enumerator with
>>>>>>>>>> > Global Split Assignment Distribution for Balanced Split Assignment]
>>>>>>>>>> > [1], which addresses critical limitations in our current Kafka
>>>>>>>>>> > connector split distribution mechanism.
>>>>>>>>>> >
>>>>>>>>>> > As documented in [FLINK-31762] [2], several scenarios currently
>>>>>>>>>> > lead to uneven Kafka split distribution, causing reader delays and
>>>>>>>>>> > performance bottlenecks.
>>>>>>>>>> > The core issue stems from the enumerator's lack of visibility into
>>>>>>>>>> > post-assignment split distribution.
>>>>>>>>>> >
>>>>>>>>>> > This FLIP does two things:
>>>>>>>>>> > 1. ReaderRegistrationEvent enhancement: the SourceOperator should
>>>>>>>>>> > send a ReaderRegistrationEvent with assigned splits metadata after
>>>>>>>>>> > startup to ensure state consistency.
>>>>>>>>>> > 2. Implementation in the Kafka connector to resolve imbalanced
>>>>>>>>>> > splits and add state awareness during recovery (the enumerator will
>>>>>>>>>> > always choose the least-assigned subtask; the reasoning is given in
>>>>>>>>>> > the FLIP).
>>>>>>>>>> >
>>>>>>>>>> > Any additional questions regarding this FLIP? Looking forward to
>>>>>>>>>> > hearing from you.
>>>>>>>>>> >
>>>>>>>>>> > Best
>>>>>>>>>> > Hongshun
>>>>>>>>>> >
>>>>>>>>>> > [1]
>>>>>>>>>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>>>>> > [2] https://issues.apache.org/jira/browse/FLINK-31762
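As an illustration of the "least-assigned subtask" rule mentioned in the original proposal, here is a minimal sketch. It assumes the enumerator already knows how many splits each registered subtask currently owns (for example from the registration reports discussed in this thread). LeastAssignedSketch and its assign method are hypothetical names, splits are plain strings, and breaking ties by the lowest subtask ID is an assumption made only for determinism.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper; not part of the Kafka connector.
class LeastAssignedSketch {
    // currentCounts: subtask ID -> number of splits it already owns.
    // Returns subtask ID -> newly assigned splits.
    static Map<Integer, List<String>> assign(Map<Integer, Integer> currentCounts, List<String> newSplits) {
        // Sorted copy so that ties resolve to the lowest subtask ID.
        Map<Integer, Integer> counts = new TreeMap<>(currentCounts);
        Map<Integer, List<String>> result = new TreeMap<>();
        for (String split : newSplits) {
            Integer target = null;
            for (Map.Entry<Integer, Integer> entry : counts.entrySet()) {
                if (target == null || entry.getValue() < counts.get(target)) {
                    target = entry.getKey();
                }
            }
            if (target == null) {
                throw new IllegalStateException("no registered readers");
            }
            // Count the new split immediately so the next choice sees it.
            counts.put(target, counts.get(target) + 1);
            result.computeIfAbsent(target, k -> new ArrayList<>()).add(split);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> counts = new TreeMap<>();
        counts.put(0, 3);
        counts.put(1, 1);
        counts.put(2, 1);
        System.out.println(assign(counts, Arrays.asList("p0", "p1", "p2")));
    }
}
```

The point of the sketch is that the choice depends on the global per-subtask counts, which is the visibility the enumerator is said to lack today.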