Thanks for the update, Max.

Let me share our use case. We run quite a few very large Flink pipelines with
a parallelism of 3000+. The overhead of REBALANCE is not negligible at this
scale. If you double a vertex's parallelism and the edge changes from FORWARD
to REBALANCE, performance may simply degrade, and the pipeline may run out of
network memory much more easily. For large-scale jobs, it is crucial that
autoscaling can switch to a RESCALE edge, with a properly tuned algorithm to
avoid data skew.
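
To make the scale concrete, here is a rough back-of-the-envelope sketch. It
assumes a simplified wiring model (all-to-all for REBALANCE, contiguous
pointwise ranges for RESCALE); the function names are made up for illustration
and this is not Flink's actual channel-wiring code.

```python
from typing import Dict, List


def rebalance_channels(upstream: int, downstream: int) -> int:
    """All-to-all: every upstream subtask connects to every downstream subtask."""
    return upstream * downstream


def pointwise_targets(upstream: int, downstream: int) -> Dict[int, List[int]]:
    """Simplified pointwise wiring (assumption): contiguous index ranges."""
    targets: Dict[int, List[int]] = {u: [] for u in range(upstream)}
    if downstream >= upstream:
        # Scale-out: each downstream subtask reads from exactly one upstream subtask.
        for d in range(downstream):
            targets[d * upstream // downstream].append(d)
    else:
        # Scale-in: each upstream subtask writes to exactly one downstream subtask.
        for u in range(upstream):
            targets[u].append(u * downstream // upstream)
    return targets


def rescale_channels(upstream: int, downstream: int) -> int:
    """Total number of upstream-to-downstream connections under pointwise wiring."""
    return sum(len(t) for t in pointwise_targets(upstream, downstream).values())


def downstream_load(upstream: int, downstream: int) -> List[float]:
    """Load per downstream subtask if each upstream subtask emits 1 unit,
    split evenly across its own targets."""
    load = [0.0] * downstream
    for subtask_targets in pointwise_targets(upstream, downstream).values():
        for d in subtask_targets:
            load[d] += 1.0 / len(subtask_targets)
    return load


print(rebalance_channels(3000, 6000))  # all-to-all connection count
print(rescale_channels(3000, 6000))    # pointwise connection count
load = downstream_load(21, 25)
print(max(load) / min(load))           # skew ratio for a non-multiple case
```

Under these assumptions, doubling 3000 to 6000 gives 18,000,000 connections
with REBALANCE versus 6,000 with RESCALE. The 21-to-25 case shows the other
side of the trade-off: in this model some downstream subtasks receive twice
the load of others, which is the skew the algorithm would need to account for.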

As for the proposal under discussion, I also prefer dropping the extra option
for now, for simplicity. We can open a new discussion on offering RESCALE /
REBALANCE as an option when the upstream and downstream parallelisms differ.
Such an option would be valuable both for the adaptive scheduling case and for
the normal case where the user just wants to manually reconfigure their job.
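
For that follow-up discussion, the behavior I have in mind could be sketched
roughly as below. This is only an illustration: `ReplacementPolicy` and
`choose_partitioner` are hypothetical names, not existing Flink classes or
options, and the REBALANCE default simply follows Max's suggestion.

```python
from enum import Enum


class ReplacementPolicy(Enum):
    """Hypothetical policy for an edge that was originally FORWARD."""
    RESCALE = "RescalePartitioner"
    REBALANCE = "RebalancePartitioner"


def choose_partitioner(
    upstream: int,
    downstream: int,
    policy: ReplacementPolicy = ReplacementPolicy.REBALANCE,
) -> str:
    """Pick the partitioner for an originally-FORWARD edge after a rescale."""
    if upstream <= 0 or downstream <= 0:
        raise ValueError("parallelism must be positive")
    if upstream == downstream:
        # Equal parallelism: the edge can be restored to FORWARD.
        return "ForwardPartitioner"
    # Parallelisms differ: apply the single, explicitly chosen policy.
    return policy.value


print(choose_partitioner(100, 100))
print(choose_partitioner(60, 100))
print(choose_partitioner(60, 100, ReplacementPolicy.RESCALE))
```

The point of keeping it to two values is that it is always obvious which
partitioner gets chosen once the parallelisms differ.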

Best,
Zhanghao Chen
________________________________
From: Maximilian Michels <[email protected]>
Sent: Tuesday, January 20, 2026 19:19
To: [email protected] <[email protected]>
Subject: Re: [DISCUSS] A design proposal to fix the wrong dynamic replacement 
of partitioner from FORWARD to REBLANCE for AutoScaler and AdaptiveScheduler

Thank you, Yuepeng Pan, for looking into this long-standing issue! The
original proposal looks good to me. I agree with Zhanghao Chen to keep
things simple and avoid complexity.

I would not add a configuration option, but if we must add one, then
let's only have RESCALE / REBALANCE as options, and default to
REBALANCE. Let's not add a "mixed" strategy; it should be clear which
strategy gets chosen. In practice, I don't believe this configuration
option will ever be set, so I would prefer not to add it at all, but I
leave this up to the community.

Cheers,
Max

On Tue, Jan 20, 2026 at 11:59 AM Yuepeng Pan <[email protected]> wrote:
>
> Bumping this thread. Thanks!
>
> Best regards,
> Yuepeng Pan
>
> On Sat, Jan 17, 2026 at 11:06 AM Yuepeng Pan <[email protected]> wrote:
>
> > Thanks to Zhanghao Chen for the feedback.
> >
> > Please let me sort out the candidate solutions from the discussion history
> > to facilitate gathering clearer preferences or feedback:
> >
> > For JobVertices with Forward edges in streaming jobs with the
> > AdaptiveScheduler enabled:
> >
> > Design 1:
> > - When upstream and downstream parallelism are the same, restore the
> > partitioning strategy to ForwardPartitioner.
> > - When upstream and downstream parallelism differ but one is an integer
> > multiple of the other, replace the partitioner with RescalePartitioner.
> > - When upstream and downstream parallelism differ and neither is a multiple
> > of the other, replace the partitioner with RebalancePartitioner.
> >
> > Design 2:
> > Introduce a new parameter:
> >
> > - name:
> > jobmanager.adaptive-scheduler.jobgraph.mutated-forward-edge.replacement-policy
> > - type: enum
> > - value options:
> >   - MIXED: Use the strategy from Design 1
> >   - RESCALE: Replace the partitioner with RescalePartitioner when upstream
> > and downstream JobVertices have different parallelism
> >   - REBALANCE: Replace the partitioner with RebalancePartitioner when
> > upstream and downstream JobVertices have different parallelism
> > - default value: MIXED
> >
> > Looking forward to feedback about it!
> >
> > Best regards,
> > Yuepeng Pan
> >
> >
> >
> > On Thu, Jan 15, 2026 at 11:35 PM Zhanghao Chen <[email protected]> wrote:
> >
> >> Thanks Yuepeng for the detailed elaboration. The idea makes sense, but
> >> I'd prefer adding an explicit option to control the behavior for two
> >> reasons:
> >>
> >>   1. A complex strategy in a black box may be confusing for others.
> >>   2. Real-world cases can be much more complex. For example, the source
> >>      parallelism can be limited by MQ partitions, and maintaining a
> >>      multiplicative relationship between the parallelism of upstream and
> >>      downstream vertices can be really costly in some cases. Even under a
> >>      non-multiplicative relationship, rescale can still easily outperform
> >>      rebalance in some cases (21-to-25, for example). If we can't get it
> >>      right in all cases, maybe we should just keep it simple.
> >>
> >> Best,
> >> Zhanghao Chen
> >> ________________________________
> >> From: Yuepeng Pan <[email protected]>
> >> Sent: Thursday, January 15, 2026 23:03
> >> To: [email protected] <[email protected]>
> >> Subject: Re: [DISCUSS] A design proposal to fix the wrong dynamic
> >> replacement of partitioner from FORWARD to REBLANCE for AutoScaler and
> >> AdaptiveScheduler
> >>
> >> Thanks Zhanghao Chen for the comments.
> >>
> >> As mentioned in the previous emails, we have to take one thing into
> >> consideration:
> >> the final parallelism configuration depends not only on external
> >> adjustments, but also on the actual amount of resources that become
> >> available.
> >>
> >> - In an ideal situation with sufficient resources, the external adjustment
> >> strategy determines the final parallelism and partitioning.
> >> - When resources are insufficient, the actually available resources may
> >> also affect the final parallelism and partitioning.
> >>
> >> Therefore, based on your proposal, we do not introduce any new parameters.
> >> Instead, we only apply the following adjustments to pairs of vertices whose
> >> initial partitioning type is ForwardPartitioner:
> >>
> >> - When the upstream and downstream vertex parallelisms are not equal and
> >> one is an integer multiple of the other, we change the partitioning type to
> >> RescalePartitioner.
> >> - When the upstream and downstream vertex parallelisms are not equal and
> >> neither is a multiple of the other, we change the partitioning type to
> >> RebalancePartitioner.
> >> - When the upstream and downstream vertex parallelisms are equal, we change
> >> the partitioning type back to ForwardPartitioner.
> >>
> >> In this way, we can also achieve a decoupling from concrete model-specific
> >> strategies.
> >>
> >> WDYT?
> >>
> >> Best regards,
> >> Yuepeng Pan
> >>
> >> On Thu, Jan 15, 2026 at 10:44 PM Zhanghao Chen <[email protected]> wrote:
> >>
> >> > I think it should definitely be controlled in the model rather than in
> >> > the engine. Maybe we can add an option to control its behavior?
> >> >
> >> > Best,
> >> > Zhanghao Chen
> >> > ________________________________
> >> > From: Yuepeng Pan <[email protected]>
> >> > Sent: Thursday, January 15, 2026 21:39
> >> > To: [email protected] <[email protected]>
> >> > Subject: Re: [DISCUSS] A design proposal to fix the wrong dynamic
> >> > replacement of partitioner from FORWARD to REBLANCE for AutoScaler and
> >> > AdaptiveScheduler
> >> >
> >> > Thanks Zhanghao Chen for the response.
> >> >
> >> > Please let me add some historical context[1].
> >> >
> >> > In the previous discussions, there were two alternative replacement
> >> > strategies, with the following main characteristics:
> >> > - RescalePartitioner: Compared to RebalancePartitioner, it introduces
> >> > fewer network connections and less shuffle overhead.
> >> > However, it is more prone to load skew and therefore lacks generality.
> >> >
> >> > - RebalancePartitioner: In theory, it can evenly distribute the load
> >> > across downstream tasks and is more general,
> >> > but at the cost of increased network connections and shuffle overhead.
> >> >
> >> > To balance generality and correctness, the community eventually chose
> >> > the latter.
> >> >
> >> > I'd like to apologize for not providing a detailed response earlier to
> >> > this suggestion[2] (switching to RescalePartitioner and enforcing a
> >> > multiplicative relationship between upstream and downstream
> >> > parallelism).
> >> >
> >> > If this strategy is implemented on the AutoScaler side, we may consider
> >> > whether it can be migrated into the engine.
> >> > The reason is that inconsistent parallelism between upstream and
> >> > downstream vertices connected by a forward edge is not only caused by
> >> > AutoScaler requests, but can also result from rescaling triggered via
> >> > the REST API or internal events such as failover.
> >> > Therefore, placing the implementation on the engine side would help
> >> > ensure the safety and consistency of this strategy.
> >> >
> >> > If the cost of moving this strategy into the engine is too high, we
> >> > could alternatively propose a new FLIP to discuss and advance it as a
> >> > new feature on the AutoScaler side.
> >> >
> >> > If the strategy you mentioned is indeed intended to be implemented in
> >> > the engine, I have one question. Consider a job consisting of two
> >> > JobVertices, A and B:
> >> >
> >> > A (p = 100) --forward--> B (p = 100)
> >> >
> >> > After one AutoScaler adjustment, the resulting parallelism proposal is:
> >> >
> >> > A (p = 60) --rescale--> B (p = 100)
> >> >
> >> > I assume that, in order to maintain a multiplicative relationship
> >> > between the parallelism of upstream and downstream vertices, there are
> >> > roughly two possible directions:
> >> >
> >> > a) Adjust A from p = 60 to p = 50. In this case, some tasks of vertex A
> >> > may become bottlenecks.
> >> > b) Adjust B from p = 100 to p = 120. In this case, we may end up
> >> > reserving some idle resources, and the scale-down effect may be less
> >> > significant.
> >> >
> >> > Any input is appreciated!
> >> >
> >> >
> >> > [1]https://github.com/apache/flink/pull/21443#discussion_r1042919428
> >> > [2]https://issues.apache.org/jira/browse/FLINK-33123?focusedCommentId=17767397&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17767397
> >> >
> >> > Best regards,
> >> > Yuepeng Pan
> >> >
> >> >
> >> >
> >> > On Thu, Jan 15, 2026 at 7:37 PM Zhanghao Chen <[email protected]> wrote:
> >> >
> >> > > Thanks Yuepeng for the proposal. Overall LGTM. However, I'm a bit
> >> > > concerned about the potential performance impact of changing a forward
> >> > > edge to rebalance. The autoscaler currently assumes a linear
> >> > > performance model between the throughput and the parallelism. The edge
> >> > > change can easily break this assumption as Rebalance introduces more
> >> > > shuffle and results in higher CPU usage and network memory
> >> > > consumption. I suggest considering it on the algorithm side as well.
> >> > >
> >> > > Best,
> >> > > Zhanghao Chen
> >> > > ________________________________
> >> > > From: Yuepeng Pan <[email protected]>
> >> > > Sent: Tuesday, January 13, 2026 23:46
> >> > > To: [email protected] <[email protected]>
> >> > > Subject: [DISCUSS] A design proposal to fix the wrong dynamic
> >> > > replacement of partitioner from FORWARD to REBLANCE for AutoScaler and
> >> > > AdaptiveScheduler
> >> > >
> >> > > Hi community,
> >> > >
> >> > > I would like to start a discussion around the issue described in
> >> > > **FLINK-33123[1]**.
> >> > >
> >> > > This issue can mainly be broken down into two parts:
> >> > > a) Assume that two upstream and downstream JobVertices connected by a
> >> > > FORWARD edge initially have the same parallelism, and that a rescale
> >> > > operation makes their parallelism differ.
> >> > > In this case, the current strategy may produce incorrect results when
> >> > > rebuilding the upstream–downstream network partition connections.
> >> > > b) Assume that the parallelism of two upstream and downstream
> >> > > JobVertices is different, but a rescale operation adjusts their
> >> > > parallelism to be the same.
> >> > > In this scenario, it is not possible to determine the partition type
> >> > > after the rescale.
> >> > >
> >> > > So, I'd like to share a design proposal[2] that attempts to address
> >> > > the problem described in the ticket[1].
> >> > >
> >> > > Thanks in advance for your time and feedback.
> >> > > Looking forward to the discussion!
> >> > >
> >> > >
> >> > > [1]https://issues.apache.org/jira/browse/FLINK-33123
> >> > > [2]https://docs.google.com/document/d/1e_6o4bdXcKtFL3xYxKeyKnRjR8ffsw6Z8frp3tp7u-M/edit?usp=sharing
> >> > >
> >> > > Best regards,
> >> > > Yuepeng Pan
> >> > >
> >> >
> >>
> >
