> SourceCoordinator doesn't store splits that have already been assigned to
> readers, and SplitAssignmentTracker stores the splits only for this
> checkpoint, which will be removed after checkpoint. Maybe you mean
> SourceOperator?

Yes, I meant SourceOperator.

> At the beginning, I also thought about using it. However, there are two
> situations:
> 1. During restart, if source options remove a topic or table: sometimes
> connectors like MySQL CDC will remove unused splits after restart in
> MySqlSourceReader#addSplits [1]. Kafka lacks this, so if the configured
> topics change, the splits of removed topics are still read. I also want to
> do the same thing in Kafka.
> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be removed
> after restart.
> In these cases, I have to get the assigned splits after
> SourceReader#addSplits, rather than get them from SourceOperator directly.

One principle we want to follow is that the enumerator should be the brain
doing the split assignment, while the source readers read from the assigned
splits. So we want to avoid the case where the SourceReader ignores the split
assignment. Given this principle:

For case 1, if there is a subscription change, it might be better to hold back
calling SourceReader.addSplits() until an assignment is confirmed by the
Enumerator. In fact, this might be a good default behavior regardless of
whether there is a subscription change.

For case 2, if a bounded split is finished, SourceReader.snapshotState() will
not contain that split. So upon restoration, those splits should not appear,
right?

Thanks,

Jiangjie (Becket) Qin

On Wed, Aug 6, 2025 at 5:19 AM Hongshun Wang <loserwang1...@gmail.com> wrote:

> Hi Becket,
>
> Thank you very much for your advice, which helped me a lot.
>
> > It seems that we don't need the method `SourceReader.getAssignedSplits()`.
> > The assigned splits are available in the SourceCoordinator upon state
> > restoration.
>
> SourceCoordinator doesn't store splits that have already been assigned to
> readers, and SplitAssignmentTracker stores the splits only for this
> checkpoint, which will be removed after checkpoint. Maybe you mean
> SourceOperator?
>
> At the beginning, I also thought about using it.
> However, there are two situations:
> 1. During restart, if source options remove a topic or table: sometimes
> connectors like MySQL CDC will remove unused splits after restart in
> MySqlSourceReader#addSplits [1]. Kafka lacks this, so if the configured
> topics change, the splits of removed topics are still read. I also want to
> do the same thing in Kafka.
> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be removed
> after restart.
> In these cases, I have to get the assigned splits after
> SourceReader#addSplits, rather than get them from SourceOperator directly.
>
> > By design, the SplitEnumerator can get the reader information any time
> > from the `SplitEnumeratorContext.registeredReaders()`.
>
> It looks good.
>
> Thanks again.
>
> Best,
> Hongshun
>
>
> [1]
> https://github.com/apache/flink-cdc/blob/42f91a864e329c00959828fe0ca4f1e9e8e1de75/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java#L238
>
>
> On Tue, Aug 5, 2025 at 2:35 PM Becket Qin <becket....@gmail.com> wrote:
>
>> Hi Hongshun,
>>
>> Thanks for the proposal. The current Kafka split assignment algorithm
>> does seem to have issues. (I cannot recall why it was implemented this way
>> at that time...)
>>
>> Two quick comments:
>> 1. It seems that we don't need the method
>> `SourceReader.getAssignedSplits()`. The assigned splits are available in
>> the SourceCoordinator upon state restoration and can be put into the
>> ReaderRegistrationEvent.
>> 2. Instead of adding the method
>> `SplitEnumerator.addReader(int subtaskId, List<SplitT> assignedSplits)`,
>> add a new field of `InitialSplitAssignment` to the ReaderInfo. By design,
>> the SplitEnumerator can get the reader information any time from the
>> `SplitEnumeratorContext.registeredReaders()`.
>> This also saves the Enumerator implementation from having to remember the
>> initially assigned splits if it wants to wait until all the readers are
>> registered. It also allows future additions of reader information.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>>
>>
>> On Mon, Aug 4, 2025 at 8:39 PM Hongshun Wang <loserwang1...@gmail.com> wrote:
>>
>>> Could anyone familiar with the Kafka connector help review this FLIP? I am
>>> looking forward to your reply.
>>>
>>> Best
>>> Hongshun
>>>
>>> On Thu, Jul 24, 2025 at 8:13 PM Leonard Xu <xbjt...@gmail.com> wrote:
>>>
>>>> Thanks Hongshun for driving this work.
>>>>
>>>>
>>>> We are also suffering from this issue when restoring Kafka sources in
>>>> production. The current design is a nice tradeoff and takes the new Source
>>>> implementation details into account. +1 from my side.
>>>>
>>>>
>>>> Best,
>>>> Leonard
>>>>
>>>>
>>>>
>>>> > On Jul 19, 2025, at 18:59, Hongshun Wang <loserwang1...@gmail.com> wrote:
>>>> >
>>>> > Hi devs,
>>>> >
>>>> > I'd like to initiate a discussion about [FLIP-537: Enumerator with Global
>>>> > Split Assignment Distribution for Balanced Split Assignment] [1], which
>>>> > addresses critical limitations in our current Kafka connector split
>>>> > distribution mechanism.
>>>> >
>>>> > As documented in [FLINK-31762] [2], several scenarios currently lead to
>>>> > uneven Kafka split distribution, causing reader delays and performance
>>>> > bottlenecks. The core issue stems from the enumerator's lack of visibility
>>>> > into post-assignment split distribution.
>>>> >
>>>> > This FLIP does two things:
>>>> > 1. ReaderRegistrationEvent enhancement: SourceOperator should send a
>>>> > ReaderRegistrationEvent with assigned-splits metadata after startup to
>>>> > ensure state consistency.
>>>> > 2. Implementation in the Kafka connector to resolve imbalanced splits and
>>>> > state awareness during recovery (the enumerator will always choose the
>>>> > least assigned subtask, and the reasoning is also as follows).
>>>> >
>>>> > Do you have any additional questions regarding this FLIP? Looking forward
>>>> > to hearing from you.
>>>> >
>>>> > Best
>>>> > Hongshun
>>>> >
>>>> >
>>>> > [1]
>>>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>> > [2] https://issues.apache.org/jira/browse/FLINK-31762
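The balancing rule summarized in the proposal above (the enumerator always chooses the least assigned subtask), combined with the restored splits a reader would report when it registers (through the proposed ReaderRegistrationEvent metadata or an `InitialSplitAssignment` field on ReaderInfo), can be illustrated with a minimal Java sketch. `SplitSketch` and `LeastAssignedBalancer` are hypothetical names, not Flink or Kafka connector APIs, and breaking ties by the lowest subtask id is an assumption; the FLIP page remains the reference for the actual design.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, simplified split: one topic partition. Not a Flink or Kafka connector class. */
final class SplitSketch {
    final String topic;
    final int partition;

    SplitSketch(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

/**
 * Hypothetical enumerator-side bookkeeping (not a Flink API). Each reader registers together
 * with the splits it restored from state; every new split then goes to the reader that
 * currently owns the fewest splits.
 */
final class LeastAssignedBalancer {
    // Subtask id -> splits currently owned by that reader; TreeMap iterates ids in ascending order.
    private final TreeMap<Integer, List<SplitSketch>> owned = new TreeMap<>();

    /** Records a reader and the splits it already owns after restoration (may be empty). */
    void registerReader(int subtaskId, List<SplitSketch> initialSplits) {
        owned.put(subtaskId, new ArrayList<>(initialSplits));
    }

    /** Assigns the split to the least assigned reader and returns that reader's subtask id. */
    int assign(SplitSketch split) {
        Integer target = null;
        int fewest = Integer.MAX_VALUE;
        // Strict "<" plus ascending iteration means ties go to the lowest subtask id (assumption).
        for (Map.Entry<Integer, List<SplitSketch>> entry : owned.entrySet()) {
            int count = entry.getValue().size();
            if (count < fewest) {
                fewest = count;
                target = entry.getKey();
            }
        }
        if (target == null) {
            throw new IllegalStateException("no registered readers");
        }
        owned.get(target).add(split);
        return target;
    }

    /** Number of splits the given reader currently owns (0 if it never registered). */
    int splitCount(int subtaskId) {
        List<SplitSketch> splits = owned.get(subtaskId);
        return splits == null ? 0 : splits.size();
    }
}
```

For example, if readers 0, 1 and 2 restore 3, 1 and 0 splits respectively, four new splits go to readers 2, 1, 2 and 1, leaving them with 3, 3 and 2 splits. An assigner that does not know which splits each reader restored cannot make this choice, which is the visibility gap described in the proposal.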