Thanks to Zhanghao Chen for the feedback.

Please let me sort out the candidate solutions from the discussion history
to facilitate gathering clearer preferences or feedback:

For JobVertices with Forward edges in streaming jobs with the
AdaptiveScheduler enabled:

Design 1:
- When upstream and downstream parallelism are the same, restore the
partitioning strategy to ForwardPartitioner.
- When upstream and downstream parallelism differ but have a multiple
relationship, replace the partitioner with RescalePartitioner.
- When upstream and downstream parallelism differ and do not have a
multiple relationship, replace the partitioner with RebalancePartitioner.

Design 2:
Introduce a new parameter:

- name:
jobmanager.adaptive-scheduler.jobgraph.mutated-forward-edge.replacement-policy
- type: enum
- value options:
  - MIXED: Use the strategy from Design 1
  - RESCALE: Replace the partitioner with RescalePartitioner when upstream
and downstream JobVertices have different parallelism
  - REBALANCE: Replace the partitioner with RebalancePartitioner when
upstream and downstream JobVertices have different parallelism
- default value: MIXED

Looking forward to feedback about it!

Best regards,
Yuepeng Pan



Zhanghao Chen <[email protected]> 于2026年1月15日周四 23:35写道:

> Thanks Yuepeng for the detailed elaboration. The idea makes sense, but I'd
> prefer adding an explicit option to control the behavior for two reasons:
>
>   1.
> A complex strategy in black box may be confusing for others.
>   2.
> The real-world cases can be much more complex, e.g. the source parallelism
> can be limited by MQ partitions, and maintaining a multiplicative
> relationship between the parallelism of upstream and downstream vertices
> can be really costly in some cases, but even under a non-multiplicative
> relationship, rescale can still easily outperform rebalance in some cases
> (21-to-25 for example). If we can't make it right under all cases, maybe
> just keep it simple.
>
> Best,
> Zhanghao Chen
> ________________________________
> From: Yuepeng Pan <[email protected]>
> Sent: Thursday, January 15, 2026 23:03
> To: [email protected] <[email protected]>
> Subject: Re: [DISCUSS] A design proposal to fix the wrong dynamic
> replacement of partitioner from FORWARD to REBLANCE for AutoScaler and
> AdaptiveScheduler
>
> Thanks Zhanghao Chen for the comments.
>
> As mentioned in the previous emails, we have to take one thing into
> consideration:
> the final parallelism configuration depends not only on external
> adjustments, but also on the actual amount of resources that become
> available.
>
> - In an ideal situation with sufficient resources, the external adjustment
> strategy determines the final parallelism and partitioning.
> - When resources are insufficient, the actually available resources may
> also affect the final parallelism and partitioning.
>
> Therefore, based on your proposal, we do not introduce any new parameters.
> Instead, we only apply the following adjustments to pairs of vertices whose
> initial partitioning type is ForwardPartitioner:
>
> - When the upstream and downstream vertex parallelisms have a multiple
> relationship (and are not equal), we change the partitioning type to
> RescalePartitioner.
> - When the upstream and downstream vertex parallelisms do not have a
> multiple relationship (and are not equal), we change the partitioning type
> to RebalancePartitioner.
> - When the upstream and downstream vertex parallelisms are equal, we change
> the partitioning type back to ForwardPartitioner.
>
> In this way, we can also achieve a decoupling from concrete model-specific
> strategies.
>
> WDYTA ?
>
> Best regards,
> Yuepeng Pan
>
> Zhanghao Chen <[email protected]> 于2026年1月15日周四 22:44写道:
>
> > I think it should definitely be controlled in the model rather than in
> the
> > engine. Maybe we can add an option to control its behavior?
> >
> > Best,
> > Zhanghao Chen
> > ________________________________
> > From: Yuepeng Pan <[email protected]>
> > Sent: Thursday, January 15, 2026 21:39
> > To: [email protected] <[email protected]>
> > Subject: Re: [DISCUSS] A design proposal to fix the wrong dynamic
> > replacement of partitioner from FORWARD to REBLANCE for AutoScaler and
> > AdaptiveScheduler
> >
> > Thanks Zhanghao Chen for the response.
> >
> > Please let me add some historical context[1].
> >
> > In the previous discussions, there were two alternative replacement
> > strategies, with the following main characteristics:
> > - RescalePartitioner: Compared to RebalancePartitioner, it introduces
> fewer
> > network connections and less shuffle overhead.
> > However, it is more prone to load skew and therefore lacks generality.
> >
> > - RebalancePartitioner: In theory, it can evenly distribute the load
> across
> > downstream tasks and is more general,
> > but at the cost of increased network connections and shuffle overhead.
> >
> > To balance generality and correctness, the community eventually chose the
> > latter.
> >
> > I'd like to apologize for not providing a detailed response earlier to
> this
> > suggestion[2](switching to RescalePartitioner and enforcing a
> > multiplicative relationship between upstream and downstream parallelism).
> >
> > If this strategy is implemented on the AutoScaler side, we may consider
> > whether it can be migrated into the engine.
> > The reason is that inconsistent parallelism between upstream and
> downstream
> > vertices connected by a forward edge is not only caused by AutoScaler
> > requests,
> > but can also result from rescaling triggered via the REST API or internal
> > events such as failover.
> > Therefore, placing the implementation on the engine side would help
> ensure
> > the safety and consistency of this strategy.
> >
> > If the cost of moving this strategy into the engine is too high, we could
> > alternatively propose
> > a new FLIP to discuss and advance it as a new feature on the AutoScaler
> > side.
> >
> > If the strategy you mentioned is indeed intended to be implemented in the
> > engine,
> > I have one question. Consider a job consisting of two JobVertices, A and
> B:
> >
> > A (p = 100) --forward--> B (p = 100)
> >
> > After one AutoScaler adjustment, the resulting parallelism proposal is:
> >
> > A (p = 60) --rescale--> B (p = 100)
> >
> > I assume that, in order to maintain a multiplicative relationship between
> > the parallelism
> > of upstream and downstream vertices, there are roughly two possible
> > directions:
> >
> > a) Adjust A from p = 60 to p = 50. In this case, some tasks of vertex A
> may
> > become bottlenecks.
> > b) Adjust B from p = 100 to p = 120. In this case, we may end up
> reserving
> > some idle resources,
> > and the scale-down effect may be less significant.
> >
> > Any input is appreciated!
> >
> >
> > [1]https://github.com/apache/flink/pull/21443#discussion_r1042919428
> > [2]
> >
> >
> https://issues.apache.org/jira/browse/FLINK-33123?focusedCommentId=17767397&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17767397
> >
> > Best regards,
> > Yuepeng Pan
> >
> >
> >
> > Zhanghao Chen <[email protected]> 于2026年1月15日周四 19:37写道:
> >
> > > Thanks Yuepeng for the proposal. Overall LGTM. However, I'm a bit
> > > concerned about the potential performance impact of changing a forward
> > edge
> > > to rebalance. The autoscaler currently assumes a linear performance
> model
> > > between the throughput and the parallelism. The edge change can easily
> > > break this assumption as Rebalance introduces more shuffle and results
> in
> > > higher CPU usage and network memory consumption. I suggest considering
> it
> > > on the algorithm side as well.
> > >
> > > Best,
> > > Zhanghao Chen
> > > ________________________________
> > > From: Yuepeng Pan <[email protected]>
> > > Sent: Tuesday, January 13, 2026 23:46
> > > To: [email protected] <[email protected]>
> > > Subject: [DISCUSS] A design proposal to fix the wrong dynamic
> replacement
> > > of partitioner from FORWARD to REBLANCE for AutoScaler and
> > AdaptiveScheduler
> > >
> > > Hi community,
> > >
> > > I would like to start a discussion around the issue described in
> > > **FLINK-33123[1]**.
> > >
> > > This issue can mainly be broken down into two parts:
> > > a).
> > > Assuming that initially two upstream and downstream JobVertices
> connected
> > > by a FORWARD edge have the same parallelism,
> > > due to a rescale operation their parallelism becomes different.
> > > In this case, the current strategy may produce incorrect results when
> > > rebuilding the upstream–downstream network partition connections.
> > > b).
> > > Assuming that the parallelism of two upstream and downstream
> JobVertices
> > is
> > >  different,
> > > but due to a rescale operation their parallelism needs to be adjusted
> to
> > be
> > > the same.
> > > In this scenario, it is not possible to determine the partition type
> > after
> > > the rescale.
> > >
> > > So, I'd like to share a design proposal[2] that attempts to address the
> > > problem described in the ticket[1].
> > >
> > > Thanks in advance for your time and feedback.
> > > Looking forward to the discussion!
> > >
> > >
> > > [1]https://issues.apache.org/jira/browse/FLINK-33123
> > > [2]
> > >
> > >
> >
> https://docs.google.com/document/d/1e_6o4bdXcKtFL3xYxKeyKnRjR8ffsw6Z8frp3tp7u-M/edit?usp=sharing
> > >
> > > Best regards,
> > > Yuepeng Pan
> > >
> >
>

Reply via email to