Bumping this thread. Thanks!

Best regards,
Yuepeng Pan
Yuepeng Pan <[email protected]> wrote on Sat, Jan 17, 2026 at 11:06:
> Thanks to Zhanghao Chen for the feedback.
>
> Let me summarize the candidate solutions from the discussion so far, to make
> it easier to gather clearer preferences or feedback:
>
> For JobVertices connected by Forward edges in streaming jobs with the
> AdaptiveScheduler enabled:
>
> Design 1:
> - When the upstream and downstream parallelisms are equal, restore the
>   partitioning strategy to ForwardPartitioner.
> - When the upstream and downstream parallelisms differ but one is a multiple
>   of the other, replace the partitioner with RescalePartitioner.
> - When the upstream and downstream parallelisms differ and neither is a
>   multiple of the other, replace the partitioner with RebalancePartitioner.
>
> Design 2:
> Introduce a new parameter:
>
> - name:
>   jobmanager.adaptive-scheduler.jobgraph.mutated-forward-edge.replacement-policy
> - type: enum
> - value options:
>   - MIXED: use the strategy from Design 1
>   - RESCALE: replace the partitioner with RescalePartitioner when the upstream
>     and downstream JobVertices have different parallelisms
>   - REBALANCE: replace the partitioner with RebalancePartitioner when the
>     upstream and downstream JobVertices have different parallelisms
> - default value: MIXED
>
> Looking forward to your feedback!
>
> Best regards,
> Yuepeng Pan
>
> Zhanghao Chen <[email protected]> wrote on Thu, Jan 15, 2026 at 23:35:
>
>> Thanks Yuepeng for the detailed elaboration. The idea makes sense, but
>> I'd prefer adding an explicit option to control the behavior, for two
>> reasons:
>>
>> 1. A complex black-box strategy may be confusing for others.
>> 2. Real-world cases can be much more complex. For example, the source
>>    parallelism can be limited by the number of MQ partitions, and
>>    maintaining a multiplicative relationship between the parallelisms of
>>    upstream and downstream vertices can be really costly in some cases.
>>    Yet even without a multiplicative relationship, rescale can still
>>    easily outperform rebalance in some cases (21-to-25, for example). If
>>    we can't get it right in all cases, maybe we should just keep it simple.
>>
>> Best,
>> Zhanghao Chen
>> ________________________________
>> From: Yuepeng Pan <[email protected]>
>> Sent: Thursday, January 15, 2026 23:03
>> To: [email protected] <[email protected]>
>> Subject: Re: [DISCUSS] A design proposal to fix the wrong dynamic
>> replacement of partitioner from FORWARD to REBLANCE for AutoScaler and
>> AdaptiveScheduler
>>
>> Thanks Zhanghao Chen for the comments.
>>
>> As mentioned in the previous emails, we have to take one thing into
>> consideration: the final parallelism depends not only on external
>> adjustments, but also on the amount of resources that actually become
>> available.
>>
>> - In an ideal situation with sufficient resources, the external adjustment
>>   strategy determines the final parallelism and partitioning.
>> - When resources are insufficient, the resources actually available may
>>   also affect the final parallelism and partitioning.
>>
>> Therefore, building on your proposal, we would not introduce any new
>> parameters. Instead, we would only apply the following adjustments to pairs
>> of vertices whose initial partitioning type is ForwardPartitioner:
>>
>> - When the upstream and downstream vertex parallelisms have a multiple
>>   relationship (and are not equal), we change the partitioning type to
>>   RescalePartitioner.
>> - When the upstream and downstream vertex parallelisms do not have a
>>   multiple relationship (and are not equal), we change the partitioning
>>   type to RebalancePartitioner.
>> - When the upstream and downstream vertex parallelisms are equal, we
>>   change the partitioning type back to ForwardPartitioner.
>>
>> This way, we can also stay decoupled from concrete, model-specific
>> strategies.
>>
>> WDYT?
>>
>> Best regards,
>> Yuepeng Pan
>>
>> Zhanghao Chen <[email protected]> wrote on Thu, Jan 15, 2026 at 22:44:
>>
>> > I think it should definitely be controlled in the model rather than in
>> > the engine. Maybe we can add an option to control its behavior?
>> >
>> > Best,
>> > Zhanghao Chen
>> > ________________________________
>> > From: Yuepeng Pan <[email protected]>
>> > Sent: Thursday, January 15, 2026 21:39
>> > To: [email protected] <[email protected]>
>> > Subject: Re: [DISCUSS] A design proposal to fix the wrong dynamic
>> > replacement of partitioner from FORWARD to REBLANCE for AutoScaler and
>> > AdaptiveScheduler
>> >
>> > Thanks Zhanghao Chen for the response.
>> >
>> > Let me add some historical context [1].
>> >
>> > In the previous discussions, there were two alternative replacement
>> > strategies, with the following main characteristics:
>> >
>> > - RescalePartitioner: compared to RebalancePartitioner, it introduces
>> >   fewer network connections and less shuffle overhead. However, it is
>> >   more prone to load skew and is therefore less general.
>> > - RebalancePartitioner: in theory, it distributes the load evenly across
>> >   downstream tasks and is more general, but at the cost of more network
>> >   connections and higher shuffle overhead.
>> >
>> > To balance generality and correctness, the community eventually chose
>> > the latter.
>> >
>> > I'd like to apologize for not responding in detail earlier to this
>> > suggestion [2] (switching to RescalePartitioner and enforcing a
>> > multiplicative relationship between upstream and downstream
>> > parallelism).
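The rescale-versus-rebalance trade-off described in the quoted message above (and the 21-to-25 case mentioned earlier in the thread) can be made concrete with a small cost model. The following is a minimal Java sketch, not Flink's actual wiring code; the class and method names are hypothetical. It assumes that each upstream subtask emits one unit of records split evenly across its connected downstream subtasks, that rebalance connects every upstream subtask to every downstream subtask, and that rescale (pointwise) wiring assigns each upstream subtask a contiguous, near-equal range of downstream subtasks (or the reverse when upstream is larger). Flink may order those ranges differently.

```java
import java.util.Arrays;

/** Hypothetical cost model comparing rescale (pointwise) and rebalance (all-to-all) wiring. */
final class EdgeCostModel {

    /** Rebalance connects every upstream subtask to every downstream subtask. */
    static long rebalanceChannels(int up, int down) {
        return (long) up * down;
    }

    /** Pointwise wiring: every subtask on the larger side has exactly one partner. */
    static long rescaleChannels(int up, int down) {
        return Math.max(up, down);
    }

    /**
     * Load received by each downstream subtask under rescale, assuming each upstream
     * subtask emits 1.0 unit split evenly over a contiguous range of downstream subtasks.
     */
    static double[] rescaleLoad(int up, int down) {
        double[] load = new double[down];
        if (up <= down) {
            for (int i = 0; i < up; i++) {
                // Assumed near-equal contiguous range [start, end) for upstream subtask i.
                int start = (int) ((long) i * down / up);
                int end = (int) ((long) (i + 1) * down / up);
                for (int j = start; j < end; j++) {
                    load[j] += 1.0 / (end - start);
                }
            }
        } else {
            for (int i = 0; i < up; i++) {
                // Each upstream subtask feeds exactly one downstream subtask.
                load[(int) ((long) i * down / up)] += 1.0;
            }
        }
        return load;
    }

    /** Rebalance spreads the total load evenly over all downstream subtasks. */
    static double rebalanceLoadPerTask(int up, int down) {
        return (double) up / down;
    }

    public static void main(String[] args) {
        int up = 21, down = 25;
        double[] load = rescaleLoad(up, down);
        System.out.printf("channels: rescale=%d, rebalance=%d%n",
                rescaleChannels(up, down), rebalanceChannels(up, down));
        System.out.printf("rescale load: min=%.2f, max=%.2f; rebalance load=%.2f%n",
                Arrays.stream(load).min().getAsDouble(),
                Arrays.stream(load).max().getAsDouble(),
                rebalanceLoadPerTask(up, down));
    }
}
```

Under these assumptions, 21-to-25 needs 25 channels with rescale versus 525 with rebalance, but rescale leaves 8 downstream subtasks at half the load of the other 17 (0.50 vs 1.00 units, against an even 0.84 under rebalance). That is one way to see both the "fewer connections" and the "load skew" sides of the comparison.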
>> >
>> > If this strategy is implemented on the AutoScaler side, we may want to
>> > consider whether it can be migrated into the engine. The reason is that
>> > inconsistent parallelism between upstream and downstream vertices
>> > connected by a forward edge is not only caused by AutoScaler requests; it
>> > can also result from rescaling triggered via the REST API or from
>> > internal events such as failover. Placing the implementation on the
>> > engine side would therefore help ensure the safety and consistency of
>> > this strategy.
>> >
>> > If the cost of moving this strategy into the engine is too high, we could
>> > alternatively propose a new FLIP to discuss and advance it as a new
>> > feature on the AutoScaler side.
>> >
>> > If the strategy you mentioned is indeed intended to be implemented in
>> > the engine, I have one question. Consider a job consisting of two
>> > JobVertices, A and B:
>> >
>> > A (p = 100) --forward--> B (p = 100)
>> >
>> > After one AutoScaler adjustment, the resulting parallelism proposal is:
>> >
>> > A (p = 60) --rescale--> B (p = 100)
>> >
>> > I assume that, to maintain a multiplicative relationship between the
>> > parallelisms of upstream and downstream vertices, there are roughly two
>> > possible directions:
>> >
>> > a) Adjust A from p = 60 to p = 50. In this case, some tasks of vertex A
>> >    may become bottlenecks.
>> > b) Adjust B from p = 100 to p = 120. In this case, we may end up
>> >    reserving some idle resources, and the scale-down effect may be less
>> >    significant.
>> >
>> > Any input is appreciated!
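The two directions in the A/B example above can be expressed as a small search. This is a minimal Java sketch under stated assumptions: a "multiple relationship" is taken to mean that the larger parallelism is an integer multiple of the smaller one, and each helper scans for the nearest qualifying value while keeping the other vertex fixed. The class and method names are hypothetical, and a real AutoScaler or scheduler would also have to weigh available resources and per-vertex bottlenecks, which this ignores.

```java
/** Hypothetical helpers for finding nearby parallelism pairs with a multiple relationship. */
final class MultipleAlignment {

    /** Assumed definition: the larger parallelism is an integer multiple of the smaller one. */
    static boolean hasMultipleRelationship(int up, int down) {
        int lo = Math.min(up, down);
        int hi = Math.max(up, down);
        if (lo <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        return hi % lo == 0;
    }

    /** Direction a): keep downstream fixed and lower upstream to the nearest qualifying value. */
    static int lowerUpstreamToMultiple(int up, int down) {
        for (int p = up; p >= 1; p--) {
            if (hasMultipleRelationship(p, down)) {
                return p;
            }
        }
        throw new IllegalArgumentException("parallelism must be positive");
    }

    /** Direction b): keep upstream fixed and raise downstream to the nearest qualifying value. */
    static int raiseDownstreamToMultiple(int up, int down) {
        // Terminates: some multiple of 'up' (or 'up' itself) is always reached.
        for (int q = down; ; q++) {
            if (hasMultipleRelationship(up, q)) {
                return q;
            }
        }
    }

    public static void main(String[] args) {
        int up = 60, down = 100;
        System.out.println("multiple relationship now? " + hasMultipleRelationship(up, down));
        System.out.println("a) upstream " + up + " -> " + lowerUpstreamToMultiple(up, down));
        System.out.println("b) downstream " + down + " -> " + raiseDownstreamToMultiple(up, down));
    }
}
```

For the 60/100 case this reproduces the two options from the message (50 and 120). Applied to the 21-to-25 case raised earlier in the thread, the same search yields 5 (upstream) or 42 (downstream), which illustrates why enforcing the relationship can be costly.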
>> >
>> > [1] https://github.com/apache/flink/pull/21443#discussion_r1042919428
>> > [2] https://issues.apache.org/jira/browse/FLINK-33123?focusedCommentId=17767397&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17767397
>> >
>> > Best regards,
>> > Yuepeng Pan
>> >
>> > Zhanghao Chen <[email protected]> wrote on Thu, Jan 15, 2026 at 19:37:
>> >
>> > > Thanks Yuepeng for the proposal. Overall LGTM. However, I'm a bit
>> > > concerned about the potential performance impact of changing a
>> > > forward edge to rebalance. The autoscaler currently assumes a linear
>> > > performance model between throughput and parallelism. The edge change
>> > > can easily break this assumption, as rebalance introduces more
>> > > shuffling and results in higher CPU usage and network memory
>> > > consumption. I suggest considering it on the algorithm side as well.
>> > >
>> > > Best,
>> > > Zhanghao Chen
>> > > ________________________________
>> > > From: Yuepeng Pan <[email protected]>
>> > > Sent: Tuesday, January 13, 2026 23:46
>> > > To: [email protected] <[email protected]>
>> > > Subject: [DISCUSS] A design proposal to fix the wrong dynamic replacement
>> > > of partitioner from FORWARD to REBLANCE for AutoScaler and
>> > > AdaptiveScheduler
>> > >
>> > > Hi community,
>> > >
>> > > I would like to start a discussion around the issue described in
>> > > FLINK-33123 [1].
>> > >
>> > > This issue can be broken down into two main parts:
>> > >
>> > > a) Two upstream and downstream JobVertices connected by a FORWARD edge
>> > >    initially have the same parallelism, but a rescale operation makes
>> > >    their parallelisms differ. In this case, the current strategy may
>> > >    produce incorrect results when rebuilding the upstream-downstream
>> > >    network partition connections.
>> > > b) The parallelisms of two upstream and downstream JobVertices are
>> > >    initially different, but a rescale operation adjusts them to be the
>> > >    same. In this scenario, it is not possible to determine the
>> > >    partition type after the rescale.
>> > >
>> > > So I'd like to share a design proposal [2] that attempts to address the
>> > > problem described in the ticket [1].
>> > >
>> > > Thanks in advance for your time and feedback. Looking forward to the
>> > > discussion!
>> > >
>> > > [1] https://issues.apache.org/jira/browse/FLINK-33123
>> > > [2] https://docs.google.com/document/d/1e_6o4bdXcKtFL3xYxKeyKnRjR8ffsw6Z8frp3tp7u-M/edit?usp=sharing
>> > >
>> > > Best regards,
>> > > Yuepeng Pan
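Both issues in the original proposal come down to one question: given an edge that was declared FORWARD and the parallelisms after a rescale, which partitioner should be used? Below is a minimal Java sketch of Design 1 and of the Design 2 enum (whose proposed default, MIXED, reuses Design 1) from the summary at the top of this thread. It is not Flink code. The class, enum, and method names are hypothetical, and the Partitioner values are only stand-ins for ForwardPartitioner, RescalePartitioner, and RebalancePartitioner. It assumes that every policy falls back to FORWARD when the parallelisms are equal (the thread states this explicitly only for Design 1). It also assumes the scheduler keeps the edge's original FORWARD intent, so the choice is recomputed from scratch after each rescale rather than from a previously replaced partitioner.

```java
import java.util.Locale;

/** Hypothetical sketch of the Design 1 / Design 2 replacement policy discussed in this thread. */
final class ForwardEdgeReplacement {

    /** Stand-ins for ForwardPartitioner, RescalePartitioner and RebalancePartitioner. */
    enum Partitioner { FORWARD, RESCALE, REBALANCE }

    /** Values proposed for the (not yet existing) replacement-policy option. */
    enum ReplacementPolicy { MIXED, RESCALE, REBALANCE }

    /** Parses a configured value; falls back to the proposed default MIXED when unset. */
    static ReplacementPolicy parsePolicy(String value) {
        if (value == null || value.trim().isEmpty()) {
            return ReplacementPolicy.MIXED;
        }
        return ReplacementPolicy.valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    /**
     * Effective partitioner for an edge originally declared FORWARD, derived only from the
     * policy and the current parallelisms, so it can be recomputed after every rescale.
     */
    static Partitioner effectivePartitioner(ReplacementPolicy policy, int up, int down) {
        if (up <= 0 || down <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        // Assumption: equal parallelisms always restore FORWARD, under every policy.
        if (up == down) {
            return Partitioner.FORWARD;
        }
        if (policy == ReplacementPolicy.RESCALE) {
            return Partitioner.RESCALE;
        }
        if (policy == ReplacementPolicy.REBALANCE) {
            return Partitioner.REBALANCE;
        }
        // MIXED (Design 1): rescale only when one parallelism is a multiple of the other.
        int lo = Math.min(up, down);
        int hi = Math.max(up, down);
        return hi % lo == 0 ? Partitioner.RESCALE : Partitioner.REBALANCE;
    }

    public static void main(String[] args) {
        ReplacementPolicy policy = parsePolicy(args.length > 0 ? args[0] : null);
        int[][] cases = {{100, 100}, {50, 100}, {60, 100}, {21, 25}};
        for (int[] c : cases) {
            System.out.println(policy + " " + c[0] + " -> " + c[1] + ": "
                    + effectivePartitioner(policy, c[0], c[1]));
        }
    }
}
```

With the default MIXED policy, the four pairs print FORWARD, RESCALE, REBALANCE, and REBALANCE respectively. Passing RESCALE as the argument turns the last two into RESCALE, which corresponds to the explicit option Zhanghao asked for. Deriving the result from the original intent, rather than from the current partitioner, is what makes case b) (parallelisms becoming equal again) decidable in this sketch.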
