Thanks Zhanghao Chen for the comments. As mentioned in the previous emails, we have to take one thing into consideration: the final parallelism configuration depends not only on external adjustments, but also on the actual amount of resources that become available.
- In an ideal situation with sufficient resources, the external adjustment
  strategy determines the final parallelism and partitioning.
- When resources are insufficient, the resources that are actually available
  may also affect the final parallelism and partitioning.

Therefore, based on your proposal, we do not introduce any new parameters.
Instead, we only apply the following adjustments to pairs of vertices whose
initial partitioning type is ForwardPartitioner:
- When one of the upstream and downstream vertex parallelisms is an integer
  multiple of the other (and they are not equal), we change the partitioning
  type to RescalePartitioner.
- When neither parallelism is an integer multiple of the other (and they are
  not equal), we change the partitioning type to RebalancePartitioner.
- When the upstream and downstream vertex parallelisms are equal, we change
  the partitioning type back to ForwardPartitioner.

In this way, we can also achieve a decoupling from concrete model-specific
strategies.

WDYT?

Best regards,
Yuepeng Pan

Zhanghao Chen <[email protected]> wrote on Thu, Jan 15, 2026 at 22:44:

> I think it should definitely be controlled in the model rather than in the
> engine. Maybe we can add an option to control its behavior?
>
> Best,
> Zhanghao Chen
> ________________________________
> From: Yuepeng Pan <[email protected]>
> Sent: Thursday, January 15, 2026 21:39
> To: [email protected] <[email protected]>
> Subject: Re: [DISCUSS] A design proposal to fix the wrong dynamic
> replacement of partitioner from FORWARD to REBLANCE for AutoScaler and
> AdaptiveScheduler
>
> Thanks Zhanghao Chen for the response.
>
> Please let me add some historical context[1].
>
> In the previous discussions, there were two alternative replacement
> strategies, with the following main characteristics:
> - RescalePartitioner: Compared to RebalancePartitioner, it introduces fewer
> network connections and less shuffle overhead.
> However, it is more prone to load skew and therefore lacks generality.
>
> - RebalancePartitioner: In theory, it can evenly distribute the load across
> downstream tasks and is more general,
> but at the cost of increased network connections and shuffle overhead.
>
> To balance generality and correctness, the community eventually chose the
> latter.
>
> I'd like to apologize for not providing a detailed response earlier to this
> suggestion[2] (switching to RescalePartitioner and enforcing a
> multiplicative relationship between upstream and downstream parallelism).
>
> If this strategy is implemented on the AutoScaler side, we may consider
> whether it can be migrated into the engine.
> The reason is that inconsistent parallelism between upstream and downstream
> vertices connected by a forward edge is not only caused by AutoScaler
> requests, but can also result from rescaling triggered via the REST API or
> internal events such as failover.
> Therefore, placing the implementation on the engine side would help ensure
> the safety and consistency of this strategy.
>
> If the cost of moving this strategy into the engine is too high, we could
> alternatively propose a new FLIP to discuss and advance it as a new feature
> on the AutoScaler side.
>
> If the strategy you mentioned is indeed intended to be implemented in the
> engine, I have one question. Consider a job consisting of two JobVertices,
> A and B:
>
> A (p = 100) --forward--> B (p = 100)
>
> After one AutoScaler adjustment, the resulting parallelism proposal is:
>
> A (p = 60) --rescale--> B (p = 100)
>
> I assume that, in order to maintain a multiplicative relationship between
> the parallelism of upstream and downstream vertices, there are roughly two
> possible directions:
>
> a) Adjust A from p = 60 to p = 50. In this case, some tasks of vertex A may
> become bottlenecks.
> b) Adjust B from p = 100 to p = 120. In this case, we may end up reserving
> some idle resources, and the scale-down effect may be less significant.
>
> Any input is appreciated!
>
>
> [1] https://github.com/apache/flink/pull/21443#discussion_r1042919428
> [2] https://issues.apache.org/jira/browse/FLINK-33123?focusedCommentId=17767397&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17767397
>
> Best regards,
> Yuepeng Pan
>
>
>
> Zhanghao Chen <[email protected]> wrote on Thu, Jan 15, 2026 at 19:37:
>
> > Thanks Yuepeng for the proposal. Overall LGTM. However, I'm a bit
> > concerned about the potential performance impact of changing a forward
> > edge to rebalance. The autoscaler currently assumes a linear performance
> > model between the throughput and the parallelism. The edge change can
> > easily break this assumption as Rebalance introduces more shuffle and
> > results in higher CPU usage and network memory consumption. I suggest
> > considering it on the algorithm side as well.
> >
> > Best,
> > Zhanghao Chen
> > ________________________________
> > From: Yuepeng Pan <[email protected]>
> > Sent: Tuesday, January 13, 2026 23:46
> > To: [email protected] <[email protected]>
> > Subject: [DISCUSS] A design proposal to fix the wrong dynamic replacement
> > of partitioner from FORWARD to REBLANCE for AutoScaler and
> > AdaptiveScheduler
> >
> > Hi community,
> >
> > I would like to start a discussion around the issue described in
> > **FLINK-33123[1]**.
> >
> > This issue can mainly be broken down into two parts:
> > a).
> > Assuming that initially two upstream and downstream JobVertices connected
> > by a FORWARD edge have the same parallelism,
> > due to a rescale operation their parallelism becomes different.
> > In this case, the current strategy may produce incorrect results when
> > rebuilding the upstream–downstream network partition connections.
> > b).
> > Assuming that the parallelism of two upstream and downstream JobVertices
> > is different,
> > but due to a rescale operation their parallelism needs to be adjusted to
> > be the same.
> > In this scenario, it is not possible to determine the partition type
> > after the rescale.
> >
> > So, I'd like to share a design proposal[2] that attempts to address the
> > problem described in the ticket[1].
> >
> > Thanks in advance for your time and feedback.
> > Looking forward to the discussion!
> >
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-33123
> > [2] https://docs.google.com/document/d/1e_6o4bdXcKtFL3xYxKeyKnRjR8ffsw6Z8frp3tp7u-M/edit?usp=sharing
> >
> > Best regards,
> > Yuepeng Pan
> >
>
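
For reference, the adjustment rule proposed at the top of this thread (for vertex pairs whose initial partitioning type is ForwardPartitioner) could be sketched roughly as below. This is only an illustration in plain Python, not Flink code: the `Partitioner` enum and the `choose_partitioner` helper are hypothetical names introduced here, and the sketch assumes that "multiple relationship" means one parallelism is an integer multiple of the other, in either direction.

```python
from enum import Enum


class Partitioner(Enum):
    # Hypothetical stand-ins for the partitioner types named in this thread.
    FORWARD = "ForwardPartitioner"
    RESCALE = "RescalePartitioner"
    REBALANCE = "RebalancePartitioner"


def choose_partitioner(upstream: int, downstream: int) -> Partitioner:
    """Pick the partitioner for an edge whose initial type was FORWARD.

    Hypothetical helper; the parallelisms are the values actually reached
    after rescaling, whatever triggered it.
    """
    if upstream <= 0 or downstream <= 0:
        raise ValueError("parallelism must be positive")
    if upstream == downstream:
        # Equal parallelism: the edge goes back to FORWARD.
        return Partitioner.FORWARD
    larger, smaller = max(upstream, downstream), min(upstream, downstream)
    if larger % smaller == 0:
        # Assumption: an integer multiple in either direction counts.
        return Partitioner.RESCALE
    # Unequal and not a multiple: fall back to REBALANCE.
    return Partitioner.REBALANCE


if __name__ == "__main__":
    for up, down in [(100, 100), (50, 100), (60, 100), (60, 120)]:
        print(f"A (p = {up}) --{choose_partitioner(up, down).name.lower()}--> B (p = {down})")
```

Under this rule, the A (p = 60), B (p = 100) case from the earlier mail would keep both parallelisms as proposed and use RebalancePartitioner for that edge, instead of forcing A down to 50 or B up to 120.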
