[ 
https://issues.apache.org/jira/browse/FLINK-40003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Johnson updated FLINK-40003:
----------------------------------
    Description: 
{{ExecutionStateUpdateListeners}} registered on the ExecutionGraph are notified 
inline during {{Execution.transitionState()}}. When an execution reaches a 
terminal state, a listener may want to read the final IOMetrics via 
{{Execution.getIOMetrics()}} – for example, to emit observability events that 
include bytes/records in/out.

In {{markFinished()}} and {{processFail()}}, 
{{updateAccumulatorsAndMetrics()}} is called *after* {{transitionState()}}, 
so listeners always see null from {{getIOMetrics()}} during FINISHED and 
FAILED notifications.

{{completeCancelling()}} already has the correct ordering – it calls 
{{updateAccumulatorsAndMetrics()}} *before* {{transitionState()}}.

The call chain:
{{  Execution.transitionState()}}
{{    -> ExecutionVertex.notifyStateTransition()}}
{{      -> DefaultExecutionGraph.notifyExecutionChange()}}
{{        -> ExecutionStateUpdateListener.onStateUpdate()}}
{{          -> execution.getIOMetrics()}}  // null – not yet stored

The fix is to move {{updateAccumulatorsAndMetrics()}} before 
{{transitionState()}} in {{markFinished()}} and {{processFail()}}, matching 
the ordering already used in {{completeCancelling()}}.
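
The ordering problem can be reproduced in isolation with a small toy model. The sketch below is not Flink code: {{ToyExecution}}, {{StateListener}}, {{observe()}} and the two {{markFinished...}} variants are hypothetical stand-ins, and a plain {{Long}} stands in for IOMetrics. It only assumes what is described above, namely that listeners are notified inline during the state transition.

```java
import java.util.ArrayList;
import java.util.List;

public class ListenerOrderingSketch {

    /** Hypothetical stand-in for the listener interface; not Flink's actual signature. */
    interface StateListener {
        void onStateUpdate(ToyExecution execution, String newState);
    }

    /** Toy model of an execution; only the order of the two calls matters here. */
    static class ToyExecution {
        private final List<StateListener> listeners = new ArrayList<>();
        private Long ioMetrics; // null until stored; stands in for IOMetrics

        void addListener(StateListener listener) {
            listeners.add(listener);
        }

        Long getIOMetrics() {
            return ioMetrics;
        }

        // Listeners are notified inline, as part of the transition itself.
        private void transitionState(String newState) {
            for (StateListener listener : listeners) {
                listener.onStateUpdate(this, newState);
            }
        }

        private void updateAccumulatorsAndMetrics(long bytesOut) {
            ioMetrics = bytesOut;
        }

        // Ordering reported in this issue: transition first, metrics stored afterwards.
        void markFinishedTransitionFirst(long bytesOut) {
            transitionState("FINISHED");
            updateAccumulatorsAndMetrics(bytesOut);
        }

        // Proposed ordering: store metrics first, then transition.
        void markFinishedMetricsFirst(long bytesOut) {
            updateAccumulatorsAndMetrics(bytesOut);
            transitionState("FINISHED");
        }
    }

    /** Runs one variant and returns what the listener read during the notification. */
    public static List<Long> observe(boolean metricsFirst) {
        List<Long> seen = new ArrayList<>();
        ToyExecution execution = new ToyExecution();
        execution.addListener((e, state) -> seen.add(e.getIOMetrics()));
        if (metricsFirst) {
            execution.markFinishedMetricsFirst(42L);
        } else {
            execution.markFinishedTransitionFirst(42L);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("transition first: " + observe(false)); // prints [null]
        System.out.println("metrics first:    " + observe(true));  // prints [42]
    }
}
```

In the toy model the listener reads null when the transition runs first and the stored value when the metrics are stored first, which mirrors the difference between {{markFinished()}}/{{processFail()}} and {{completeCancelling()}} described above.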

  was:
{{ExecutionStateUpdateListeners}} registered on the ExecutionGraph are notified 
inline during {{{}Execution.transitionState(){}}}. When an execution reaches a 
terminal state, a  listener may want to read the final IOMetrics via 
{{Execution.getIOMetrics()}} -- for example, to emit observability events that 
include bytes/records in/out.

In {{markFinished()}} and {{{}processFail(){}}}, 
{{updateAccumulatorsAndMetrics()}} is called *after* {{{}transitionState(){}}}, 
so listeners always see null from {{getIOMetrics()}}
during FINISHED and FAILED notifications.

{{completeCancelling()}} already has the correct ordering -- it calls
{{updateAccumulatorsAndMetrics()}} *before* {{{}transitionState(){}}}.

The call chain:
{{  Execution.transitionState()}}
{{    -> ExecutionVertex.notifyStateTransition()}}
{{      -> DefaultExecutionGraph.notifyExecutionChange()}}
{{        -> ExecutionStateUpdateListener.onStateUpdate()}}
{{          -> execution.getIOMetrics()}}  // null -- not yet stored

The fix is to move {{updateAccumulatorsAndMetrics()}} before 
{{transitionState()}} in {{markFinished()}} and {{{}processFail(){}}}, matching 
the ordering already used in
{{{}completeCancelling(){}}}.


> IOMetrics not visible to ExecutionStateUpdateListeners during FINISHED/FAILED 
> transitions
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-40003
>                 URL: https://issues.apache.org/jira/browse/FLINK-40003
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>            Reporter: Chris Johnson
>            Priority: Major
>
> {{ExecutionStateUpdateListeners}} registered on the ExecutionGraph are 
> notified inline during {{Execution.transitionState()}}. When an execution 
> reaches a terminal state, a listener may want to read the final IOMetrics 
> via {{Execution.getIOMetrics()}} – for example, to emit observability events 
> that include bytes/records in/out.
> In {{markFinished()}} and {{processFail()}}, 
> {{updateAccumulatorsAndMetrics()}} is called *after* 
> {{transitionState()}}, so listeners always see null from 
> {{getIOMetrics()}} during FINISHED and FAILED notifications.
> {{completeCancelling()}} already has the correct ordering – it calls 
> {{updateAccumulatorsAndMetrics()}} *before* {{transitionState()}}.
> The call chain:
> {{  Execution.transitionState()}}
> {{    -> ExecutionVertex.notifyStateTransition()}}
> {{      -> DefaultExecutionGraph.notifyExecutionChange()}}
> {{        -> ExecutionStateUpdateListener.onStateUpdate()}}
> {{          -> execution.getIOMetrics()}}  // null – not yet stored
> The fix is to move {{updateAccumulatorsAndMetrics()}} before 
> {{transitionState()}} in {{markFinished()}} and {{processFail()}}, 
> matching the ordering already used in {{completeCancelling()}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
