Thanks Yuepeng for the detailed elaboration. The idea makes sense, but I'd 
prefer adding an explicit option to control the behavior for two reasons:

  1. A complex strategy hidden in a black box may be confusing for others.
  2. Real-world cases can be much more complex. For example, the source
     parallelism can be limited by MQ partitions, and maintaining a
     multiplicative relationship between the parallelism of upstream and
     downstream vertices can be really costly in some cases. Even under a
     non-multiplicative relationship, rescale can still easily outperform
     rebalance in some cases (21-to-25, for example). If we can't make it
     right in all cases, maybe we should just keep it simple.
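
To make the 21-to-25 case concrete, here is a rough sketch comparing the number of network channels under each partitioner. It assumes that rebalance connects subtasks all-to-all and that rescale uses a pointwise pattern in which each subtask on the larger side is wired to exactly one subtask on the smaller side. The function names are made up for illustration and are not Flink APIs.

```python
def rebalance_channels(upstream: int, downstream: int) -> int:
    # All-to-all: every upstream subtask connects to every downstream subtask.
    return upstream * downstream


def rescale_channels(upstream: int, downstream: int) -> int:
    # Assumed pointwise pattern: each subtask on the larger side is wired to
    # exactly one subtask on the smaller side.
    return max(upstream, downstream)


print(rebalance_channels(21, 25))  # 525
print(rescale_channels(21, 25))    # 25
```

Under these assumptions rescale needs 25 channels instead of 525, at the price of uneven load because 25 is not a multiple of 21.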

Best,
Zhanghao Chen
________________________________
From: Yuepeng Pan <[email protected]>
Sent: Thursday, January 15, 2026 23:03
To: [email protected] <[email protected]>
Subject: Re: [DISCUSS] A design proposal to fix the wrong dynamic replacement 
of partitioner from FORWARD to REBLANCE for AutoScaler and AdaptiveScheduler

Thanks Zhanghao Chen for the comments.

As mentioned in the previous emails, we have to take one thing into
consideration:
the final parallelism configuration depends not only on external
adjustments, but also on the actual amount of resources that become
available.

- In an ideal situation with sufficient resources, the external adjustment
strategy determines the final parallelism and partitioning.
- When resources are insufficient, the actually available resources may
also affect the final parallelism and partitioning.

Therefore, based on your proposal, we do not introduce any new parameters.
Instead, we only apply the following adjustments to pairs of vertices whose
initial partitioning type is ForwardPartitioner:

- When the upstream and downstream vertex parallelisms are not equal and one
is an integer multiple of the other, we change the partitioning type to
RescalePartitioner.
- When the upstream and downstream vertex parallelisms are not equal and
neither is an integer multiple of the other, we change the partitioning type
to RebalancePartitioner.
- When the upstream and downstream vertex parallelisms are equal, we change
the partitioning type back to ForwardPartitioner.
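
The three rules above can be sketched as a small decision function. This is only an illustration of the proposed logic: `choose_partitioner` and the string labels are hypothetical names, not Flink APIs.

```python
def choose_partitioner(upstream: int, downstream: int) -> str:
    """Pick a partitioner label for an edge that was originally FORWARD.

    Hypothetical helper for illustration only; not part of Flink.
    """
    if upstream <= 0 or downstream <= 0:
        raise ValueError("parallelism must be positive")
    if upstream == downstream:
        return "FORWARD"
    larger, smaller = max(upstream, downstream), min(upstream, downstream)
    if larger % smaller == 0:
        # One parallelism is an integer multiple of the other.
        return "RESCALE"
    return "REBALANCE"
```

For example, `choose_partitioner(50, 100)` gives `"RESCALE"`, while `choose_partitioner(60, 100)` gives `"REBALANCE"`.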

In this way, we can also achieve a decoupling from concrete model-specific
strategies.

What do you think?

Best regards,
Yuepeng Pan

On Thu, Jan 15, 2026 at 22:44, Zhanghao Chen <[email protected]> wrote:

> I think it should definitely be controlled in the model rather than in the
> engine. Maybe we can add an option to control its behavior?
>
> Best,
> Zhanghao Chen
> ________________________________
> From: Yuepeng Pan <[email protected]>
> Sent: Thursday, January 15, 2026 21:39
> To: [email protected] <[email protected]>
> Subject: Re: [DISCUSS] A design proposal to fix the wrong dynamic
> replacement of partitioner from FORWARD to REBLANCE for AutoScaler and
> AdaptiveScheduler
>
> Thanks Zhanghao Chen for the response.
>
> Please let me add some historical context[1].
>
> In the previous discussions, there were two alternative replacement
> strategies, with the following main characteristics:
> - RescalePartitioner: Compared to RebalancePartitioner, it introduces fewer
> network connections and less shuffle overhead.
> However, it is more prone to load skew and therefore lacks generality.
>
> - RebalancePartitioner: In theory, it can evenly distribute the load across
> downstream tasks and is more general,
> but at the cost of increased network connections and shuffle overhead.
>
> To balance generality and correctness, the community eventually chose the
> latter.
>
> I'd like to apologize for not responding in detail earlier to this
> suggestion [2] (switching to RescalePartitioner and enforcing a
> multiplicative relationship between upstream and downstream parallelism).
>
> If this strategy is implemented on the AutoScaler side, we may consider
> whether it can be migrated into the engine.
> The reason is that inconsistent parallelism between upstream and downstream
> vertices connected by a forward edge is not only caused by AutoScaler
> requests,
> but can also result from rescaling triggered via the REST API or internal
> events such as failover.
> Therefore, placing the implementation on the engine side would help ensure
> the safety and consistency of this strategy.
>
> If the cost of moving this strategy into the engine is too high, we could
> alternatively propose
> a new FLIP to discuss and advance it as a new feature on the AutoScaler
> side.
>
> If the strategy you mentioned is indeed intended to be implemented in the
> engine,
> I have one question. Consider a job consisting of two JobVertices, A and B:
>
> A (p = 100) --forward--> B (p = 100)
>
> After one AutoScaler adjustment, the resulting parallelism proposal is:
>
> A (p = 60) --rescale--> B (p = 100)
>
> I assume that, in order to maintain a multiplicative relationship between
> the parallelism
> of upstream and downstream vertices, there are roughly two possible
> directions:
>
> a) Adjust A from p = 60 to p = 50. In this case, some tasks of vertex A may
> become bottlenecks.
> b) Adjust B from p = 100 to p = 120. In this case, we may end up reserving
> some idle resources,
> and the scale-down effect may be less significant.
>
> Any input is appreciated!
>
>
> [1] https://github.com/apache/flink/pull/21443#discussion_r1042919428
> [2] https://issues.apache.org/jira/browse/FLINK-33123?focusedCommentId=17767397&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17767397
>
> Best regards,
> Yuepeng Pan
>
>
>
> On Thu, Jan 15, 2026 at 19:37, Zhanghao Chen <[email protected]> wrote:
>
> > Thanks Yuepeng for the proposal. Overall LGTM. However, I'm a bit
> > concerned about the potential performance impact of changing a forward
> > edge to rebalance. The autoscaler currently assumes a linear performance
> > model
> > between the throughput and the parallelism. The edge change can easily
> > break this assumption as Rebalance introduces more shuffle and results in
> > higher CPU usage and network memory consumption. I suggest considering it
> > on the algorithm side as well.
> >
> > Best,
> > Zhanghao Chen
> > ________________________________
> > From: Yuepeng Pan <[email protected]>
> > Sent: Tuesday, January 13, 2026 23:46
> > To: [email protected] <[email protected]>
> > Subject: [DISCUSS] A design proposal to fix the wrong dynamic replacement
> > of partitioner from FORWARD to REBLANCE for AutoScaler and
> > AdaptiveScheduler
> >
> > Hi community,
> >
> > I would like to start a discussion around the issue described in
> > **FLINK-33123[1]**.
> >
> > This issue can mainly be broken down into two parts:
> > a) Assume that two upstream and downstream JobVertices connected by a
> > FORWARD edge initially have the same parallelism, but a rescale operation
> > makes their parallelism differ. In this case, the current strategy may
> > produce incorrect results when rebuilding the upstream-downstream network
> > partition connections.
> > b) Assume that the parallelism of two upstream and downstream JobVertices
> > initially differs, but a rescale operation needs to make it the same.
> > In this scenario, it is not possible to determine the partition type
> > after the rescale.
> >
> > So, I'd like to share a design proposal[2] that attempts to address the
> > problem described in the ticket[1].
> >
> > Thanks in advance for your time and feedback.
> > Looking forward to the discussion!
> >
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-33123
> > [2] https://docs.google.com/document/d/1e_6o4bdXcKtFL3xYxKeyKnRjR8ffsw6Z8frp3tp7u-M/edit?usp=sharing
> >
> > Best regards,
> > Yuepeng Pan
> >
>
