I think it should definitely be controlled in the model rather than in the 
engine. Maybe we can add an option to control its behavior?

Best,
Zhanghao Chen
________________________________
From: Yuepeng Pan <[email protected]>
Sent: Thursday, January 15, 2026 21:39
To: [email protected] <[email protected]>
Subject: Re: [DISCUSS] A design proposal to fix the wrong dynamic replacement 
of partitioner from FORWARD to REBALANCE for AutoScaler and AdaptiveScheduler

Thanks Zhanghao Chen for the response.

Please let me add some historical context[1].

In the previous discussions, there were two alternative replacement
strategies, with the following main characteristics:
- RescalePartitioner: Compared to RebalancePartitioner, it introduces fewer
network connections and less shuffle overhead.
However, it is more prone to load skew and therefore lacks generality.

- RebalancePartitioner: In theory, it can evenly distribute the load across
downstream tasks and is more general,
but at the cost of increased network connections and shuffle overhead.

To balance generality and correctness, the community eventually chose the
latter.
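
To make the connection overhead in this trade-off concrete, here is a rough
sketch that counts logical channels between two vertices. The function names
are hypothetical (not Flink APIs), and the rescale case is simplified by
assuming that one parallelism is an exact multiple of the other.

```python
def rebalance_channels(upstream: int, downstream: int) -> int:
    """All-to-all: every upstream subtask connects to every downstream subtask."""
    return upstream * downstream


def rescale_channels(upstream: int, downstream: int) -> int:
    """Pointwise: each subtask on the smaller side talks to a fixed group on the larger side.

    Simplifying assumption: one parallelism is an exact multiple of the other.
    """
    larger, smaller = max(upstream, downstream), min(upstream, downstream)
    if larger % smaller != 0:
        raise ValueError("this sketch only covers parallelisms that are exact multiples")
    # Every subtask on the larger side has exactly one channel.
    return larger


print(rebalance_channels(50, 100))  # 5000
print(rescale_channels(50, 100))    # 100
```

Under these assumptions, rebalance grows with the product of the two
parallelisms, while rescale grows only with the larger one.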

I'd like to apologize for not providing a detailed response earlier to this
suggestion [2] (switching to RescalePartitioner and enforcing a
multiplicative relationship between upstream and downstream parallelism).

If this strategy is implemented on the AutoScaler side, we may consider
whether it can be migrated into the engine.
The reason is that inconsistent parallelism between upstream and downstream
vertices connected by a forward edge is not only caused by AutoScaler
requests,
but can also result from rescaling triggered via the REST API or internal
events such as failover.
Therefore, placing the implementation on the engine side would help ensure
the safety and consistency of this strategy.

If the cost of moving this strategy into the engine is too high, we could
alternatively propose
a new FLIP to discuss and advance it as a new feature on the AutoScaler
side.

If the strategy you mentioned is indeed intended to be implemented in the
engine,
I have one question. Consider a job consisting of two JobVertices, A and B:

A (p = 100) --forward--> B (p = 100)

After one AutoScaler adjustment, the resulting parallelism proposal is:

A (p = 60) --rescale--> B (p = 100)

I assume that, in order to maintain a multiplicative relationship between
the parallelism
of upstream and downstream vertices, there are roughly two possible
directions:

a) Adjust A from p = 60 to p = 50. In this case, some tasks of vertex A may
become bottlenecks.
b) Adjust B from p = 100 to p = 120. In this case, we may end up reserving
some idle resources,
and the scale-down effect may be less significant.
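
The two directions can be sketched as follows. This is only an illustration:
the helper names are hypothetical (not AutoScaler or engine APIs), and it
assumes "multiplicative relationship" means that the downstream parallelism is
an exact multiple of the upstream parallelism.

```python
def largest_divisor_at_most(downstream: int, proposed_upstream: int) -> int:
    """Direction a): lower the upstream parallelism to the largest divisor of the
    downstream parallelism that does not exceed the proposed value."""
    for candidate in range(min(proposed_upstream, downstream), 0, -1):
        if downstream % candidate == 0:
            return candidate
    raise ValueError("parallelism values must be positive")


def smallest_multiple_at_least(upstream: int, proposed_downstream: int) -> int:
    """Direction b): raise the downstream parallelism to the smallest multiple of the
    upstream parallelism that is not below the proposed value."""
    factor = -(-proposed_downstream // upstream)  # ceiling division
    return factor * upstream


# Proposal from the example: A (p = 60) --rescale--> B (p = 100)
print(largest_divisor_at_most(100, 60))     # 50  -> A (p = 50),  B (p = 100)
print(smallest_multiple_at_least(60, 100))  # 120 -> A (p = 60),  B (p = 120)
```

In this example, direction a) removes 10 subtasks from A beyond what the
proposal asked for, while direction b) adds 20 subtasks to B.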

Any input is appreciated!


[1]https://github.com/apache/flink/pull/21443#discussion_r1042919428
[2]
https://issues.apache.org/jira/browse/FLINK-33123?focusedCommentId=17767397&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17767397

Best regards,
Yuepeng Pan



Zhanghao Chen <[email protected]> wrote on Thu, Jan 15, 2026 at 19:37:

> Thanks Yuepeng for the proposal. Overall LGTM. However, I'm a bit
> concerned about the potential performance impact of changing a forward edge
> to rebalance. The autoscaler currently assumes a linear performance model
> between the throughput and the parallelism. The edge change can easily
> break this assumption as Rebalance introduces more shuffle and results in
> higher CPU usage and network memory consumption. I suggest considering it
> on the algorithm side as well.
>
> Best,
> Zhanghao Chen
> ________________________________
> From: Yuepeng Pan <[email protected]>
> Sent: Tuesday, January 13, 2026 23:46
> To: [email protected] <[email protected]>
> Subject: [DISCUSS] A design proposal to fix the wrong dynamic replacement
> of partitioner from FORWARD to REBALANCE for AutoScaler and AdaptiveScheduler
>
> Hi community,
>
> I would like to start a discussion around the issue described in
> **FLINK-33123[1]**.
>
> This issue can mainly be broken down into two parts:
> a) An upstream and a downstream JobVertex are connected by a FORWARD edge
> and initially have the same parallelism, but a rescale operation makes
> their parallelism differ.
> In this case, the current strategy may produce incorrect results when
> rebuilding the upstream-downstream network partition connections.
> b) An upstream and a downstream JobVertex initially have different
> parallelism, but a rescale operation adjusts them to the same parallelism.
> In this scenario, the partition type to use after the rescale cannot be
> determined.
>
> So, I'd like to share a design proposal[2] that attempts to address the
> problem described in the ticket[1].
>
> Thanks in advance for your time and feedback.
> Looking forward to the discussion!
>
>
> [1]https://issues.apache.org/jira/browse/FLINK-33123
> [2]
>
> https://docs.google.com/document/d/1e_6o4bdXcKtFL3xYxKeyKnRjR8ffsw6Z8frp3tp7u-M/edit?usp=sharing
>
> Best regards,
> Yuepeng Pan
>
