Hi, devs. > As for the proposal discussed now. > I also prefer dropping the extra option for now for simplicity.
+1 on this proposal. > We can open up a new discussion on RESCALE / REBALANCE > as option when the upstream and downstream parallelisms differ. Looking forward to it! Thanks Maximilian, Zhanghao, and everyone involved in the discussion. Best regards, Yuepeng Pan Zhanghao Chen <[email protected]> 于2026年1月21日周三 10:48写道: > Thanks for Max's update. > > Let me share our use cases. We have quite a few gigantic Flink pipelines > with a parallelism of 3000+. The overhead of REBALANCE is not negligible in > this scale. When you try to double a vertex's parallelism but with the edge > changing from FORWARD to REBALANCE, you may simply get the performance > degraded and the pipeline may run out of network memory much easily. Having > autoscaling to support changing to rescale edge with a properly tuned > algorithm to avoid data skew is crucial to large scale jobs. > > As for the proposal discussed now. I also prefer dropping the extra option > for now for simplicity. We can open up a new discussion on RESCALE / > REBALANCE as option when the upstream and downstream parallelisms differ. > The option is valuable for both the adaptive scheduling case and the normal > case where the user just want to manually reconfigure its job. > > Best, > Zhanghao Chen > ________________________________ > From: Maximilian Michels <[email protected]> > Sent: Tuesday, January 20, 2026 19:19 > To: [email protected] <[email protected]> > Subject: Re: [DISCUSS] A design proposal to fix the wrong dynamic > replacement of partitioner from FORWARD to REBLANCE for AutoScaler and > AdaptiveScheduler > > Thank you, Yuepeng Pan, for looking into this long-standing issue! The > original proposal looks good to me. I agree with Zhanghao Chen to keep > things simple and avoid complexity. > > I would not add a configuration option, but if we must add one, then > let's only have RESCALE / REBALANCE as option, and default to > REBALANCE. Let's not add a "mixed" strategy, it should be clear which > strategy gets chosen. In practice, I don't believe this configuration > option will ever be set, so I would prefer not to add it at all, but I > leave this up to the community. > > Cheers, > Max > > On Tue, Jan 20, 2026 at 11:59 AM Yuepeng Pan <[email protected]> > wrote: > > > > Bumping this thread. Thanks! > > > > Best regards, > > Yuepeng Pan > > > > Yuepeng Pan <[email protected]> 于2026年1月17日周六 11:06写道: > > > > > Thanks to Zhanghao Chen for the feedback. > > > > > > Please let me sort out the candidate solutions from the discussion > history > > > to facilitate gathering clearer preferences or feedback: > > > > > > For JobVertices with Forward edges in streaming jobs with the > > > AdaptiveScheduler enabled: > > > > > > Design 1: > > > - When upstream and downstream parallelism are the same, restore the > > > partitioning strategy to ForwardPartitioner. > > > - When upstream and downstream parallelism differ but have a multiple > > > relationship, replace the partitioner with RescalePartitioner. > > > - When upstream and downstream parallelism differ and do not have a > > > multiple relationship, replace the partitioner with > RebalancePartitioner. > > > > > > Design 2: > > > Introduce a new parameter: > > > > > > - name: > > > > jobmanager.adaptive-scheduler.jobgraph.mutated-forward-edge.replacement-policy > > > - type: enum > > > - value options: > > > - MIXED: Use the strategy from Design 1 > > > - RESCALE: Replace the partitioner with RescalePartitioner when > upstream > > > and downstream JobVertices have different parallelism > > > - REBALANCE: Replace the partitioner with RebalancePartitioner when > > > upstream and downstream JobVertices have different parallelism > > > - default value: MIXED > > > > > > Looking forward to feedback about it! > > > > > > Best regards, > > > Yuepeng Pan > > > > > > > > > > > > Zhanghao Chen <[email protected]> 于2026年1月15日周四 23:35写道: > > > > > >> Thanks Yuepeng for the detailed elaboration. The idea makes sense, but > > >> I'd prefer adding an explicit option to control the behavior for two > > >> reasons: > > >> > > >> 1. > > >> A complex strategy in black box may be confusing for others. > > >> 2. > > >> The real-world cases can be much more complex, e.g. the source > > >> parallelism can be limited by MQ partitions, and maintaining a > > >> multiplicative relationship between the parallelism of upstream and > > >> downstream vertices can be really costly in some cases, but even > under a > > >> non-multiplicative relationship, rescale can still easily outperform > > >> rebalance in some cases (21-to-25 for example). If we can't make it > right > > >> under all cases, maybe just keep it simple. > > >> > > >> Best, > > >> Zhanghao Chen > > >> ________________________________ > > >> From: Yuepeng Pan <[email protected]> > > >> Sent: Thursday, January 15, 2026 23:03 > > >> To: [email protected] <[email protected]> > > >> Subject: Re: [DISCUSS] A design proposal to fix the wrong dynamic > > >> replacement of partitioner from FORWARD to REBLANCE for AutoScaler and > > >> AdaptiveScheduler > > >> > > >> Thanks Zhanghao Chen for the comments. > > >> > > >> As mentioned in the previous emails, we have to take one thing into > > >> consideration: > > >> the final parallelism configuration depends not only on external > > >> adjustments, but also on the actual amount of resources that become > > >> available. > > >> > > >> - In an ideal situation with sufficient resources, the external > adjustment > > >> strategy determines the final parallelism and partitioning. > > >> - When resources are insufficient, the actually available resources > may > > >> also affect the final parallelism and partitioning. > > >> > > >> Therefore, based on your proposal, we do not introduce any new > parameters. > > >> Instead, we only apply the following adjustments to pairs of vertices > > >> whose > > >> initial partitioning type is ForwardPartitioner: > > >> > > >> - When the upstream and downstream vertex parallelisms have a multiple > > >> relationship (and are not equal), we change the partitioning type to > > >> RescalePartitioner. > > >> - When the upstream and downstream vertex parallelisms do not have a > > >> multiple relationship (and are not equal), we change the partitioning > type > > >> to RebalancePartitioner. > > >> - When the upstream and downstream vertex parallelisms are equal, we > > >> change > > >> the partitioning type back to ForwardPartitioner. > > >> > > >> In this way, we can also achieve a decoupling from concrete > model-specific > > >> strategies. > > >> > > >> WDYTA ? > > >> > > >> Best regards, > > >> Yuepeng Pan > > >> > > >> Zhanghao Chen <[email protected]> 于2026年1月15日周四 22:44写道: > > >> > > >> > I think it should definitely be controlled in the model rather than > in > > >> the > > >> > engine. Maybe we can add an option to control its behavior? > > >> > > > >> > Best, > > >> > Zhanghao Chen > > >> > ________________________________ > > >> > From: Yuepeng Pan <[email protected]> > > >> > Sent: Thursday, January 15, 2026 21:39 > > >> > To: [email protected] <[email protected]> > > >> > Subject: Re: [DISCUSS] A design proposal to fix the wrong dynamic > > >> > replacement of partitioner from FORWARD to REBLANCE for AutoScaler > and > > >> > AdaptiveScheduler > > >> > > > >> > Thanks Zhanghao Chen for the response. > > >> > > > >> > Please let me add some historical context[1]. > > >> > > > >> > In the previous discussions, there were two alternative replacement > > >> > strategies, with the following main characteristics: > > >> > - RescalePartitioner: Compared to RebalancePartitioner, it > introduces > > >> fewer > > >> > network connections and less shuffle overhead. > > >> > However, it is more prone to load skew and therefore lacks > generality. > > >> > > > >> > - RebalancePartitioner: In theory, it can evenly distribute the load > > >> across > > >> > downstream tasks and is more general, > > >> > but at the cost of increased network connections and shuffle > overhead. > > >> > > > >> > To balance generality and correctness, the community eventually > chose > > >> the > > >> > latter. > > >> > > > >> > I'd like to apologize for not providing a detailed response earlier > to > > >> this > > >> > suggestion[2](switching to RescalePartitioner and enforcing a > > >> > multiplicative relationship between upstream and downstream > > >> parallelism). > > >> > > > >> > If this strategy is implemented on the AutoScaler side, we may > consider > > >> > whether it can be migrated into the engine. > > >> > The reason is that inconsistent parallelism between upstream and > > >> downstream > > >> > vertices connected by a forward edge is not only caused by > AutoScaler > > >> > requests, > > >> > but can also result from rescaling triggered via the REST API or > > >> internal > > >> > events such as failover. > > >> > Therefore, placing the implementation on the engine side would help > > >> ensure > > >> > the safety and consistency of this strategy. > > >> > > > >> > If the cost of moving this strategy into the engine is too high, we > > >> could > > >> > alternatively propose > > >> > a new FLIP to discuss and advance it as a new feature on the > AutoScaler > > >> > side. > > >> > > > >> > If the strategy you mentioned is indeed intended to be implemented > in > > >> the > > >> > engine, > > >> > I have one question. Consider a job consisting of two JobVertices, A > > >> and B: > > >> > > > >> > A (p = 100) --forward--> B (p = 100) > > >> > > > >> > After one AutoScaler adjustment, the resulting parallelism proposal > is: > > >> > > > >> > A (p = 60) --rescale--> B (p = 100) > > >> > > > >> > I assume that, in order to maintain a multiplicative relationship > > >> between > > >> > the parallelism > > >> > of upstream and downstream vertices, there are roughly two possible > > >> > directions: > > >> > > > >> > a) Adjust A from p = 60 to p = 50. In this case, some tasks of > vertex A > > >> may > > >> > become bottlenecks. > > >> > b) Adjust B from p = 100 to p = 120. In this case, we may end up > > >> reserving > > >> > some idle resources, > > >> > and the scale-down effect may be less significant. > > >> > > > >> > Any input is appreciated! > > >> > > > >> > > > >> > [1] > https://github.com/apache/flink/pull/21443#discussion_r1042919428 > > >> > [2] > > >> > > > >> > > > >> > https://issues.apache.org/jira/browse/FLINK-33123?focusedCommentId=17767397&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17767397 > > >> > > > >> > Best regards, > > >> > Yuepeng Pan > > >> > > > >> > > > >> > > > >> > Zhanghao Chen <[email protected]> 于2026年1月15日周四 19:37写道: > > >> > > > >> > > Thanks Yuepeng for the proposal. Overall LGTM. However, I'm a bit > > >> > > concerned about the potential performance impact of changing a > forward > > >> > edge > > >> > > to rebalance. The autoscaler currently assumes a linear > performance > > >> model > > >> > > between the throughput and the parallelism. The edge change can > easily > > >> > > break this assumption as Rebalance introduces more shuffle and > > >> results in > > >> > > higher CPU usage and network memory consumption. I suggest > > >> considering it > > >> > > on the algorithm side as well. > > >> > > > > >> > > Best, > > >> > > Zhanghao Chen > > >> > > ________________________________ > > >> > > From: Yuepeng Pan <[email protected]> > > >> > > Sent: Tuesday, January 13, 2026 23:46 > > >> > > To: [email protected] <[email protected]> > > >> > > Subject: [DISCUSS] A design proposal to fix the wrong dynamic > > >> replacement > > >> > > of partitioner from FORWARD to REBLANCE for AutoScaler and > > >> > AdaptiveScheduler > > >> > > > > >> > > Hi community, > > >> > > > > >> > > I would like to start a discussion around the issue described in > > >> > > **FLINK-33123[1]**. > > >> > > > > >> > > This issue can mainly be broken down into two parts: > > >> > > a). > > >> > > Assuming that initially two upstream and downstream JobVertices > > >> connected > > >> > > by a FORWARD edge have the same parallelism, > > >> > > due to a rescale operation their parallelism becomes different. > > >> > > In this case, the current strategy may produce incorrect results > when > > >> > > rebuilding the upstream–downstream network partition connections. > > >> > > b). > > >> > > Assuming that the parallelism of two upstream and downstream > > >> JobVertices > > >> > is > > >> > > different, > > >> > > but due to a rescale operation their parallelism needs to be > adjusted > > >> to > > >> > be > > >> > > the same. > > >> > > In this scenario, it is not possible to determine the partition > type > > >> > after > > >> > > the rescale. > > >> > > > > >> > > So, I'd like to share a design proposal[2] that attempts to > address > > >> the > > >> > > problem described in the ticket[1]. > > >> > > > > >> > > Thanks in advance for your time and feedback. > > >> > > Looking forward to the discussion! > > >> > > > > >> > > > > >> > > [1]https://issues.apache.org/jira/browse/FLINK-33123 > > >> > > [2] > > >> > > > > >> > > > > >> > > > >> > https://docs.google.com/document/d/1e_6o4bdXcKtFL3xYxKeyKnRjR8ffsw6Z8frp3tp7u-M/edit?usp=sharing > > >> > > > > >> > > Best regards, > > >> > > Yuepeng Pan > > >> > > > > >> > > > >> > > > >
