[ 
https://issues.apache.org/jira/browse/FLINK-31245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhanghao Chen updated FLINK-31245:
----------------------------------
    Description: 
*Problem*

GlobalAggregateManager is used to share state amongst parallel tasks in a job 
and thus coordinate their execution. It maintains state (the _accumulators_ 
field in JobMaster) in JM memory. The content of the accumulator state is 
defined by user code. At my company, a user stores the task parallelism in the 
accumulator, assuming that task parallelism never changes. However, this 
assumption breaks when the adaptive scheduler rescales the job, because the 
accumulator state is not reset on rescaling.
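
To make the failure mode concrete, here is a minimal sketch of the pattern. It 
does not use Flink's classes: ToyAggregateManager, StaleParallelismDemo and 
registerParallelism are hypothetical names that only model an accumulator map 
which outlives a rescale, and the "first writer wins" merge is an assumption 
about what the user code does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Toy stand-in for the JobMaster-side accumulator map; NOT Flink's real class. */
class ToyAggregateManager {
    private final Map<String, Object> accumulators = new HashMap<>();

    /** Merges a value into the named accumulator and returns the merged result. */
    @SuppressWarnings("unchecked")
    synchronized <T> T update(String name, T value, BinaryOperator<T> merge) {
        T current = (T) accumulators.get(name);
        T merged = (current == null) ? value : merge.apply(current, value);
        accumulators.put(name, merged);
        return merged;
    }
}

class StaleParallelismDemo {
    /** Assumed user-code pattern: the first stored value wins, later ones are ignored. */
    static int registerParallelism(ToyAggregateManager manager, int parallelism) {
        return manager.update("parallelism", parallelism, (stored, incoming) -> stored);
    }

    public static void main(String[] args) {
        // The manager lives as long as the (toy) JobMaster, across rescales.
        ToyAggregateManager manager = new ToyAggregateManager();
        int beforeRescale = registerParallelism(manager, 4);
        // The job is rescaled to 2 subtasks, but the accumulator map is not cleared,
        // so the tasks still read the old parallelism.
        int afterRescale = registerParallelism(manager, 2);
        System.out.println("before=" + beforeRescale + ", after=" + afterRescale);
    }
}
```

In this model the second call still returns the value stored before the 
rescale, which is the stale state described above.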

*Possible Solutions*
 # Mark GlobalAggregateManager as deprecated. It seems that the operator 
coordinator can completely replace GlobalAggregateManager and is a more elegant 
solution. In that case, it is fine to deprecate GlobalAggregateManager and leave 
this issue as is; we can open a separate ticket for the deprecation.
 # If we decide to continue supporting GlobalAggregateManager, then we need to 
reset the state when rescaling.
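
The second option could look roughly like the sketch below. It is a 
self-contained toy model, not a patch: ResettableAggregateManager, 
resetOnRescale and ResetOnRescaleDemo are hypothetical names, and where such a 
hook would actually be called in the adaptive scheduler is left open.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Toy accumulator store with an explicit reset hook; a hypothetical model, not Flink code. */
class ResettableAggregateManager {
    private final Map<String, Object> accumulators = new HashMap<>();

    /** Merges a value into the named accumulator and returns the merged result. */
    @SuppressWarnings("unchecked")
    synchronized <T> T update(String name, T value, BinaryOperator<T> merge) {
        T current = (T) accumulators.get(name);
        T merged = (current == null) ? value : merge.apply(current, value);
        accumulators.put(name, merged);
        return merged;
    }

    /** Hypothetical hook: drop all accumulators before tasks restart with a new parallelism. */
    synchronized void resetOnRescale() {
        accumulators.clear();
    }
}

class ResetOnRescaleDemo {
    /** Assumed user-code pattern: the first stored value wins, later ones are ignored. */
    static int registerParallelism(ResettableAggregateManager manager, int parallelism) {
        return manager.update("parallelism", parallelism, (stored, incoming) -> stored);
    }

    public static void main(String[] args) {
        ResettableAggregateManager manager = new ResettableAggregateManager();
        int beforeRescale = registerParallelism(manager, 4);
        manager.resetOnRescale(); // state is cleared as part of the rescale
        int afterRescale = registerParallelism(manager, 2);
        System.out.println("before=" + beforeRescale + ", after=" + afterRescale);
    }
}
```

With the reset in place, the value registered after the rescale is the new 
parallelism rather than the one stored before it.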

  was:
*Problem*

GlobalAggregateManager is used to share state amongst parallel tasks in a job 
and thus coordinate their execution. It maintains a state (the _accumulators_ 
field in JobMaster) in JM memory. The accumulator state content is defined in 
user code, in my company, a user stores task parallelism in the accumulator, 
assuming task parallelism never changes. However, this assumption is broken 
when using adaptive scheduler.

*Possible Solutions*
 # Mark GlobalAggregateManager as deprecated. It seems that operator 
coordinator can completely replace GlobalAggregateManager and is a more elegent 
solution. Therefore, it is fine to deprecate GlobalAggregateManager and leave 
this issue there. It that's the case, we can open another ticket for doing that.
 # If we decide to continue supporting GlobalAggregateManager, then we need to 
reset the state when rescaling.


> Adaptive scheduler does not reset the state of GlobalAggregateManager when 
> rescaling
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-31245
>                 URL: https://issues.apache.org/jira/browse/FLINK-31245
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.16.1
>            Reporter: Zhanghao Chen
>            Priority: Major
>             Fix For: 1.18.0
>
>
> *Problem*
> GlobalAggregateManager is used to share state amongst parallel tasks in a job 
> and thus coordinate their execution. It maintains state (the _accumulators_ 
> field in JobMaster) in JM memory. The content of the accumulator state is 
> defined by user code. At my company, a user stores the task parallelism in 
> the accumulator, assuming that task parallelism never changes. However, this 
> assumption breaks when the adaptive scheduler rescales the job, because the 
> accumulator state is not reset on rescaling.
> *Possible Solutions*
>  # Mark GlobalAggregateManager as deprecated. It seems that the operator 
> coordinator can completely replace GlobalAggregateManager and is a more 
> elegant solution. In that case, it is fine to deprecate 
> GlobalAggregateManager and leave this issue as is; we can open a separate 
> ticket for the deprecation.
>  # If we decide to continue supporting GlobalAggregateManager, then we need 
> to reset the state when rescaling.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
