XComp commented on code in PR #28555:
URL: https://github.com/apache/flink/pull/28555#discussion_r3492210636


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java:
##########
@@ -403,6 +403,68 @@ void testAccumulatorsAndMetricsStorage() throws Exception {
         assertThat(execution2.getUserAccumulators()).isEqualTo(accumulators);
     }
 
+    /**
+     * Verifies that IOMetrics are visible to {@link ExecutionStateUpdateListener}s at the time they
+     * are notified of a terminal state transition. This guards against a regression where
+     * updateAccumulatorsAndMetrics was called after transitionState (which fires listeners inline),
+     * causing listeners to see null IOMetrics.
+     */
+    @Test
+    void testIOMetricsVisibleToListenersDuringStateTransition() throws Exception {
+        final JobVertexID jid1 = new JobVertexID();
+        final JobVertexID jid2 = new JobVertexID();
+
+        JobVertex v1 = new JobVertex("v1", jid1);
+        JobVertex v2 = new JobVertex("v2", jid2);
+
+        SchedulerBase scheduler = setupScheduler(v1, 1, v2, 1);
+        ExecutionGraph graph = facade.supply(scheduler::getExecutionGraph);
+
+        // Capture IOMetrics as seen by the listener at notification time
+        Map<ExecutionAttemptID, IOMetrics> metricsSeenByListener = new HashMap<>();
+        facade.run(

Review Comment:
   Where is the `facade` variable coming from? This is causing compilation 
errors.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java:
##########
@@ -1166,10 +1166,14 @@ void markFinished(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics met
 
             if (current == INITIALIZING || current == RUNNING || current == DEPLOYING) {
 
+                // Store metrics before the state transition so that listeners
+                // notified inside transitionState() can read them via getIOMetrics().
+                // This matches the ordering already used by completeCancelling().
+                updateAccumulatorsAndMetrics(userAccumulators, metrics);
+
                 if (transitionState(current, FINISHED)) {

Review Comment:
   Have you considered moving the metrics update into a pre-completion callback 
that's passed into the `transitionState` method and called as part of the 
state transition, before completing 
[here](https://github.com/apache/flink/blob/d8461ed0c6cb6246dae1daa3d6d5edfc73815ef9/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L1685)?
   
   We could introduce a `transitionToFinalState` to make the contract cleaner. 
WDYT?
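
A rough sketch of what the callback-based variant might look like, to make the suggestion concrete. This is not Flink code: `SketchExecution`, the `Long` stand-in for `IOMetrics`, the listener type, and the signature of `transitionToFinalState` are all assumptions made for illustration, and the sketch is single-threaded, so it ignores the atomic state update that the real `transitionState` would need.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified, hypothetical stand-in for Execution (not the real Flink class). */
class SketchExecution {
    enum State { RUNNING, FINISHED }

    private State state = State.RUNNING;
    // Stand-in for IOMetrics; stays null until the pre-completion callback stores it.
    private Long ioMetrics;
    private final List<Consumer<SketchExecution>> listeners = new ArrayList<>();

    void addListener(Consumer<SketchExecution> listener) {
        listeners.add(listener);
    }

    Long getIOMetrics() {
        return ioMetrics;
    }

    State getState() {
        return state;
    }

    /**
     * Assumed shape of the proposed method: the callback runs only if the
     * transition succeeds, and always before listeners are notified.
     */
    boolean transitionToFinalState(State expected, State target, Runnable preCompletion) {
        if (state != expected) {
            return false; // transition rejected, callback is not invoked
        }
        state = target;
        preCompletion.run(); // store metrics first ...
        for (Consumer<SketchExecution> listener : listeners) {
            listener.accept(this); // ... then notify listeners inline
        }
        return true;
    }

    /** Caller side: the metrics update travels with the transition. */
    boolean markFinished(long metrics) {
        return transitionToFinalState(
                State.RUNNING, State.FINISHED, () -> this.ioMetrics = metrics);
    }
}
```

With this shape, a listener that calls `getIOMetrics()` during notification would see the stored value, and a rejected transition would leave the metrics untouched. Whether that second property is desirable for `markFinished` is a design question for the PR, not something the sketch settles.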



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java:
##########


Review Comment:
   `recoverExecution` suffers from the same issue. So, we might want to solve 
it there as well, hm?


