[
https://issues.apache.org/jira/browse/FLINK-40003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chris Johnson updated FLINK-40003:
----------------------------------
Description:
{{ExecutionStateUpdateListeners}} registered on the ExecutionGraph are notified
inline during {{Execution.transitionState()}}. When an execution reaches a
terminal state, a listener may want to read the final IOMetrics via
{{Execution.getIOMetrics()}} – for example, to emit observability events that
include bytes/records in/out.
In {{markFinished()}} and {{processFail()}},
{{updateAccumulatorsAndMetrics()}} is called *after* {{transitionState()}},
so listeners always see null from {{getIOMetrics()}} during FINISHED and
FAILED notifications.
{{completeCancelling()}} already has the correct ordering – it calls
{{updateAccumulatorsAndMetrics()}} *before* {{transitionState()}}.
The call chain:
{{ Execution.transitionState()}}
{{ -> ExecutionVertex.notifyStateTransition()}}
{{ -> DefaultExecutionGraph.notifyExecutionChange()}}
{{ -> ExecutionStateUpdateListener.onStateUpdate()}}
{{ -> execution.getIOMetrics()}} // null – not yet stored
The fix is to move {{updateAccumulatorsAndMetrics()}} before
{{transitionState()}} in {{markFinished()}} and {{processFail()}}, matching
the ordering already used in {{completeCancelling()}}.
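
The ordering problem can be reproduced outside Flink with a small toy model. The sketch below is not Flink's code: {{ToyExecution}}, {{ToyMetrics}}, {{observedByListener()}} and both {{markFinished...()}} variants are hypothetical stand-ins. It only assumes what is described above, namely that listeners run inline inside {{transitionState()}} and that {{getIOMetrics()}} returns null until {{updateAccumulatorsAndMetrics()}} has stored a value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class OrderingSketch {

    /** Hypothetical stand-in for IOMetrics; only one counter is modeled. */
    static final class ToyMetrics {
        final long numBytesOut;

        ToyMetrics(long numBytesOut) {
            this.numBytesOut = numBytesOut;
        }
    }

    /** Hypothetical stand-in for Execution; not Flink's implementation. */
    static final class ToyExecution {
        private final List<Consumer<ToyExecution>> listeners = new ArrayList<>();
        private String state = "RUNNING";
        private ToyMetrics ioMetrics; // stays null until metrics are stored

        void addListener(Consumer<ToyExecution> listener) {
            listeners.add(listener);
        }

        ToyMetrics getIOMetrics() {
            return ioMetrics;
        }

        // Listeners are notified inline, before this method returns.
        private void transitionState(String target) {
            state = target;
            for (Consumer<ToyExecution> listener : listeners) {
                listener.accept(this);
            }
        }

        private void updateAccumulatorsAndMetrics(ToyMetrics metrics) {
            ioMetrics = metrics;
        }

        // Ordering reported as the bug: notify first, store metrics afterwards.
        void markFinishedTransitionFirst(ToyMetrics metrics) {
            transitionState("FINISHED");
            updateAccumulatorsAndMetrics(metrics);
        }

        // Proposed ordering: store metrics first, then notify.
        void markFinishedMetricsFirst(ToyMetrics metrics) {
            updateAccumulatorsAndMetrics(metrics);
            transitionState("FINISHED");
        }
    }

    /** Returns what a listener saw from getIOMetrics() during the FINISHED notification. */
    static ToyMetrics observedByListener(boolean metricsFirst) {
        ToyExecution execution = new ToyExecution();
        ToyMetrics[] seen = new ToyMetrics[1];
        execution.addListener(e -> seen[0] = e.getIOMetrics());

        ToyMetrics finalMetrics = new ToyMetrics(1024L);
        if (metricsFirst) {
            execution.markFinishedMetricsFirst(finalMetrics);
        } else {
            execution.markFinishedTransitionFirst(finalMetrics);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("transition first -> listener saw null: "
                + (observedByListener(false) == null));
        System.out.println("metrics first    -> listener saw null: "
                + (observedByListener(true) == null));
    }
}
```

In this toy model the listener sees null only in the transition-first variant. That matches the behavior reported for {{markFinished()}} and {{processFail()}}, and the metrics-first variant mirrors the ordering attributed to {{completeCancelling()}}.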
was:
{{ExecutionStateUpdateListeners}} registered on the ExecutionGraph are notified
inline during {{Execution.transitionState()}}. When an execution reaches a
terminal state, a listener may want to read the final IOMetrics via
{{Execution.getIOMetrics()}} -- for example, to emit observability events that
include bytes/records in/out.
In {{markFinished()}} and {{processFail()}},
{{updateAccumulatorsAndMetrics()}} is called *after* {{transitionState()}},
so listeners always see null from {{getIOMetrics()}}
during FINISHED and FAILED notifications.
{{completeCancelling()}} already has the correct ordering -- it calls
{{updateAccumulatorsAndMetrics()}} *before* {{transitionState()}}.
The call chain:
{{ Execution.transitionState()}}
{{ -> ExecutionVertex.notifyStateTransition()}}
{{ -> DefaultExecutionGraph.notifyExecutionChange()}}
{{ -> ExecutionStateUpdateListener.onStateUpdate()}}
{{ -> execution.getIOMetrics()}} // null -- not yet stored
The fix is to move {{updateAccumulatorsAndMetrics()}} before
{{transitionState()}} in {{markFinished()}} and {{processFail()}}, matching
the ordering already used in {{completeCancelling()}}.
> IOMetrics not visible to ExecutionStateUpdateListeners during FINISHED/FAILED
> transitions
> -----------------------------------------------------------------------------------------
>
> Key: FLINK-40003
> URL: https://issues.apache.org/jira/browse/FLINK-40003
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Reporter: Chris Johnson
> Priority: Major
>
> {{ExecutionStateUpdateListeners}} registered on the ExecutionGraph are
> notified inline during {{Execution.transitionState()}}. When an execution
> reaches a terminal state, a listener may want to read the final IOMetrics
> via {{Execution.getIOMetrics()}} – for example, to emit observability events
> that include bytes/records in/out.
> In {{markFinished()}} and {{processFail()}},
> {{updateAccumulatorsAndMetrics()}} is called *after*
> {{transitionState()}}, so listeners always see null from
> {{getIOMetrics()}} during FINISHED and FAILED notifications.
> {{completeCancelling()}} already has the correct ordering – it calls
> {{updateAccumulatorsAndMetrics()}} *before* {{transitionState()}}.
> The call chain:
> {{ Execution.transitionState()}}
> {{ -> ExecutionVertex.notifyStateTransition()}}
> {{ -> DefaultExecutionGraph.notifyExecutionChange()}}
> {{ -> ExecutionStateUpdateListener.onStateUpdate()}}
> {{ -> execution.getIOMetrics()}} // null – not yet stored
> The fix is to move {{updateAccumulatorsAndMetrics()}} before
> {{transitionState()}} in {{markFinished()}} and {{processFail()}},
> matching the ordering already used in {{completeCancelling()}}.