Hi Leonard,

Thanks for your advice. It makes sense, and I have updated the FLIP accordingly.
Best,
Hongshun

On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu <xbjt...@gmail.com> wrote:

> Thanks Hongshun and Becket for the deep discussion.
>
> I only have one comment, on one API design:
>
> > Deprecate the old addSplitsBack method and use an addSplitsBack with the param isReportedByReader instead, because the enumerator can apply different reassignment policies based on the context.
>
> Could we introduce a new method like *addSplitsBackOnRecovery* with a default implementation instead? This way, we can provide better backward compatibility, and it also makes the API easier for developers to understand.
>
> Best,
> Leonard
>
> On Sep 3, 2025, at 20:26, Hongshun Wang <loserwang1...@gmail.com> wrote:
>
> Hi Becket,
>
> I think that's a great idea! I have added the SupportSplitReassignmentOnRecovery interface to this FLIP. A Source implementing this interface indicates that the source operator needs to report its splits to the enumerator and receive a reassignment.[1]
>
> Best,
> Hongshun
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>
> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <becket....@gmail.com> wrote:
>
>> Hi Hongshun,
>>
>> I think the convention for such optional features in Source is via mix-in interfaces. So instead of adding a method to the SourceReader, maybe we should introduce an interface SupportSplitReassignmentOnRecovery with this method. If a Source implementation implements that interface, then the SourceOperator will check the desired behavior and act accordingly.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <loserwang1...@gmail.com> wrote:
>>
>>> Hi devs,
>>>
>>> Would anyone like to discuss this FLIP? I'd appreciate your feedback and suggestions.
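Here is a minimal Java sketch of the two API shapes discussed above, for illustration only. The first is Becket's mix-in convention: a marker interface the operator can test with `instanceof`. The second is Leonard's `addSplitsBackOnRecovery` default method, which falls back to the existing `addSplitsBack` behavior. Only the names `SupportSplitReassignmentOnRecovery`, `addSplitsBack`, and `addSplitsBackOnRecovery` come from the thread. `EnumeratorSketch`, `LegacyEnumerator`, the two source classes, and `shouldReportSplitsOnRecovery` are hypothetical simplifications, not Flink's actual `Source` or `SplitEnumerator` signatures.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for an enumerator interface (not Flink's real API).
interface EnumeratorSketch<SplitT> {
    // Existing behavior: splits handed back to the enumerator, e.g. after a reader failure.
    void addSplitsBack(List<SplitT> splits, int subtaskId);

    // New hook with a default body, so enumerators written against the old
    // interface keep compiling and treat recovery reports like returned splits.
    default void addSplitsBackOnRecovery(List<SplitT> splits, int subtaskId) {
        addSplitsBack(splits, subtaskId);
    }
}

// Hypothetical mix-in: a source opts in to split reassignment on recovery
// simply by implementing this marker interface.
interface SupportSplitReassignmentOnRecovery {}

class PlainSource {}

class ReassigningSource implements SupportSplitReassignmentOnRecovery {}

// An enumerator that only implements the old method.
class LegacyEnumerator implements EnumeratorSketch<String> {
    final List<String> returned = new ArrayList<>();

    @Override
    public void addSplitsBack(List<String> splits, int subtaskId) {
        returned.addAll(splits);
    }
}

class Main {
    // Operator-side check (hypothetical): report restored splits only if the
    // source opted in through the mix-in.
    static boolean shouldReportSplitsOnRecovery(Object source) {
        return source instanceof SupportSplitReassignmentOnRecovery;
    }

    public static void main(String[] args) {
        LegacyEnumerator enumerator = new LegacyEnumerator();
        enumerator.addSplitsBackOnRecovery(List.of("split-1", "split-2"), 0);
        System.out.println(enumerator.returned);                                   // [split-1, split-2]
        System.out.println(shouldReportSplitsOnRecovery(new PlainSource()));       // false
        System.out.println(shouldReportSplitsOnRecovery(new ReassigningSource())); // true
    }
}
```

The new hook has a default body, so an enumerator written against the old interface (like `LegacyEnumerator`) still compiles and behaves as before. That is the backward-compatibility point Leonard raises.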
>>>
>>> Best,
>>> Hongshun
>>>
>>> On Aug 13, 2025, at 14:23, Hongshun Wang <loserwang1...@gmail.com> wrote:
>>>
>>> Hi Becket,
>>>
>>> Thank you for your detailed feedback. The new contract makes good sense to me and effectively addresses the issues I encountered at the beginning of the design.
>>>
>>> That said, I recommend not reporting splits by default, primarily for compatibility and practical reasons:
>>>
>>> > For these reasons, we do not expect the Split objects to be huge, and we are not trying to design for huge Split objects either, as they will have problems even today.
>>>
>>> 1. Not all existing connectors match this rule.
>>>    For example, in the MySQL CDC connector, a binlog split may contain hundreds (or even more) of snapshot split completion records. This state is large and is currently transmitted incrementally through multiple BinlogSplitMetaEvent messages. Since the binlog reader operates with a parallelism of one, reporting the full split state on recovery could be inefficient or even infeasible.
>>>    For such sources, it would be better to provide a mechanism to skip split reporting during restart until they redesign and reduce the split size.
>>> 2. Not all enumerators maintain unassigned splits in state.
>>>    Some SplitEnumerator implementations (such as the Kafka connector's) do not track or persistently manage unassigned splits. Requiring them to handle re-registration would add unnecessary complexity. Even if we implement this in the Kafka connector, the Kafka connector is currently decoupled from the Flink version, so we also need to make sure older versions stay compatible.
>>>
>>> ------------------------------
>>>
>>> To address these concerns, I propose introducing a new method, boolean SourceReader#shouldReassignSplitsOnRecovery(), with a default implementation returning false. This allows source readers to opt in to split reassignment only when necessary.
>>> Since the new contract already places the responsibility for split assignment on the enumerator, not reporting splits by default is a safe and clean default behavior.
>>>
>>> ------------------------------
>>>
>>> I've updated the implementation and the FLIP accordingly [1]. It is quite a big change. In particular, for the Kafka connector, we can now use a pluggable SplitPartitioner to support different split assignment strategies (e.g., default, round-robin).
>>>
>>> Could you please review it when you have a chance?
>>>
>>> Best,
>>> Hongshun
>>>
>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>
>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <becket....@gmail.com> wrote:
>>>
>>>> Hi Hongshun,
>>>>
>>>> I am not too concerned about the transmission cost, because the full split transmission has to happen in the initial assignment phase already. And in the future, we probably also want to introduce some kind of workload balancing across source readers, e.g. based on the per-split throughput or the per-source-reader workload in heterogeneous clusters. For these reasons, we do not expect the Split objects to be huge, and we are not trying to design for huge Split objects either, as they will have problems even today.
>>>>
>>>> Good point on the potential split loss; please see my reply below:
>>>>
>>>> Scenario 2:
>>>>
>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon restart.
>>>>> 2. Before the enumerator receives all reports and performs reassignment, a checkpoint is triggered.
>>>>> 3. Since no splits have been reassigned yet, both readers have empty states.
>>>>> 4. When restarting from this checkpoint, all four splits are lost.
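Becket explains next why Scenario 2 cannot occur: split reports reach the coordinator before any checkpoint request. Scenario 1, and the partial-failover sequence he describes after it, are handled by bookkeeping on the enumerator side. Here is a minimal sketch of that bookkeeping. It assumes integer split IDs and plain collections in place of Flink state; `OwnershipTracker` and its methods are hypothetical names.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical enumerator-side bookkeeping; integer split IDs stand in for real splits.
class OwnershipTracker {
    // Splits whose ownership has returned to the enumerator but are not reassigned yet.
    // This set would be part of the enumerator's own checkpointed state.
    final Set<Integer> unassigned = new TreeSet<>();
    // Split ID -> owning subtask; the enumerator is the source of truth.
    final Map<Integer, Integer> owner = new TreeMap<>();

    // A recovering reader reports the splits it restored from its own state.
    void onRecoveryReport(int subtask, Collection<Integer> splits) {
        for (int split : splits) {
            Integer current = owner.get(split);
            if (current != null && current != subtask) {
                continue; // stale report: the split already belongs to another reader
            }
            owner.remove(split);
            unassigned.add(split); // ownership returns to the enumerator
        }
    }

    void assign(int split, int subtask) {
        unassigned.remove(split);
        owner.put(split, subtask);
    }

    // What a checkpoint taken now would cover: reader-owned plus enumerator-held splits.
    Set<Integer> coveredByCheckpoint() {
        Set<Integer> all = new TreeSet<>(owner.keySet());
        all.addAll(unassigned);
        return all;
    }
}

class Main {
    public static void main(String[] args) {
        // Scenario 1: A (subtask 0) reports (1, 2), B (subtask 1) reports (3, 4),
        // and only 1 and 2 are reassigned before the checkpoint.
        OwnershipTracker tracker = new OwnershipTracker();
        tracker.onRecoveryReport(0, List.of(1, 2));
        tracker.onRecoveryReport(1, List.of(3, 4));
        tracker.assign(1, 0);
        tracker.assign(2, 0);
        System.out.println(tracker.coveredByCheckpoint()); // [1, 2, 3, 4]

        // Partial failover: A reports (1, 2), the enumerator gives both to B,
        // then A fails alone and re-reports (1, 2) on recovery.
        OwnershipTracker t2 = new OwnershipTracker();
        t2.onRecoveryReport(0, List.of(1, 2));
        t2.assign(1, 1);
        t2.assign(2, 1);
        t2.onRecoveryReport(0, List.of(1, 2));
        System.out.println(t2.owner);      // {1=1, 2=1}
        System.out.println(t2.unassigned); // []
    }
}
```

In the first run, a checkpoint taken after the partial reassignment covers splits 1 and 2 through the reader states, and splits 3 and 4 through the enumerator's own state. In the second run, reader A's repeated report is ignored. The enumerator, as the source of truth, has already given both splits to reader B.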
>>>>
>>>> The reader registration happens in SourceOperator.open(), which means the task is still in the initializing state. Therefore, the checkpoint should not be triggered until the enumerator receives all the split reports.
>>>>
>>>> There is a nuance here. Today, the RPC call from the TM to the JM is async, so it is possible that SourceOperator.open() has returned but the enumerator has not received the split reports yet. However, the task status update RPC call goes through the same channel as the split report call, so on the JM side the task status RPC call will arrive after the split report call. Therefore, the SourceCoordinator will always first receive the split reports, then receive the checkpoint request. This "happens-before" relationship is important to guarantee consistent state between the enumerator and the readers.
>>>>
>>>> Scenario 1:
>>>>
>>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader B reports (3 and 4).
>>>>> 2. The enumerator receives these reports but only reassigns splits 1 and 2, not 3 and 4.
>>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are recorded in the reader states; splits 3 and 4 are not persisted.
>>>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4 will be permanently lost.
>>>>
>>>> This scenario is possible. One solution is to let the enumerator implementation handle this. That means if the enumerator relies on the initial split reports from the source readers, it should maintain these reports by itself. In the above example, the enumerator will need to remember that 3 and 4 are not assigned and put them into its own state.
>>>> The current contract is that anything assigned to the SourceReaders is completely owned by the SourceReaders.
>>>> Enumerators can remember the assignments but cannot change them, even when the source reader recovers or restarts.
>>>> With this FLIP, the contract becomes that the source readers return the ownership of the splits to the enumerator. So the enumerator is responsible for maintaining these splits until they are assigned to a source reader again.
>>>>
>>>> There are other cases where the reader and the enumerator may have conflicting information. For example, consider the following sequence:
>>>> 1. Reader A reports splits (1 and 2) upon restart.
>>>> 2. The enumerator receives the report and assigns both 1 and 2 to reader B.
>>>> 3. Reader A fails before checkpointing. This is a partial failure, so only reader A restarts.
>>>> 4. When reader A recovers, it will again report splits (1 and 2) to the enumerator.
>>>> 5. The enumerator should ignore this report because it has assigned splits (1 and 2) to reader B.
>>>>
>>>> So with the new contract, the enumerator should be the source of truth for split ownership.
>>>>
>>>> Thanks,
>>>>
>>>> Jiangjie (Becket) Qin
>>>>
>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <loserwang1...@gmail.com> wrote:
>>>>
>>>>> Hi Becket,
>>>>>
>>>>> I did consider this approach at the beginning (and it was also mentioned in this FLIP), since it would allow more flexibility in reassigning all splits. However, there are a few potential issues.
>>>>>
>>>>> 1. High Transmission Cost
>>>>> If we pass the full split objects (rather than just split IDs), the data size could be significant, leading to high overhead during transmission, especially when many splits are involved.
>>>>>
>>>>> 2. Risk of Split Loss
>>>>>
>>>>> There is a risk of split loss unless we have a mechanism to make sure a checkpoint can only be taken after all the splits are reassigned.
>>>>> There are scenarios where splits could be lost due to inconsistent state handling during recovery:
>>>>>
>>>>> Scenario 1:
>>>>>
>>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader B reports (3 and 4).
>>>>> 2. The enumerator receives these reports but only reassigns splits 1 and 2, not 3 and 4.
>>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are recorded in the reader states; splits 3 and 4 are not persisted.
>>>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4 will be permanently lost.
>>>>>
>>>>> Scenario 2:
>>>>>
>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon restart.
>>>>> 2. Before the enumerator receives all reports and performs reassignment, a checkpoint is triggered.
>>>>> 3. Since no splits have been reassigned yet, both readers have empty states.
>>>>> 4. When restarting from this checkpoint, all four splits are lost.
>>>>>
>>>>> Let me know if you have thoughts on how we might mitigate these risks!
>>>>>
>>>>> Best
>>>>> Hongshun
>>>>>
>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <becket....@gmail.com> wrote:
>>>>>
>>>>>> Hi Hongshun,
>>>>>>
>>>>>> The steps sound reasonable to me in general. In terms of the updated FLIP wiki, it would be good to see if we can keep the protocol simple. One alternative way to achieve this behavior is the following:
>>>>>>
>>>>>> 1. Upon SourceOperator startup, the SourceOperator sends a ReaderRegistrationEvent with the currently assigned splits to the enumerator. It does not add these splits to the SourceReader.
>>>>>> 2. The enumerator will always use SplitEnumeratorContext.assignSplits() to assign the splits.
>>>>>>    (Not via the response to the ReaderRegistrationEvent. This allows async split assignment in case the enumerator wants to wait until all the readers are registered.)
>>>>>> 3. The SourceOperator will only call SourceReader.addSplits() when it receives an AddSplitEvent from the enumerator.
>>>>>>
>>>>>> This protocol has a few benefits:
>>>>>> 1. It basically allows arbitrary split reassignment upon restart.
>>>>>> 2. Simplicity: there is only one way to assign splits.
>>>>>>
>>>>>> So we only need one interface change:
>>>>>> - add the initially assigned splits to ReaderInfo so the enumerator can access them;
>>>>>> and one behavior change:
>>>>>> - the SourceOperator should stop assigning splits to the SourceReader from state restoration, and only do that when it receives an AddSplitEvent from the enumerator.
>>>>>>
>>>>>> The enumerator story is also simple:
>>>>>> 1. Receive some kind of notification (new partition, new reader, etc.).
>>>>>> 2. Look at the reader information (in the enumerator context or self-maintained state).
>>>>>> 3. Assign splits via the enumerator context.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jiangjie (Becket) Qin
>>>>>>
>>>>>> On Thu, Aug 7, 2025 at 1:31 AM Hongshun Wang <loserwang1...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Becket,
>>>>>>> Thanks for your advice. I've quickly learned a lot about the reader's design principles. It's really interesting!
>>>>>>>
>>>>>>> > One principle we want to follow is that the enumerator should be the brain doing the splits assignment, while the source readers read from the assigned splits. So we want to avoid the case where the SourceReader ignores the split assignment.
>>>>>>>
>>>>>>> It appears that MySQL CDC currently bypasses this principle by proactively removing unused splits directly in the SourceReader.
>>>>>>> This may be due to the lack of built-in framework support for such cleanup, which forces connectors to handle it manually. However, this responsibility ideally belongs in the framework.
>>>>>>>
>>>>>>> With this FLIP, we propose a redesigned mechanism that centralizes split cleanup logic in the SplitEnumerator, allowing connectors like MySQL CDC to eventually adopt it (@Leonard, CC).
>>>>>>>
>>>>>>> To achieve this, we must carefully manage state consistency during startup and recovery. The proposed approach is as follows:
>>>>>>>
>>>>>>> 1. Reader Registration with Deferred Assignment
>>>>>>>    When a reader starts (SourceOperator#open), it sends a ReaderRegistrationEvent to the SplitEnumerator, including its previously assigned splits (restored from state). However, these splits are not yet assigned to the reader. The SourceOperator is placed in a PENDING state.
>>>>>>> 2. Prevent State Pollution During Registration
>>>>>>>    While in the PENDING state, SourceOperator#snapshotState will not update the operator state. This prevents empty or outdated reader state (e.g., with removed splits) from polluting the checkpoint.
>>>>>>> 3. Enumerator Performs Split Cleanup and Acknowledges
>>>>>>>    Upon receiving the ReaderRegistrationEvent, the SplitEnumerator removes any splits that are no longer valid (e.g., due to removed topics or tables) and returns the list of remaining valid split IDs to the reader via a ReaderRegistrationACKEvent.
>>>>>>>    For backward compatibility, the default behavior is to return all split IDs (i.e., no filtering).
>>>>>>> 4. Finalize Registration and Resume Normal Operation
>>>>>>>    When the SourceOperator receives the ReaderRegistrationACKEvent, it assigns the confirmed splits to the reader and transitions its state to REGISTERED.
>>>>>>>    From this point onward, SourceOperator#snapshotState can safely update the operator state.
>>>>>>>
>>>>>>> Best,
>>>>>>> Hongshun
>>>>>>>
>>>>>>> On Thu, Aug 7, 2025 at 1:57 AM Becket Qin <becket....@gmail.com> wrote:
>>>>>>>
>>>>>>>>> SourceCoordinator doesn't store splits that have already been assigned to readers, and SplitAssignmentTracker stores the splits only for this checkpoint, which are removed after the checkpoint. Maybe you mean SourceOperator?
>>>>>>>>
>>>>>>>> Yes, I meant SourceOperator.
>>>>>>>>
>>>>>>>>> At the beginning, I also thought about using it. However, there are two situations:
>>>>>>>>> 1. During restart, if source options remove a topic or table: connectors like MySQL CDC sometimes remove unused splits after restart in MySqlSourceReader#addSplits [1]. Kafka lacks this, so if the configured topics change, the removed topics' splits are still read. I also want to do the same thing in Kafka.
>>>>>>>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be removed after restart.
>>>>>>>>> In these cases, I have to get the assigned splits after SourceReader#addSplits, rather than get them from SourceOperator directly.
>>>>>>>>
>>>>>>>> One principle we want to follow is that the enumerator should be the brain doing the splits assignment, while the source readers read from the assigned splits. So we want to avoid the case where the SourceReader ignores the split assignment. Given this principle:
>>>>>>>> For case 1, if there is a subscription change, it might be better to hold back calling SourceReader.addSplits() until an assignment is confirmed by the Enumerator. In fact, this might be a good default behavior regardless of whether there is a subscription change.
>>>>>>>> For case 2: if a bounded split is finished, SourceReader.snapshotState() will not contain that split. So upon restoration, those splits should not appear, right?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>
>>>>>>>> On Wed, Aug 6, 2025 at 5:19 AM Hongshun Wang <loserwang1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Becket,
>>>>>>>>>
>>>>>>>>> Thank you very much for your advice, which helped me a lot.
>>>>>>>>>
>>>>>>>>> > It seems that we don't need the method `SourceReader.getAssignedSplits()`. The assigned splits are available in the SourceCoordinator upon state restoration.
>>>>>>>>>
>>>>>>>>> SourceCoordinator doesn't store splits that have already been assigned to readers, and SplitAssignmentTracker stores the splits only for this checkpoint, which are removed after the checkpoint. Maybe you mean SourceOperator?
>>>>>>>>>
>>>>>>>>> At the beginning, I also thought about using it. However, there are two situations:
>>>>>>>>> 1. During restart, if source options remove a topic or table: connectors like MySQL CDC sometimes remove unused splits after restart in MySqlSourceReader#addSplits [1]. Kafka lacks this, so if the configured topics change, the removed topics' splits are still read. I also want to do the same thing in Kafka.
>>>>>>>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be removed after restart.
>>>>>>>>> In these cases, I have to get the assigned splits after SourceReader#addSplits, rather than get them from SourceOperator directly.
>>>>>>>>>
>>>>>>>>> > By design, the SplitEnumerator can get the reader information any time from `SplitEnumeratorContext.registeredReaders()`.
>>>>>>>>>
>>>>>>>>> It looks good.
>>>>>>>>>
>>>>>>>>> Thanks again.
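The operator side of the registration handshake can be sketched as a small state machine. This combines Hongshun's PENDING/REGISTERED steps from his Aug 7 message above with Becket's suggestion to hold back `SourceReader.addSplits()` until the enumerator confirms an assignment. `OperatorSketch`, its methods, and the string split IDs are hypothetical stand-ins for SourceOperator internals and the real events. For step 2, the sketch assumes that a snapshot taken while PENDING re-emits the restored splits unchanged.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical operator-side state machine; not Flink's SourceOperator.
class OperatorSketch {
    enum State { PENDING, REGISTERED }

    State state = State.PENDING;
    private final List<String> restoredSplits;                   // from the previous checkpoint
    private final List<String> readerSplits = new ArrayList<>(); // splits actually handed to the reader
    final List<String> sentToEnumerator = new ArrayList<>();     // stands in for the registration event

    OperatorSketch(List<String> restoredSplits) {
        this.restoredSplits = List.copyOf(restoredSplits);
    }

    // open(): report the restored splits, but do NOT add them to the reader yet.
    void open() {
        sentToEnumerator.addAll(restoredSplits);
    }

    // The enumerator confirms the (possibly filtered) splits for this reader.
    void onAssignment(List<String> splits) {
        readerSplits.addAll(splits);
        state = State.REGISTERED;
    }

    // While PENDING, keep the restored state so an early snapshot cannot drop splits.
    List<String> snapshotState() {
        return state == State.PENDING ? restoredSplits : List.copyOf(readerSplits);
    }
}

class Main {
    public static void main(String[] args) {
        OperatorSketch op = new OperatorSketch(List.of("orders-0", "removed-topic-0"));
        op.open();
        System.out.println(op.snapshotState()); // [orders-0, removed-topic-0]
        // The enumerator drops the split of a topic that is no longer subscribed.
        op.onAssignment(List.of("orders-0"));
        System.out.println(op.state + " " + op.snapshotState()); // REGISTERED [orders-0]
    }
}
```

The confirmed assignment may be smaller than the restored one. For example, the enumerator may drop splits of a topic that was removed from the subscription, which is the cleanup case Hongshun wants to move out of the reader.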
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Hongshun
>>>>>>>>>
>>>>>>>>> [1] https://github.com/apache/flink-cdc/blob/42f91a864e329c00959828fe0ca4f1e9e8e1de75/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java#L238
>>>>>>>>>
>>>>>>>>> On Tue, Aug 5, 2025 at 2:35 PM Becket Qin <becket....@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Hongshun,
>>>>>>>>>>
>>>>>>>>>> Thanks for the proposal. The current Kafka split assignment algorithm does seem to have issues. (I cannot recall why it was implemented this way at that time...)
>>>>>>>>>>
>>>>>>>>>> Two quick comments:
>>>>>>>>>> 1. It seems that we don't need the method `SourceReader.getAssignedSplits()`. The assigned splits are available in the SourceCoordinator upon state restoration and can be put into the ReaderRegistrationEvent.
>>>>>>>>>> 2. Instead of adding the method `SplitEnumerator.addReader(int subtaskId, List<SplitT> assignedSplits)`, add a new field `InitialSplitAssignment` to the ReaderInfo. By design, the SplitEnumerator can get the reader information any time from `SplitEnumeratorContext.registeredReaders()`. This also spares the Enumerator implementation from remembering the initially assigned splits if it wants to wait until all the readers are registered. It also allows future additions to the reader information.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>
>>>>>>>>>> On Mon, Aug 4, 2025 at 8:39 PM Hongshun Wang <loserwang1...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Could anyone familiar with the Kafka connector help review this FLIP? I am looking forward to your reply.
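Becket's second comment can be sketched as an enumerator that waits until every reader has registered. It then reads the initial split assignments back from the reader information and redistributes them through the context's single assign call. This also follows the enumerator story from his Aug 8 message above: get notified, look at the reader information, assign through the context. `ReaderInfoSketch`, `ContextSketch`, and `WaitForAllReadersEnumerator` are hypothetical simplifications rather than Flink's `ReaderInfo` and `SplitEnumeratorContext`. Subtask IDs are assumed to be 0..parallelism-1, and round-robin redistribution is just one possible policy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical reader info carrying the splits restored from the reader's state.
class ReaderInfoSketch {
    final int subtaskId;
    final List<String> initialSplitAssignment;

    ReaderInfoSketch(int subtaskId, List<String> initialSplitAssignment) {
        this.subtaskId = subtaskId;
        this.initialSplitAssignment = List.copyOf(initialSplitAssignment);
    }
}

// Hypothetical context: the single path through which splits are assigned.
class ContextSketch {
    final Map<Integer, ReaderInfoSketch> readers = new TreeMap<>(); // filled on registration
    final Map<Integer, List<String>> assigned = new TreeMap<>();

    Map<Integer, ReaderInfoSketch> registeredReaders() {
        return readers;
    }

    void assignSplits(Map<Integer, List<String>> assignment) {
        assignment.forEach((subtask, splits) ->
                assigned.computeIfAbsent(subtask, k -> new ArrayList<>()).addAll(splits));
    }
}

class WaitForAllReadersEnumerator {
    private final ContextSketch context;
    private final int parallelism;

    WaitForAllReadersEnumerator(ContextSketch context, int parallelism) {
        this.context = context;
        this.parallelism = parallelism;
    }

    // Notification that a reader registered. The enumerator keeps no copy of the
    // initial splits; it reads them back from the context when needed.
    void addReader(int subtaskId) {
        if (context.registeredReaders().size() < parallelism) {
            return; // wait until every reader has reported its initial splits
        }
        List<String> all = new ArrayList<>();
        for (ReaderInfoSketch info : context.registeredReaders().values()) {
            all.addAll(info.initialSplitAssignment);
        }
        // Round-robin over subtasks 0..parallelism-1 (one possible policy).
        Map<Integer, List<String>> assignment = new HashMap<>();
        for (int i = 0; i < all.size(); i++) {
            assignment.computeIfAbsent(i % parallelism, k -> new ArrayList<>()).add(all.get(i));
        }
        context.assignSplits(assignment);
    }
}

class Main {
    public static void main(String[] args) {
        ContextSketch ctx = new ContextSketch();
        WaitForAllReadersEnumerator enumerator = new WaitForAllReadersEnumerator(ctx, 2);
        // Skewed restored state: reader 0 held three splits, reader 1 held one.
        ctx.readers.put(0, new ReaderInfoSketch(0, List.of("p0", "p1", "p2")));
        enumerator.addReader(0);
        System.out.println(ctx.assigned); // {} (still waiting for reader 1)
        ctx.readers.put(1, new ReaderInfoSketch(1, List.of("p3")));
        enumerator.addReader(1);
        System.out.println(ctx.assigned); // {0=[p0, p2], 1=[p1, p3]}
    }
}
```

The enumerator reads the initial splits from the registered reader information only when the last reader arrives. That is the benefit Becket points out for putting them in ReaderInfo.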
>>>>>>>>>>>
>>>>>>>>>>> Best
>>>>>>>>>>> Hongshun
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 24, 2025 at 8:13 PM Leonard Xu <xbjt...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks Hongshun for driving this work.
>>>>>>>>>>>>
>>>>>>>>>>>> We are also suffering from this issue when restoring Kafka sources in production. The current design is a nice tradeoff and takes the new Source implementation details into account. +1 from my side.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Leonard
>>>>>>>>>>>>
>>>>>>>>>>>> > On Jul 19, 2025, at 18:59, Hongshun Wang <loserwang1...@gmail.com> wrote:
>>>>>>>>>>>> >
>>>>>>>>>>>> > Hi devs,
>>>>>>>>>>>> >
>>>>>>>>>>>> > I'd like to initiate a discussion about [FLIP-537: Enumerator with Global Split Assignment Distribution for Balanced Split Assignment] [1], which addresses critical limitations in our current Kafka connector split distribution mechanism.
>>>>>>>>>>>> >
>>>>>>>>>>>> > As documented in [FLINK-31762] [2], several scenarios currently lead to uneven Kafka split distribution, causing reader delays and performance bottlenecks. The core issue stems from the enumerator's lack of visibility into post-assignment split distribution.
>>>>>>>>>>>> >
>>>>>>>>>>>> > This FLIP does two things:
>>>>>>>>>>>> > 1. ReaderRegistrationEvent enhancement: the SourceOperator should send a ReaderRegistrationEvent with the assigned splits' metadata after startup to ensure state consistency.
>>>>>>>>>>>> > 2. Implementation in the Kafka connector to resolve imbalanced splits and state awareness during recovery (the enumerator will always choose the least assigned subtask; see the FLIP [1] for the reasoning).
>>>>>>>>>>>> >
>>>>>>>>>>>> > Do you have any questions regarding this FLIP? Looking forward to hearing from you.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Best
>>>>>>>>>>>> > Hongshun
>>>>>>>>>>>> >
>>>>>>>>>>>> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>>>>>>> > [2] https://issues.apache.org/jira/browse/FLINK-31762
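The Kafka-side policy from the kickoff message is that the enumerator always chooses the least assigned subtask. It can be sketched with a running split count per subtask. `LeastLoadedAssigner` is a hypothetical name, sending ties to the lowest subtask ID is an assumption, and this is not the FLIP's SplitPartitioner implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical balanced assigner: each new split goes to the subtask that
// currently owns the fewest splits (ties go to the lowest subtask ID).
class LeastLoadedAssigner {
    private final int[] splitCounts;
    final Map<Integer, List<String>> assignment = new TreeMap<>();

    // initialCounts[i] = number of splits subtask i already owns, e.g. from reader reports.
    LeastLoadedAssigner(int[] initialCounts) {
        this.splitCounts = initialCounts.clone();
    }

    int assign(String split) {
        int target = 0;
        for (int i = 1; i < splitCounts.length; i++) {
            if (splitCounts[i] < splitCounts[target]) {
                target = i; // strictly fewer splits wins; ties keep the lower ID
            }
        }
        splitCounts[target]++;
        assignment.computeIfAbsent(target, k -> new ArrayList<>()).add(split);
        return target;
    }
}

class Main {
    public static void main(String[] args) {
        // After recovery, subtask 0 owns 3 splits, subtasks 1 and 2 own none.
        LeastLoadedAssigner assigner = new LeastLoadedAssigner(new int[] {3, 0, 0});
        for (String split : List.of("new-0", "new-1", "new-2", "new-3")) {
            assigner.assign(split);
        }
        System.out.println(assigner.assignment); // {1=[new-0, new-2], 2=[new-1, new-3]}
    }
}
```

The initial counts are where the post-recovery view matters. An assigner that does not know subtask 0 already holds three splits cannot steer new splits away from it. That is the visibility gap the FLIP describes.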