Bumping this thread. Thanks!

Best regards,
Yuepeng Pan

Yuepeng Pan <[email protected]> 于2026年1月17日周六 11:06写道:

> Thanks to Zhanghao Chen for the feedback.
>
> Please let me sort out the candidate solutions from the discussion history
> to facilitate gathering clearer preferences or feedback:
>
> For JobVertices with Forward edges in streaming jobs with the
> AdaptiveScheduler enabled:
>
> Design 1:
> - When upstream and downstream parallelism are the same, restore the
> partitioning strategy to ForwardPartitioner.
> - When upstream and downstream parallelism differ but have a multiple
> relationship, replace the partitioner with RescalePartitioner.
> - When upstream and downstream parallelism differ and do not have a
> multiple relationship, replace the partitioner with RebalancePartitioner.
>
> Design 2:
> Introduce a new parameter:
>
> - name:
> jobmanager.adaptive-scheduler.jobgraph.mutated-forward-edge.replacement-policy
> - type: enum
> - value options:
>   - MIXED: Use the strategy from Design 1
>   - RESCALE: Replace the partitioner with RescalePartitioner when upstream
> and downstream JobVertices have different parallelism
>   - REBALANCE: Replace the partitioner with RebalancePartitioner when
> upstream and downstream JobVertices have different parallelism
> - default value: MIXED
>
> Looking forward to feedback about it!
>
> Best regards,
> Yuepeng Pan
>
>
>
> Zhanghao Chen <[email protected]> 于2026年1月15日周四 23:35写道:
>
>> Thanks Yuepeng for the detailed elaboration. The idea makes sense, but
>> I'd prefer adding an explicit option to control the behavior for two
>> reasons:
>>
>>   1.
>> A complex strategy in black box may be confusing for others.
>>   2.
>> The real-world cases can be much more complex, e.g. the source
>> parallelism can be limited by MQ partitions, and maintaining a
>> multiplicative relationship between the parallelism of upstream and
>> downstream vertices can be really costly in some cases, but even under a
>> non-multiplicative relationship, rescale can still easily outperform
>> rebalance in some cases (21-to-25 for example). If we can't make it right
>> under all cases, maybe just keep it simple.
>>
>> Best,
>> Zhanghao Chen
>> ________________________________
>> From: Yuepeng Pan <[email protected]>
>> Sent: Thursday, January 15, 2026 23:03
>> To: [email protected] <[email protected]>
>> Subject: Re: [DISCUSS] A design proposal to fix the wrong dynamic
>> replacement of partitioner from FORWARD to REBLANCE for AutoScaler and
>> AdaptiveScheduler
>>
>> Thanks Zhanghao Chen for the comments.
>>
>> As mentioned in the previous emails, we have to take one thing into
>> consideration:
>> the final parallelism configuration depends not only on external
>> adjustments, but also on the actual amount of resources that become
>> available.
>>
>> - In an ideal situation with sufficient resources, the external adjustment
>> strategy determines the final parallelism and partitioning.
>> - When resources are insufficient, the actually available resources may
>> also affect the final parallelism and partitioning.
>>
>> Therefore, based on your proposal, we do not introduce any new parameters.
>> Instead, we only apply the following adjustments to pairs of vertices
>> whose
>> initial partitioning type is ForwardPartitioner:
>>
>> - When the upstream and downstream vertex parallelisms have a multiple
>> relationship (and are not equal), we change the partitioning type to
>> RescalePartitioner.
>> - When the upstream and downstream vertex parallelisms do not have a
>> multiple relationship (and are not equal), we change the partitioning type
>> to RebalancePartitioner.
>> - When the upstream and downstream vertex parallelisms are equal, we
>> change
>> the partitioning type back to ForwardPartitioner.
>>
>> In this way, we can also achieve a decoupling from concrete model-specific
>> strategies.
>>
>> WDYTA ?
>>
>> Best regards,
>> Yuepeng Pan
>>
>> Zhanghao Chen <[email protected]> 于2026年1月15日周四 22:44写道:
>>
>> > I think it should definitely be controlled in the model rather than in
>> the
>> > engine. Maybe we can add an option to control its behavior?
>> >
>> > Best,
>> > Zhanghao Chen
>> > ________________________________
>> > From: Yuepeng Pan <[email protected]>
>> > Sent: Thursday, January 15, 2026 21:39
>> > To: [email protected] <[email protected]>
>> > Subject: Re: [DISCUSS] A design proposal to fix the wrong dynamic
>> > replacement of partitioner from FORWARD to REBLANCE for AutoScaler and
>> > AdaptiveScheduler
>> >
>> > Thanks Zhanghao Chen for the response.
>> >
>> > Please let me add some historical context[1].
>> >
>> > In the previous discussions, there were two alternative replacement
>> > strategies, with the following main characteristics:
>> > - RescalePartitioner: Compared to RebalancePartitioner, it introduces
>> fewer
>> > network connections and less shuffle overhead.
>> > However, it is more prone to load skew and therefore lacks generality.
>> >
>> > - RebalancePartitioner: In theory, it can evenly distribute the load
>> across
>> > downstream tasks and is more general,
>> > but at the cost of increased network connections and shuffle overhead.
>> >
>> > To balance generality and correctness, the community eventually chose
>> the
>> > latter.
>> >
>> > I'd like to apologize for not providing a detailed response earlier to
>> this
>> > suggestion[2](switching to RescalePartitioner and enforcing a
>> > multiplicative relationship between upstream and downstream
>> parallelism).
>> >
>> > If this strategy is implemented on the AutoScaler side, we may consider
>> > whether it can be migrated into the engine.
>> > The reason is that inconsistent parallelism between upstream and
>> downstream
>> > vertices connected by a forward edge is not only caused by AutoScaler
>> > requests,
>> > but can also result from rescaling triggered via the REST API or
>> internal
>> > events such as failover.
>> > Therefore, placing the implementation on the engine side would help
>> ensure
>> > the safety and consistency of this strategy.
>> >
>> > If the cost of moving this strategy into the engine is too high, we
>> could
>> > alternatively propose
>> > a new FLIP to discuss and advance it as a new feature on the AutoScaler
>> > side.
>> >
>> > If the strategy you mentioned is indeed intended to be implemented in
>> the
>> > engine,
>> > I have one question. Consider a job consisting of two JobVertices, A
>> and B:
>> >
>> > A (p = 100) --forward--> B (p = 100)
>> >
>> > After one AutoScaler adjustment, the resulting parallelism proposal is:
>> >
>> > A (p = 60) --rescale--> B (p = 100)
>> >
>> > I assume that, in order to maintain a multiplicative relationship
>> between
>> > the parallelism
>> > of upstream and downstream vertices, there are roughly two possible
>> > directions:
>> >
>> > a) Adjust A from p = 60 to p = 50. In this case, some tasks of vertex A
>> may
>> > become bottlenecks.
>> > b) Adjust B from p = 100 to p = 120. In this case, we may end up
>> reserving
>> > some idle resources,
>> > and the scale-down effect may be less significant.
>> >
>> > Any input is appreciated!
>> >
>> >
>> > [1]https://github.com/apache/flink/pull/21443#discussion_r1042919428
>> > [2]
>> >
>> >
>> https://issues.apache.org/jira/browse/FLINK-33123?focusedCommentId=17767397&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17767397
>> >
>> > Best regards,
>> > Yuepeng Pan
>> >
>> >
>> >
>> > Zhanghao Chen <[email protected]> 于2026年1月15日周四 19:37写道:
>> >
>> > > Thanks Yuepeng for the proposal. Overall LGTM. However, I'm a bit
>> > > concerned about the potential performance impact of changing a forward
>> > edge
>> > > to rebalance. The autoscaler currently assumes a linear performance
>> model
>> > > between the throughput and the parallelism. The edge change can easily
>> > > break this assumption as Rebalance introduces more shuffle and
>> results in
>> > > higher CPU usage and network memory consumption. I suggest
>> considering it
>> > > on the algorithm side as well.
>> > >
>> > > Best,
>> > > Zhanghao Chen
>> > > ________________________________
>> > > From: Yuepeng Pan <[email protected]>
>> > > Sent: Tuesday, January 13, 2026 23:46
>> > > To: [email protected] <[email protected]>
>> > > Subject: [DISCUSS] A design proposal to fix the wrong dynamic
>> replacement
>> > > of partitioner from FORWARD to REBLANCE for AutoScaler and
>> > AdaptiveScheduler
>> > >
>> > > Hi community,
>> > >
>> > > I would like to start a discussion around the issue described in
>> > > **FLINK-33123[1]**.
>> > >
>> > > This issue can mainly be broken down into two parts:
>> > > a).
>> > > Assuming that initially two upstream and downstream JobVertices
>> connected
>> > > by a FORWARD edge have the same parallelism,
>> > > due to a rescale operation their parallelism becomes different.
>> > > In this case, the current strategy may produce incorrect results when
>> > > rebuilding the upstream–downstream network partition connections.
>> > > b).
>> > > Assuming that the parallelism of two upstream and downstream
>> JobVertices
>> > is
>> > >  different,
>> > > but due to a rescale operation their parallelism needs to be adjusted
>> to
>> > be
>> > > the same.
>> > > In this scenario, it is not possible to determine the partition type
>> > after
>> > > the rescale.
>> > >
>> > > So, I'd like to share a design proposal[2] that attempts to address
>> the
>> > > problem described in the ticket[1].
>> > >
>> > > Thanks in advance for your time and feedback.
>> > > Looking forward to the discussion!
>> > >
>> > >
>> > > [1]https://issues.apache.org/jira/browse/FLINK-33123
>> > > [2]
>> > >
>> > >
>> >
>> https://docs.google.com/document/d/1e_6o4bdXcKtFL3xYxKeyKnRjR8ffsw6Z8frp3tp7u-M/edit?usp=sharing
>> > >
>> > > Best regards,
>> > > Yuepeng Pan
>> > >
>> >
>>
>

Reply via email to