Hi Becket, Thank you for your detailed feedback. The new contract makes good sense to me and effectively addresses the issues I encountered at the beginning of the design.
That said, I recommend not reporting splits by default, primarily for compatibility and practical reasons: > For these reasons, we do not expect the Split objects to be huge, and we are not trying to design for huge Split objects either as they will have problems even today. 1. Not all existing connector match this rule For example, in mysql cdc connector, a binlog split may contain hundreds (or even more) snapshot split completion records. This state is large and is currently transmitted incrementally through multiple BinlogSplitMetaEvent messages. Since the binlog reader operates with single parallelism, reporting the full split state on recovery could be inefficient or even infeasible. For such sources, it would be better to provide a mechanism to skip split reporting during restart until they redesign and reduce the split size. 2. Not all enumerators maintain unassigned splits in state. Some SplitEnumerator(such as kafka connector) implementations do not track or persistently manage unassigned splits. Requiring them to handle re-registration would add unnecessary complexity. Even though we maybe implements in kafka connector, currently, kafka connector is decouple with flink version, we also need to make sure the elder version is compatible. ------------------------------ To address these concerns, I propose introducing a new method: boolean SourceReader#shouldReassignSplitsOnRecovery() with a default implementation returning false. This allows source readers to opt in to split reassignment only when necessary. Since the new contract already places the responsibility for split assignment on the enumerator, not reporting splits by default is a safe and clean default behavior. ------------------------------ I’ve updated the implementation and the FIP accordingly[1]. It quite a big change. In particular, for the Kafka connector, we can now use a pluggable SplitPartitioner to support different split assignment strategies (e.g., default, round-robin). Could you please review it when you have a chance? Best, Hongshun [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <becket....@gmail.com> wrote: > Hi Hongshun, > > I am not too concerned about the transmission cost. Because the full split > transmission has to happen in the initial assignment phase already. And in > the future, we probably want to also introduce some kind of workload > balance across source readers, e.g. based on the per-split throughput or > the per-source-reader workload in heterogeneous clusters. For these > reasons, we do not expect the Split objects to be huge, and we are not > trying to design for huge Split objects either as they will have problems > even today. > > Good point on the potential split loss, please see the reply below: > > Scenario 2: > > >> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon >> restart. >> 2. Before the enumerator receives all reports and performs reassignment, >> a checkpoint is triggered. >> 3. Since no splits have been reassigned yet, both readers have empty >> states. >> 4. When restarting from this checkpoint, all four splits are lost. > > The reader registration happens in the SourceOperator.open(), which means > the task is still in the initializing state, therefore the checkpoint > should not be triggered until the enumerator receives all the split reports. > > There is a nuance here. Today, the RPC call from the TM to the JM is > async. So it is possible that the SourceOpertor.open() has returned, but > the enumerator has not received the split reports. However, because the > task status update RPC call goes to the same channel as the split reports > call, so the task status RPC call will happen after the split reports call > on the JM side. Therefore, on the JM side, the SourceCoordinator will > always first receive the split reports, then receive the checkpoint request. > This "happen before" relationship is kind of important to guarantee the > consistent state between enumerator and readers. > > Scenario 1: > > >> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader B >> reports (3 and 4). >> 2. The enumerator receives these reports but only reassigns splits 1 and >> 2 — not 3 and 4. >> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are >> recorded in the reader states; splits 3 and 4 are not persisted. >> 4. If the job is later restarted from this checkpoint, splits 3 and 4 >> will be permanently lost. > > This scenario is possible. One solution is to let the enumerator > implementation handle this. That means if the enumerator relies on the > initial split reports from the source readers, it should maintain these > reports by itself. In the above example, the enumerator will need to > remember that 3 and 4 are not assigned and put it into its own state. > The current contract is that anything assigned to the SourceReaders > are completely owned by the SourceReaders. Enumerators can remember the > assignments but cannot change them, even when the source reader recovers / > restarts. > With this FLIP, the contract becomes that the source readers will return > the ownership of the splits to the enumerator. So the enumerator is > responsible for maintaining these splits, until they are assigned to a > source reader again. > > There are other cases where there may be conflict information between > reader and enumerator. For example, consider the following sequence: > 1. reader A reports splits (1 and 2) up on restart. > 2. enumerator receives the report and assigns both 1 and 2 to reader B. > 3. reader A failed before checkpointing. And this is a partial failure, so > only reader A restarts. > 4. When reader A recovers, it will again report splits (1 and 2) to the > enumerator. > 5. The enumerator should ignore this report because it has assigned splits > (1 and 2) to reader B. > > So with the new contract, the enumerator should be the source of truth for > split ownership. > > Thanks, > > Jiangjie (Becket) Qin > > On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <loserwang1...@gmail.com> > wrote: > >> Hi Becket, >> >> >> I did consider this approach at the beginning (and it was also mentioned >> in this FLIP), since it would allow more flexibility in reassigning all >> splits. However, there are a few potential issues. >> >> 1. High Transmission Cost >> If we pass the full split objects (rather than just split IDs), the data >> size could be significant, leading to high overhead during transmission — >> especially when many splits are involved. >> >> 2. Risk of Split Loss >> >> Risk of split loss exists unless we have a mechanism to make sure only >> can checkpoint after all the splits are reassigned. >> There are scenarios where splits could be lost due to inconsistent state >> handling during recovery: >> >> >> Scenario 1: >> >> >> 1. Upon restart, Reader A reports assigned splits (1 and 2), and >> Reader B reports (3 and 4). >> 2. The enumerator receives these reports but only reassigns splits 1 >> and 2 — not 3 and 4. >> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 >> are recorded in the reader states; splits 3 and 4 are not persisted. >> 4. If the job is later restarted from this checkpoint, splits 3 and 4 >> will be permanently lost. >> >> >> Scenario 2: >> >> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) >> upon restart. >> 2. Before the enumerator receives all reports and performs >> reassignment, a checkpoint is triggered. >> 3. Since no splits have been reassigned yet, both readers have empty >> states. >> 4. When restarting from this checkpoint, all four splits are lost. >> >> >> Let me know if you have thoughts on how we might mitigate these risks! >> >> Best >> Hongshun >> >> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <becket....@gmail.com> wrote: >> >>> Hi Hongshun, >>> >>> The steps sound reasonable to me in general. In terms of the updated >>> FLIP wiki, it would be good to see if we can keep the protocol simple. One >>> alternative way to achieve this behavior is following: >>> >>> 1. Upon SourceOperator startup, the SourceOperator sends >>> ReaderRegistrationEvent with the currently assigned splits to the >>> enumerator. It does not add these splits to the SourceReader. >>> 2. The enumerator will always use the >>> SourceEnumeratorContext.assignSplits() to assign the splits. (not via the >>> response of the SourceRegistrationEvent, this allows async split assignment >>> in case the enumerator wants to wait until all the readers are registered) >>> 3. The SourceOperator will only call SourceReader.addSplits() when it >>> receives the AddSplitEvent from the enumerator. >>> >>> This protocol has a few benefits: >>> 1. it basically allows arbitrary split reassignment upon restart >>> 2. simplicity: there is only one way to assign splits. >>> >>> So we only need one interface change: >>> - add the initially assigned splits to ReaderInfo so the Enumerator can >>> access it. >>> and one behavior change: >>> - The SourceOperator should stop assigning splits to the from state >>> restoration, but only do that when it receives AddSplitsEvent from the >>> enumerator. >>> >>> The enumerator story is also simple: >>> 1. Receive some kind of notification (new partition, new reader, etc) >>> 2. look at the reader information (in the enumerator context or >>> self-maintained state) >>> 3. assign splits via the enumerator context. >>> >>> Thanks, >>> >>> Jiangjie (Becket) Qin >>> >>> On Thu, Aug 7, 2025 at 1:31 AM Hongshun Wang <loserwang1...@gmail.com> >>> wrote: >>> >>>> Hi Becket, >>>> Thanks for your advice — I’ve quickly learned a lot about the reader’s >>>> design principle. It’s really interesting! >>>> >>>> > One principle we want to follow is that the enumerator should be the >>>> brain doing the splits assignment, while the source readers read from the >>>> assigned splits. So we want to avoid the case where the SourceReader >>>> ignores the split assignment. >>>> >>>> It appears that MySQL CDC currently bypasses this principle by >>>> proactively removing unused splits directly in the SourceReader. This may >>>> be due to the lack of built-in framework support for such cleanup, forcing >>>> connectors to handle it manually. However, this responsibility ideally >>>> belongs in the framework. >>>> >>>> With this FLIP, we propose a redesigned mechanism that centralizes >>>> split cleanup logic in the SplitEnumerator, allowing connectors like MySQL >>>> CDC to eventually adopt it( @leneord, CC). >>>> >>>> >>>> To achieve this, we must carefully manage state consistency during >>>> startup and recovery. The proposed approach is as follows: >>>> >>>> 1. Reader Registration with Deferred Assignment >>>> When a reader starts (SourceOperator#open), it sends a >>>> ReaderRegistrationEvent to the SplitEnumerator, including its >>>> previously assigned splits (restored from state). However, these splits >>>> are not yet assigned to the reader. The SourceOperator is placed in >>>> a PENDING state. >>>> 2. Prevent State Pollution During Registration >>>> While in the PENDING state, SourceOperator#snapshotState will not >>>> update the operator state. This prevents empty or outdated reader state >>>> (e.g., with removed splits) from polluting the checkpoint. >>>> 3. Enumerator Performs Split Cleanup and Acknowledges >>>> Upon receiving the ReaderRegistrationEvent, the SplitEnumerator removes >>>> any splits that are no longer valid (e.g., due to removed topics or >>>> tables) >>>> and returns the list of remaining valid split IDs to the reader via a >>>> ReaderRegistrationACKEvent. >>>> For backward compatibility, the default behavior is to return all >>>> split IDs (i.e., no filtering). >>>> 4. Finalize Registration and Resume Normal Operation >>>> When the SourceOperator receives the ReaderRegistrationACKEvent, it >>>> assigns the confirmed splits to the reader and transitions its state to >>>> REGISTERED. From this point onward, SourceOperator#snapshotState can >>>> safely update the operator state. >>>> >>>> >>>> Best, >>>> Hongshun >>>> >>>> On Thu, Aug 7, 2025 at 1:57 AM Becket Qin <becket....@gmail.com> wrote: >>>> >>>>> SourceCoordinator doesn't store splits that have already been assigned >>>>>> to readers, and SplitAssignmentTracker stores the splits only for this >>>>>> checkpoint, which will be removed after checkpoint. Maybe you mean >>>>>> SourceOperator? >>>>> >>>>> Yes, I meant SourceOperator. >>>>> >>>>> At the beginning, I also thought about using it. However, there are >>>>>> two situations: >>>>>> 1. During restart, if source options remove a topic or table: >>>>>> sometimes connectors like MySQL CDC will remove unused splits after >>>>>> restart >>>>>> in MySqlSourceReader#addSplits [1]. Kafka lacks this, so if the >>>>>> configured >>>>>> topics change, removed topic's splits are still read. I also want to do >>>>>> the >>>>>> same thing in Kafka. >>>>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be >>>>>> removed after restart. >>>>>> In these cases, I have to get the assigned splits after >>>>>> SourceReader#addSplits, rather than get them from SourceOperator >>>>>> directly. >>>>> >>>>> >>>>> One principle we want to follow is that the enumerator should be the >>>>> brain doing the splits assignment, while the source readers read from the >>>>> assigned splits. So we want to avoid the case where the SourceReader >>>>> ignores the split assignment. Given this principle, >>>>> For case 1, if there is a subscription change, it might be better to >>>>> hold back calling SourceReader.addSplits() until an assignment is >>>>> confirmed >>>>> by the Enumerator. In fact, this might be a good default behavior >>>>> regardless of whether there is a subscription change. >>>>> For case 2: if a bounded split is finished, the >>>>> SourceReader.snapshotState() will not contain that split. So upon >>>>> restoration, those splits should not appear, right? >>>>> >>>>> Thanks, >>>>> >>>>> Jiangjie (Becket) Qin >>>>> >>>>> On Wed, Aug 6, 2025 at 5:19 AM Hongshun Wang <loserwang1...@gmail.com> >>>>> wrote: >>>>> >>>>>> Hi Becket, >>>>>> >>>>>> Thank you a lot for your advice, which helped me a lot. >>>>>> > It seems that we don't need the method `SourceReader. >>>>>> getAssignedSplits()`. The assigned splits are available in the >>>>>> SourceCoordinator upon state restoration. >>>>>> >>>>>> SourceCoordinator doesn't store splits that have already been >>>>>> assigned to readers, and SplitAssignmentTracker stores the splits only >>>>>> for >>>>>> this checkpoint, which will be removed after checkpoint. Maybe you mean >>>>>> SourceOperator? >>>>>> >>>>>> At the beginning, I also thought about using it. However, there are >>>>>> two situations: >>>>>> 1. During restart, if source options remove a topic or table: >>>>>> sometimes connectors like MySQL CDC will remove unused splits after >>>>>> restart >>>>>> in MySqlSourceReader#addSplits [1]. Kafka lacks this, so if the >>>>>> configured >>>>>> topics change, removed topic's splits are still read. I also want to do >>>>>> the >>>>>> same thing in Kafka. >>>>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be >>>>>> removed after restart. >>>>>> In these cases, I have to get the assigned splits after >>>>>> SourceReader#addSplits, rather than get them from SourceOperator >>>>>> directly. >>>>>> >>>>>> > By design, the SplitEnumerator can get the reader information any >>>>>> time from the `SplitEnumeratorContext.registeredReaders()`. >>>>>> It looks good. >>>>>> >>>>>> Thanks again. >>>>>> >>>>>> Best, >>>>>> Hongshun >>>>>> >>>>>> >>>>>> [1] >>>>>> https://github.com/apache/flink-cdc/blob/42f91a864e329c00959828fe0ca4f1e9e8e1de75/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java#L238 >>>>>> >>>>>> >>>>>> On Tue, Aug 5, 2025 at 2:35 PM Becket Qin <becket....@gmail.com> >>>>>> wrote: >>>>>> >>>>>>> Hi Hongshun, >>>>>>> >>>>>>> Thanks for the proposal. The current Kafka split assignment >>>>>>> algorithm does seem to have issues. (I cannot recall why it was >>>>>>> implemented >>>>>>> this way at that time...). >>>>>>> >>>>>>> Two quick comments: >>>>>>> 1. It seems that we don't need the method `SourceReader. >>>>>>> getAssignedSplits()`. The assigned splits are available in the >>>>>>> SourceCoordinator upon state restoration and can be put into the >>>>>>> ReaderRegistrationEvent. >>>>>>> 2. Instead of adding the method `SplitEnumerator.addReader(int >>>>>>> subtaskId, List<SplitT> assignedSplits)`, add a new field of >>>>>>> `InitialSplitAssignment` to the ReaderInfo. By design, the >>>>>>> SplitEnumerator >>>>>>> can get the reader information any time from the >>>>>>> `SplitEnumeratorContext.registeredReaders()`. This also avoids the >>>>>>> Enumerator implementation to remember the initially assigned splits, if >>>>>>> it >>>>>>> wants to wait until all the readers are registered. This also allow >>>>>>> future >>>>>>> addition of reader information. >>>>>>> >>>>>>> Thanks, >>>>>>> >>>>>>> Jiangjie (Becket) Qin >>>>>>> >>>>>>> >>>>>>> >>>>>>> On Mon, Aug 4, 2025 at 8:39 PM Hongshun Wang < >>>>>>> loserwang1...@gmail.com> wrote: >>>>>>> >>>>>>>> Anyone familiar with kafka connector can help review this FLIP? I >>>>>>>> am looking forward for your reply. >>>>>>>> >>>>>>>> Best >>>>>>>> Hongshun >>>>>>>> >>>>>>>> On Thu, Jul 24, 2025 at 8:13 PM Leonard Xu <xbjt...@gmail.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Thanks Hongshun for driving this work. >>>>>>>>> >>>>>>>>> >>>>>>>>> We also suffering the issue in production Kafka restoration usage, >>>>>>>>> current design is a nice tradeoff and has considered the new Source >>>>>>>>> implementation details, +1 from my side. >>>>>>>>> >>>>>>>>> >>>>>>>>> Best, >>>>>>>>> Leonard >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> > 2025 7月 19 18:59,Hongshun Wang <loserwang1...@gmail.com> 写道: >>>>>>>>> > >>>>>>>>> > Hi devs, >>>>>>>>> > >>>>>>>>> > I'd like to initiate a discussion about [FLIP-537: Enumerator >>>>>>>>> with Global >>>>>>>>> > Split Assignment Distribution for Balanced Split Assignment] >>>>>>>>> [1], which >>>>>>>>> > addresses critical limitations in our current Kafka connector >>>>>>>>> split >>>>>>>>> > distribution mechanism. >>>>>>>>> > >>>>>>>>> > As documented in [FLINK-31762] [2], several scenarios currently >>>>>>>>> lead to >>>>>>>>> > uneven Kafka split distribution, causing reader delays and >>>>>>>>> performance >>>>>>>>> > bottlenecks. The core issue stems from the enumerator's lack of >>>>>>>>> visibility >>>>>>>>> > into post-assignment split distribution. >>>>>>>>> > >>>>>>>>> > This flip does two things: >>>>>>>>> > 1. ReaderRegistrationEvent Enhancement: SourceOperator should >>>>>>>>> send >>>>>>>>> > ReaderRegistrationEvent with assigned splits metadata after >>>>>>>>> startup to >>>>>>>>> > ensure state consistency. >>>>>>>>> > 2. Implementation in the Kafka connector to resolve imbalanced >>>>>>>>> splits and >>>>>>>>> > state awareness during recovery (the enumerator will always >>>>>>>>> choose the >>>>>>>>> > least assigned subtask,and reason aslo as follows) >>>>>>>>> > >>>>>>>>> > Any additional questions regarding this FLIP? Looking forward to >>>>>>>>> hearing >>>>>>>>> > from you. >>>>>>>>> > >>>>>>>>> > Best >>>>>>>>> > Hongshun >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > [1] >>>>>>>>> > >>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment >>>>>>>>> > [2] https://issues.apache.org/jira/browse/FLINK-31762 >>>>>>>>> >>>>>>>>>