cjohnson-confluent commented on code in PR #28555:
URL: https://github.com/apache/flink/pull/28555#discussion_r3500239430


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java:
##########
@@ -1673,6 +1709,10 @@ private boolean transitionState(
                         ExceptionUtils.stripCompletionException(error));
             }
 
+            if (preCompletionAction != null) {
+                preCompletionAction.run();
+            }
+
             if (targetState == INITIALIZING || targetState == RUNNING) {
                 initializingOrRunningFuture.complete(null);
             } else if (targetState.isTerminal()) {

Review Comment:
   Agreed, that's the better spot. Moved it into the `targetState.isTerminal()` 
block, right before `terminalStateFuture.complete()`.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java:
##########
@@ -403,6 +403,78 @@ void testAccumulatorsAndMetricsStorage() throws Exception {
         assertThat(execution2.getUserAccumulators()).isEqualTo(accumulators);
     }
 
+    /**
+     * Verifies that IOMetrics are visible to {@link 
ExecutionStateUpdateListener}s at the time they
+     * are notified of a terminal state transition via {@code markFinished()}, 
{@code
+     * processFail()}, and {@code recoverExecution()}.
+     */
+    @Test
+    void testIOMetricsVisibleToListenersDuringStateTransition() throws 
Exception {
+        final JobVertexID jid1 = new JobVertexID();
+        final JobVertexID jid2 = new JobVertexID();
+        final JobVertexID jid3 = new JobVertexID();
+
+        JobVertex v1 = new JobVertex("v1", jid1);
+        JobVertex v2 = new JobVertex("v2", jid2);
+        JobVertex v3 = new JobVertex("v3", jid3);
+
+        SchedulerBase scheduler = setupScheduler(v1, 1, v2, 1, v3, 1);
+        DefaultExecutionGraph graph = (DefaultExecutionGraph) 
scheduler.getExecutionGraph();
+
+        Iterator<Execution> executionIter = 
graph.getRegisteredExecutions().values().iterator();
+        Execution finishedExecution = executionIter.next();
+        Execution failedExecution = executionIter.next();
+        Execution recoveredExecution = executionIter.next();
+
+        // The listener receives an ExecutionAttemptID, not the Execution 
object. Keep a stable
+        // map to resolve executions (markFinished deregisters them after the 
transition).
+        Map<ExecutionAttemptID, Execution> executionsById = new HashMap<>();
+        executionsById.put(finishedExecution.getAttemptId(), 
finishedExecution);
+        executionsById.put(failedExecution.getAttemptId(), failedExecution);
+        executionsById.put(recoveredExecution.getAttemptId(), 
recoveredExecution);
+
+        Map<ExecutionAttemptID, IOMetrics> metricsSeenByListener = new 
HashMap<>();
+        graph.registerExecutionStateUpdateListener(
+                (attemptId, previousState, newState) -> {
+                    if (newState.isTerminal()) {
+                        metricsSeenByListener.put(
+                                attemptId, 
executionsById.get(attemptId).getIOMetrics());
+                    }
+                });
+
+        IOMetrics ioMetrics = new IOMetrics(10, 20, 30, 40, 100, 200, 300);
+
+        // markFinished() only transitions from a running state.
+        finishedExecution.transitionState(ExecutionState.RUNNING);
+        finishedExecution.markFinished(Collections.emptyMap(), ioMetrics);
+
+        // markFailed() with fromSchedulerNg=true to reach the FAILED 
transition.
+        failedExecution.markFailed(
+                new Exception("test failure"),
+                false,
+                Collections.emptyMap(),
+                ioMetrics,
+                false,
+                true);
+
+        // recoverExecution() transitions directly to FINISHED during JM 
failover recovery.
+        recoveredExecution.recoverExecution(
+                recoveredExecution.getAttemptId(),
+                new LocalTaskManagerLocation(),
+                Collections.emptyMap(),
+                ioMetrics);

Review Comment:
   Good recommendation. Extracted a private 
`testMetricsVisibleToListenersDuringTerminalTransition(TerminalStateTransition)`
 helper that takes the transition as a callback, with one `@Test` per path: 
`markFinished`, `markFailed`, `completeCancelling`, and `recoverExecution`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to