Thanks, Zhanghao Chen, for the feedback. Let me summarize the candidate solutions from the discussion so far, so that we can gather clearer preferences and feedback:
For JobVertices connected by forward edges in streaming jobs with the AdaptiveScheduler enabled:

Design 1:
- When the upstream and downstream parallelisms are equal, restore the partitioner to ForwardPartitioner.
- When the upstream and downstream parallelisms differ but one is a multiple of the other, replace the partitioner with RescalePartitioner.
- When the upstream and downstream parallelisms differ and neither is a multiple of the other, replace the partitioner with RebalancePartitioner.

Design 2:
Introduce a new option:
- name: jobmanager.adaptive-scheduler.jobgraph.mutated-forward-edge.replacement-policy
- type: enum
- value options:
  - MIXED: use the strategy from Design 1
  - RESCALE: replace the partitioner with RescalePartitioner whenever the upstream and downstream JobVertices have different parallelisms
  - REBALANCE: replace the partitioner with RebalancePartitioner whenever the upstream and downstream JobVertices have different parallelisms
- default value: MIXED

Looking forward to your feedback!

Best regards,
Yuepeng Pan

Zhanghao Chen <[email protected]> wrote on Thursday, January 15, 2026 at 23:35:

> Thanks Yuepeng for the detailed elaboration. The idea makes sense, but I'd
> prefer adding an explicit option to control the behavior, for two reasons:
>
> 1. A complex strategy in a black box may be confusing for others.
> 2. Real-world cases can be much more complex. For example, the source
> parallelism can be limited by the number of MQ partitions, and maintaining
> a multiplicative relationship between the parallelisms of upstream and
> downstream vertices can be really costly in some cases. Yet even without a
> multiplicative relationship, rescale can still easily outperform rebalance
> in some cases (21-to-25, for example). If we can't make it right in all
> cases, maybe we should just keep it simple.
>
> Best,
> Zhanghao Chen
> ________________________________
> From: Yuepeng Pan <[email protected]>
> Sent: Thursday, January 15, 2026 23:03
> To: [email protected] <[email protected]>
> Subject: Re: [DISCUSS] A design proposal to fix the wrong dynamic
> replacement of partitioner from FORWARD to REBALANCE for AutoScaler and
> AdaptiveScheduler
>
> Thanks Zhanghao Chen for the comments.
>
> As mentioned in previous emails, we have to take one thing into
> consideration: the final parallelism depends not only on external
> adjustments, but also on the amount of resources that actually become
> available.
>
> - In an ideal situation with sufficient resources, the external adjustment
> strategy determines the final parallelism and partitioning.
> - When resources are insufficient, the resources actually available may
> also affect the final parallelism and partitioning.
>
> Therefore, building on your proposal, we would not introduce any new
> parameters. Instead, we would only apply the following adjustments to pairs
> of vertices whose initial partitioner is ForwardPartitioner:
>
> - When the upstream and downstream parallelisms are in a multiple
> relationship (and are not equal), we change the partitioner to
> RescalePartitioner.
> - When the upstream and downstream parallelisms are not in a multiple
> relationship (and are not equal), we change the partitioner to
> RebalancePartitioner.
> - When the upstream and downstream parallelisms are equal, we change the
> partitioner back to ForwardPartitioner.
>
> In this way, we can also decouple this behavior from concrete
> model-specific strategies.
>
> WDYT?
>
> Best regards,
> Yuepeng Pan
>
> Zhanghao Chen <[email protected]> wrote on Thursday, January 15, 2026 at 22:44:
>
> > I think it should definitely be controlled in the model rather than in
> > the engine. Maybe we can add an option to control its behavior?
> >
> > Best,
> > Zhanghao Chen
> > ________________________________
> > From: Yuepeng Pan <[email protected]>
> > Sent: Thursday, January 15, 2026 21:39
> > To: [email protected] <[email protected]>
> > Subject: Re: [DISCUSS] A design proposal to fix the wrong dynamic
> > replacement of partitioner from FORWARD to REBALANCE for AutoScaler and
> > AdaptiveScheduler
> >
> > Thanks Zhanghao Chen for the response.
> >
> > Let me add some historical context [1].
> >
> > In previous discussions, there were two alternative replacement
> > strategies, with the following main characteristics:
> > - RescalePartitioner: compared to RebalancePartitioner, it introduces fewer
> > network connections and less shuffle overhead.
> > However, it is more prone to load skew and is therefore less general.
> >
> > - RebalancePartitioner: in theory, it distributes the load evenly across
> > downstream tasks and is more general,
> > but at the cost of more network connections and higher shuffle overhead.
> >
> > To balance generality and correctness, the community eventually chose the
> > latter.
> >
> > I apologize for not responding in detail earlier to this
> > suggestion [2] (switching to RescalePartitioner and enforcing a
> > multiplicative relationship between upstream and downstream parallelism).
> >
> > If this strategy is implemented on the AutoScaler side, we may consider
> > whether it can be migrated into the engine.
> > The reason is that inconsistent parallelism between upstream and downstream
> > vertices connected by a forward edge is not only caused by AutoScaler
> > requests, but can also result from rescaling triggered via the REST API or
> > by internal events such as failover.
> > Therefore, placing the implementation on the engine side would help ensure
> > the safety and consistency of this strategy.
> >
> > If the cost of moving this strategy into the engine is too high, we could
> > alternatively propose a new FLIP to discuss and advance it as a new feature
> > on the AutoScaler side.
> >
> > If the strategy you mentioned is indeed intended to be implemented in the
> > engine, I have one question. Consider a job consisting of two JobVertices,
> > A and B:
> >
> > A (p = 100) --forward--> B (p = 100)
> >
> > After one AutoScaler adjustment, the resulting parallelism proposal is:
> >
> > A (p = 60) --rescale--> B (p = 100)
> >
> > I assume that, to maintain a multiplicative relationship between the
> > upstream and downstream parallelisms, there are roughly two possible
> > directions:
> >
> > a) Adjust A from p = 60 to p = 50. In this case, some tasks of vertex A may
> > become bottlenecks.
> > b) Adjust B from p = 100 to p = 120. In this case, we may end up reserving
> > some idle resources, and the scale-down effect may be less significant.
> >
> > Any input is appreciated!
> >
> > [1] https://github.com/apache/flink/pull/21443#discussion_r1042919428
> > [2] https://issues.apache.org/jira/browse/FLINK-33123?focusedCommentId=17767397&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17767397
> >
> > Best regards,
> > Yuepeng Pan
> >
> > Zhanghao Chen <[email protected]> wrote on Thursday, January 15, 2026 at 19:37:
> >
> > > Thanks Yuepeng for the proposal. Overall LGTM. However, I'm a bit
> > > concerned about the potential performance impact of changing a forward
> > > edge to rebalance. The autoscaler currently assumes a linear performance
> > > model between throughput and parallelism. The edge change can easily
> > > break this assumption, as rebalance introduces more shuffling and results
> > > in higher CPU usage and network memory consumption. I suggest considering
> > > it on the algorithm side as well.
> > >
> > > Best,
> > > Zhanghao Chen
> > > ________________________________
> > > From: Yuepeng Pan <[email protected]>
> > > Sent: Tuesday, January 13, 2026 23:46
> > > To: [email protected] <[email protected]>
> > > Subject: [DISCUSS] A design proposal to fix the wrong dynamic replacement
> > > of partitioner from FORWARD to REBALANCE for AutoScaler and
> > > AdaptiveScheduler
> > >
> > > Hi community,
> > >
> > > I would like to start a discussion around the issue described in
> > > **FLINK-33123[1]**.
> > >
> > > This issue can mainly be broken down into two parts:
> > > a) Two upstream and downstream JobVertices connected by a FORWARD edge
> > > initially have the same parallelism, but a rescale operation makes their
> > > parallelisms different. In this case, the current strategy may produce
> > > incorrect results when rebuilding the upstream–downstream network
> > > partition connections.
> > > b) Two upstream and downstream JobVertices initially have different
> > > parallelisms, but a rescale operation adjusts them to be the same. In this
> > > scenario, it is not possible to determine the partition type after the
> > > rescale.
> > >
> > > So, I'd like to share a design proposal [2] that attempts to address the
> > > problem described in the ticket [1].
> > >
> > > Thanks in advance for your time and feedback.
> > > Looking forward to the discussion!
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-33123
> > > [2] https://docs.google.com/document/d/1e_6o4bdXcKtFL3xYxKeyKnRjR8ffsw6Z8frp3tp7u-M/edit?usp=sharing
> > >
> > > Best regards,
> > > Yuepeng Pan
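For illustration only, here is a minimal Java sketch of the rules debated in this thread. It is not Flink code: the class, enums, and methods (`ForwardEdgeReplacementSketch`, `ReplacementPolicy`, `Partitioner`, `choose`, `shrinkUpstream`, `growDownstream`) are hypothetical names. `choose` models Design 1 (`MIXED`) and the fixed `RESCALE`/`REBALANCE` values of the proposed `jobmanager.adaptive-scheduler.jobgraph.mutated-forward-edge.replacement-policy` option from Design 2. The connection counts assume all-to-all wiring for rebalance and, for rescale, pointwise wiring in which each subtask on the larger side is connected to exactly one subtask on the smaller side. `shrinkUpstream` and `growDownstream` reproduce options a) and b) from the A (p = 60) / B (p = 100) example.

```java
/**
 * Hypothetical, self-contained sketch of the rules discussed in this thread.
 * None of these names exist in Flink; they only model the decisions.
 */
final class ForwardEdgeReplacementSketch {

    /** Hypothetical values of the option proposed in Design 2. */
    enum ReplacementPolicy { MIXED, RESCALE, REBALANCE }

    /** Stand-ins for ForwardPartitioner, RescalePartitioner and RebalancePartitioner. */
    enum Partitioner { FORWARD, RESCALE, REBALANCE }

    private ForwardEdgeReplacementSketch() {}

    /** True if one (positive) parallelism is a multiple of the other, equal values included. */
    static boolean isMultipleRelation(int a, int b) {
        return Math.max(a, b) % Math.min(a, b) == 0;
    }

    /** Chooses the partitioner for an edge that was originally FORWARD. */
    static Partitioner choose(ReplacementPolicy policy, int upstream, int downstream) {
        if (upstream <= 0 || downstream <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        if (upstream == downstream) {
            return Partitioner.FORWARD; // equal parallelism: restore the forward edge
        }
        switch (policy) {
            case RESCALE:
                return Partitioner.RESCALE;
            case REBALANCE:
                return Partitioner.REBALANCE;
            default: // MIXED, i.e. Design 1
                return isMultipleRelation(upstream, downstream)
                        ? Partitioner.RESCALE
                        : Partitioner.REBALANCE;
        }
    }

    /** Rebalance is all-to-all: every upstream subtask connects to every downstream subtask. */
    static long rebalanceConnections(int upstream, int downstream) {
        return (long) upstream * downstream;
    }

    /** Assumed pointwise wiring: each subtask on the larger side has exactly one peer. */
    static long rescaleConnections(int upstream, int downstream) {
        return Math.max(upstream, downstream);
    }

    /** Option a): largest upstream parallelism <= proposed that is in a multiple relation. */
    static int shrinkUpstream(int proposedUpstream, int downstream) {
        for (int p = proposedUpstream; p > 1; p--) {
            if (isMultipleRelation(p, downstream)) {
                return p;
            }
        }
        return 1; // 1 divides every parallelism
    }

    /** Option b): smallest downstream parallelism >= current that is in a multiple relation. */
    static int growDownstream(int upstream, int currentDownstream) {
        int q = currentDownstream;
        while (!isMultipleRelation(upstream, q)) {
            q++; // stops at the next multiple of upstream at the latest
        }
        return q;
    }

    public static void main(String[] args) {
        System.out.println(choose(ReplacementPolicy.MIXED, 100, 100)); // FORWARD
        System.out.println(choose(ReplacementPolicy.MIXED, 50, 100));  // RESCALE
        System.out.println(choose(ReplacementPolicy.MIXED, 60, 100));  // REBALANCE
        System.out.println(choose(ReplacementPolicy.RESCALE, 21, 25)); // RESCALE
        System.out.println(rescaleConnections(21, 25) + " vs " + rebalanceConnections(21, 25)); // 25 vs 525
        System.out.println(shrinkUpstream(60, 100) + " / " + growDownstream(60, 100)); // 50 / 120
    }
}
```

Under these assumptions, the 21-to-25 case mentioned by Zhanghao Chen needs 25 rescale connections instead of 525 rebalance connections. The trade-off is the load skew raised earlier in the thread: with 25 downstream subtasks and 21 upstream subtasks, 4 upstream subtasks must each feed two downstream subtasks, so (assuming every upstream subtask emits a similar volume) those downstream subtasks receive roughly half as much data as the others.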
