Hi, devs.

> As for the proposal under discussion,
> I also prefer dropping the extra option for now for simplicity.

+1 on this proposal.

> We can open a new discussion on RESCALE / REBALANCE
> as an option when the upstream and downstream parallelisms differ.

Looking forward to it!

Thanks Maximilian, Zhanghao, and everyone involved in the discussion.

Best regards,
Yuepeng Pan


On Wed, Jan 21, 2026 at 10:48, Zhanghao Chen <[email protected]> wrote:

> Thanks for Max's update.
>
> Let me share our use cases. We have quite a few gigantic Flink pipelines
> with a parallelism of 3000+. The overhead of REBALANCE is not negligible at
> this scale. When you double a vertex's parallelism and the edge changes
> from FORWARD to REBALANCE, performance may simply degrade and the pipeline
> may run out of network memory much more easily. Having autoscaling support
> changing to a RESCALE edge, with a properly tuned algorithm to avoid data
> skew, is crucial for large-scale jobs.
>
> As for the proposal under discussion, I also prefer dropping the extra
> option for now for simplicity. We can open a new discussion on RESCALE /
> REBALANCE as an option when the upstream and downstream parallelisms differ.
> The option is valuable for both the adaptive scheduling case and the normal
> case where the user just wants to manually reconfigure their job.
>
> Best,
> Zhanghao Chen
> ________________________________
> From: Maximilian Michels <[email protected]>
> Sent: Tuesday, January 20, 2026 19:19
> To: [email protected] <[email protected]>
> Subject: Re: [DISCUSS] A design proposal to fix the wrong dynamic
> replacement of partitioner from FORWARD to REBLANCE for AutoScaler and
> AdaptiveScheduler
>
> Thank you, Yuepeng Pan, for looking into this long-standing issue! The
> original proposal looks good to me. I agree with Zhanghao Chen to keep
> things simple and avoid complexity.
>
> I would not add a configuration option, but if we must add one, then
> let's only have RESCALE / REBALANCE as options, and default to
> REBALANCE. Let's not add a "mixed" strategy; it should be clear which
> strategy gets chosen. In practice, I don't believe this configuration
> option will ever be set, so I would prefer not to add it at all, but I
> leave this up to the community.
>
> Cheers,
> Max
>
> On Tue, Jan 20, 2026 at 11:59 AM Yuepeng Pan <[email protected]>
> wrote:
> >
> > Bumping this thread. Thanks!
> >
> > Best regards,
> > Yuepeng Pan
> >
> > On Sat, Jan 17, 2026 at 11:06, Yuepeng Pan <[email protected]> wrote:
> >
> > > Thanks to Zhanghao Chen for the feedback.
> > >
> > > Please let me sort out the candidate solutions from the discussion
> > > history to facilitate gathering clearer preferences or feedback:
> > >
> > > For JobVertices with Forward edges in streaming jobs with the
> > > AdaptiveScheduler enabled:
> > >
> > > Design 1:
> > > - When upstream and downstream parallelism are the same, restore the
> > > partitioning strategy to ForwardPartitioner.
> > > - When upstream and downstream parallelism differ but one is an integer
> > > multiple of the other, replace the partitioner with RescalePartitioner.
> > > - When upstream and downstream parallelism differ and neither is an
> > > integer multiple of the other, replace the partitioner with
> > > RebalancePartitioner.
> > >
> > > Design 2:
> > > Introduce a new parameter:
> > >
> > > - name:
> > > jobmanager.adaptive-scheduler.jobgraph.mutated-forward-edge.replacement-policy
> > > - type: enum
> > > - value options:
> > >   - MIXED: Use the strategy from Design 1
> > >   - RESCALE: Replace the partitioner with RescalePartitioner when
> > > upstream and downstream JobVertices have different parallelism
> > >   - REBALANCE: Replace the partitioner with RebalancePartitioner when
> > > upstream and downstream JobVertices have different parallelism
> > > - default value: MIXED
> > >
> > > Looking forward to feedback about it!
> > >
> > > Best regards,
> > > Yuepeng Pan
> > >
> > >
> > >
> > > On Thu, Jan 15, 2026 at 23:35, Zhanghao Chen <[email protected]> wrote:
> > >
> > >> Thanks Yuepeng for the detailed elaboration. The idea makes sense, but
> > >> I'd prefer adding an explicit option to control the behavior for two
> > >> reasons:
> > >>
> > >>   1. A complex black-box strategy may be confusing for others.
> > >>   2. Real-world cases can be much more complex, e.g. the source
> > >> parallelism can be limited by MQ partitions, and maintaining a
> > >> multiplicative relationship between the parallelism of upstream and
> > >> downstream vertices can be really costly in some cases. Even under a
> > >> non-multiplicative relationship, rescale can still easily outperform
> > >> rebalance in some cases (21-to-25 for example). If we can't get it
> > >> right in all cases, maybe just keep it simple.
> > >>
> > >> Best,
> > >> Zhanghao Chen
> > >> ________________________________
> > >> From: Yuepeng Pan <[email protected]>
> > >> Sent: Thursday, January 15, 2026 23:03
> > >> To: [email protected] <[email protected]>
> > >> Subject: Re: [DISCUSS] A design proposal to fix the wrong dynamic
> > >> replacement of partitioner from FORWARD to REBLANCE for AutoScaler and
> > >> AdaptiveScheduler
> > >>
> > >> Thanks Zhanghao Chen for the comments.
> > >>
> > >> As mentioned in the previous emails, we have to take one thing into
> > >> consideration:
> > >> the final parallelism configuration depends not only on external
> > >> adjustments, but also on the actual amount of resources that become
> > >> available.
> > >>
> > >> - In an ideal situation with sufficient resources, the external
> > >> adjustment strategy determines the final parallelism and partitioning.
> > >> - When resources are insufficient, the actually available resources may
> > >> also affect the final parallelism and partitioning.
> > >>
> > >> Therefore, based on your proposal, we do not introduce any new
> > >> parameters. Instead, we only apply the following adjustments to pairs
> > >> of vertices whose initial partitioning type is ForwardPartitioner:
> > >>
> > >> - When the upstream and downstream vertex parallelisms are not equal
> > >> and one is an integer multiple of the other, we change the partitioning
> > >> type to RescalePartitioner.
> > >> - When the upstream and downstream vertex parallelisms are not equal
> > >> and neither is an integer multiple of the other, we change the
> > >> partitioning type to RebalancePartitioner.
> > >> - When the upstream and downstream vertex parallelisms are equal, we
> > >> change the partitioning type back to ForwardPartitioner.
> > >>
> > >> In this way, we can also achieve a decoupling from concrete
> > >> model-specific strategies.
> > >>
> > >> WDYT?
> > >>
> > >> Best regards,
> > >> Yuepeng Pan
> > >>
> > >> On Thu, Jan 15, 2026 at 22:44, Zhanghao Chen <[email protected]> wrote:
> > >>
> > >> > I think it should definitely be controlled in the model rather than
> > >> > in the engine. Maybe we can add an option to control its behavior?
> > >> >
> > >> > Best,
> > >> > Zhanghao Chen
> > >> > ________________________________
> > >> > From: Yuepeng Pan <[email protected]>
> > >> > Sent: Thursday, January 15, 2026 21:39
> > >> > To: [email protected] <[email protected]>
> > >> > Subject: Re: [DISCUSS] A design proposal to fix the wrong dynamic
> > >> > replacement of partitioner from FORWARD to REBLANCE for AutoScaler
> and
> > >> > AdaptiveScheduler
> > >> >
> > >> > Thanks Zhanghao Chen for the response.
> > >> >
> > >> > Please let me add some historical context[1].
> > >> >
> > >> > In the previous discussions, there were two alternative replacement
> > >> > strategies, with the following main characteristics:
> > >> > - RescalePartitioner: Compared to RebalancePartitioner, it introduces
> > >> > fewer network connections and less shuffle overhead.
> > >> > However, it is more prone to load skew and therefore lacks generality.
> > >> >
> > >> > - RebalancePartitioner: In theory, it can evenly distribute the load
> > >> > across downstream tasks and is more general,
> > >> > but at the cost of increased network connections and shuffle overhead.
> > >> >
> > >> > To balance generality and correctness, the community eventually chose
> > >> > the latter.
> > >> >
> > >> > I'd like to apologize for not providing a detailed response earlier
> > >> > to this suggestion[2] (switching to RescalePartitioner and enforcing a
> > >> > multiplicative relationship between upstream and downstream
> > >> > parallelism).
> > >> >
> > >> > If this strategy is implemented on the AutoScaler side, we may
> > >> > consider whether it can be migrated into the engine.
> > >> > The reason is that inconsistent parallelism between upstream and
> > >> > downstream vertices connected by a forward edge is not only caused by
> > >> > AutoScaler requests,
> > >> > but can also result from rescaling triggered via the REST API or
> > >> > internal events such as failover.
> > >> > Therefore, placing the implementation on the engine side would help
> > >> > ensure the safety and consistency of this strategy.
> > >> >
> > >> > If the cost of moving this strategy into the engine is too high, we
> > >> > could alternatively propose
> > >> > a new FLIP to discuss and advance it as a new feature on the
> > >> > AutoScaler side.
> > >> >
> > >> > If the strategy you mentioned is indeed intended to be implemented in
> > >> > the engine,
> > >> > I have one question. Consider a job consisting of two JobVertices, A
> > >> > and B:
> > >> >
> > >> > A (p = 100) --forward--> B (p = 100)
> > >> >
> > >> > After one AutoScaler adjustment, the resulting parallelism proposal
> > >> > is:
> > >> >
> > >> > A (p = 60) --rescale--> B (p = 100)
> > >> >
> > >> > I assume that, in order to maintain a multiplicative relationship
> > >> > between the parallelism
> > >> > of upstream and downstream vertices, there are roughly two possible
> > >> > directions:
> > >> >
> > >> > a) Adjust A from p = 60 to p = 50. In this case, some tasks of vertex
> > >> > A may become bottlenecks.
> > >> > b) Adjust B from p = 100 to p = 120. In this case, we may end up
> > >> > reserving some idle resources,
> > >> > and the scale-down effect may be less significant.
> > >> >
> > >> > Any input is appreciated!
> > >> >
> > >> >
> > >> > [1] https://github.com/apache/flink/pull/21443#discussion_r1042919428
> > >> > [2] https://issues.apache.org/jira/browse/FLINK-33123?focusedCommentId=17767397&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17767397
> > >> >
> > >> > Best regards,
> > >> > Yuepeng Pan
> > >> >
> > >> >
> > >> >
> > >> > On Thu, Jan 15, 2026 at 19:37, Zhanghao Chen <[email protected]> wrote:
> > >> >
> > >> > > Thanks Yuepeng for the proposal. Overall LGTM. However, I'm a bit
> > >> > > concerned about the potential performance impact of changing a
> > >> > > forward edge to rebalance. The autoscaler currently assumes a linear
> > >> > > performance model between the throughput and the parallelism. The
> > >> > > edge change can easily break this assumption as Rebalance introduces
> > >> > > more shuffle and results in higher CPU usage and network memory
> > >> > > consumption. I suggest considering it on the algorithm side as well.
> > >> > >
> > >> > > Best,
> > >> > > Zhanghao Chen
> > >> > > ________________________________
> > >> > > From: Yuepeng Pan <[email protected]>
> > >> > > Sent: Tuesday, January 13, 2026 23:46
> > >> > > To: [email protected] <[email protected]>
> > >> > > Subject: [DISCUSS] A design proposal to fix the wrong dynamic
> > >> replacement
> > >> > > of partitioner from FORWARD to REBLANCE for AutoScaler and
> > >> > AdaptiveScheduler
> > >> > >
> > >> > > Hi community,
> > >> > >
> > >> > > I would like to start a discussion around the issue described in
> > >> > > **FLINK-33123[1]**.
> > >> > >
> > >> > > This issue can mainly be broken down into two parts:
> > >> > > a) Assume that two upstream and downstream JobVertices connected by
> > >> > > a FORWARD edge initially have the same parallelism, and that a
> > >> > > rescale operation makes their parallelism differ.
> > >> > > In this case, the current strategy may produce incorrect results when
> > >> > > rebuilding the upstream–downstream network partition connections.
> > >> > > b) Assume that two upstream and downstream JobVertices have
> > >> > > different parallelism, and that a rescale operation adjusts their
> > >> > > parallelism to be the same.
> > >> > > In this scenario, it is not possible to determine the partition type
> > >> > > after the rescale.
> > >> > >
> > >> > > So, I'd like to share a design proposal[2] that attempts to address
> > >> > > the problem described in the ticket[1].
> > >> > >
> > >> > > Thanks in advance for your time and feedback.
> > >> > > Looking forward to the discussion!
> > >> > >
> > >> > >
> > >> > > [1] https://issues.apache.org/jira/browse/FLINK-33123
> > >> > > [2] https://docs.google.com/document/d/1e_6o4bdXcKtFL3xYxKeyKnRjR8ffsw6Z8frp3tp7u-M/edit?usp=sharing
> > >> > >
> > >> > > Best regards,
> > >> > > Yuepeng Pan
> > >> > >
> > >> >
> > >>
> > >
>
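
For readers following the thread, the candidate rules quoted above (Design 1, and the MIXED / RESCALE / REBALANCE values of the Design 2 option) can be sketched as a small decision function. This is only an illustrative sketch, not Flink code: the names `Partitioner`, `ReplacementPolicy`, `is_multiple`, and `choose_partitioner` are hypothetical, and the function only models which partitioner name each rule would pick for an edge that was originally FORWARD.

```python
from enum import Enum


class Partitioner(Enum):
    # Names mirror the partitioners mentioned in the thread; values are labels only.
    FORWARD = "ForwardPartitioner"
    RESCALE = "RescalePartitioner"
    REBALANCE = "RebalancePartitioner"


class ReplacementPolicy(Enum):
    # Hypothetical model of the value options listed under Design 2.
    MIXED = "MIXED"
    RESCALE = "RESCALE"
    REBALANCE = "REBALANCE"


def is_multiple(upstream: int, downstream: int) -> bool:
    """True if the larger parallelism is an integer multiple of the smaller one."""
    big, small = max(upstream, downstream), min(upstream, downstream)
    return big % small == 0


def choose_partitioner(
    upstream: int,
    downstream: int,
    policy: ReplacementPolicy = ReplacementPolicy.MIXED,
) -> Partitioner:
    """Pick a partitioner for an edge that was originally FORWARD."""
    if upstream <= 0 or downstream <= 0:
        raise ValueError("parallelism must be positive")
    # Equal parallelism: every rule in the thread restores the forward edge.
    if upstream == downstream:
        return Partitioner.FORWARD
    # Fixed policies ignore the multiple relationship.
    if policy is ReplacementPolicy.RESCALE:
        return Partitioner.RESCALE
    if policy is ReplacementPolicy.REBALANCE:
        return Partitioner.REBALANCE
    # MIXED (Design 1): rescale only when one side is a multiple of the other.
    if is_multiple(upstream, downstream):
        return Partitioner.RESCALE
    return Partitioner.REBALANCE
```

Applied to the examples in the thread, and assuming the MIXED rule, A (p = 60) to B (p = 100) would select the rebalance partitioner, whereas the two adjusted variants (A at p = 50, or B at p = 120) would select rescale. The 21-to-25 case would fall back to rebalance, which is the situation Zhanghao Chen points out as one where rescale could still perform better.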
