cjohnson-confluent commented on code in PR #28555:
URL: https://github.com/apache/flink/pull/28555#discussion_r3500240647
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java:
##########
@@ -403,6 +403,78 @@ void testAccumulatorsAndMetricsStorage() throws Exception {
assertThat(execution2.getUserAccumulators()).isEqualTo(accumulators);
}
+ /**
+ * Verifies that IOMetrics are visible to {@link ExecutionStateUpdateListener}s at the time they
+ * are notified of a terminal state transition via {@code markFinished()}, {@code
+ * processFail()}, and {@code recoverExecution()}.
+ */
+ @Test
+ void testIOMetricsVisibleToListenersDuringStateTransition() throws
Exception {
+ final JobVertexID jid1 = new JobVertexID();
+ final JobVertexID jid2 = new JobVertexID();
+ final JobVertexID jid3 = new JobVertexID();
+
+ JobVertex v1 = new JobVertex("v1", jid1);
+ JobVertex v2 = new JobVertex("v2", jid2);
+ JobVertex v3 = new JobVertex("v3", jid3);
+
+ SchedulerBase scheduler = setupScheduler(v1, 1, v2, 1, v3, 1);
+ DefaultExecutionGraph graph = (DefaultExecutionGraph)
scheduler.getExecutionGraph();
+
+ Iterator<Execution> executionIter =
graph.getRegisteredExecutions().values().iterator();
+ Execution finishedExecution = executionIter.next();
+ Execution failedExecution = executionIter.next();
+ Execution recoveredExecution = executionIter.next();
+
+ // The listener receives an ExecutionAttemptID, not the Execution object. Keep a stable
+ // map to resolve executions (markFinished deregisters them after the transition).
+ Map<ExecutionAttemptID, Execution> executionsById = new HashMap<>();
+ executionsById.put(finishedExecution.getAttemptId(),
finishedExecution);
+ executionsById.put(failedExecution.getAttemptId(), failedExecution);
+ executionsById.put(recoveredExecution.getAttemptId(),
recoveredExecution);
+
+ Map<ExecutionAttemptID, IOMetrics> metricsSeenByListener = new
HashMap<>();
+ graph.registerExecutionStateUpdateListener(
+ (attemptId, previousState, newState) -> {
+ if (newState.isTerminal()) {
+ metricsSeenByListener.put(
+ attemptId,
executionsById.get(attemptId).getIOMetrics());
+ }
+ });
+
+ IOMetrics ioMetrics = new IOMetrics(10, 20, 30, 40, 100, 200, 300);
+
+ // markFinished() only transitions from a running state.
+ finishedExecution.transitionState(ExecutionState.RUNNING);
+ finishedExecution.markFinished(Collections.emptyMap(), ioMetrics);
+
+ // markFailed() with fromSchedulerNg=true to reach the FAILED transition.
+ failedExecution.markFailed(
+ new Exception("test failure"),
+ false,
+ Collections.emptyMap(),
+ ioMetrics,
+ false,
+ true);
+
+ // recoverExecution() transitions directly to FINISHED during JM failover recovery.
+ recoveredExecution.recoverExecution(
+ recoveredExecution.getAttemptId(),
+ new LocalTaskManagerLocation(),
+ Collections.emptyMap(),
+ ioMetrics);
Review Comment:
Added.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java:
##########
@@ -403,6 +403,78 @@ void testAccumulatorsAndMetricsStorage() throws Exception {
assertThat(execution2.getUserAccumulators()).isEqualTo(accumulators);
}
+ /**
+ * Verifies that IOMetrics are visible to {@link ExecutionStateUpdateListener}s at the time they
+ * are notified of a terminal state transition via {@code markFinished()}, {@code
+ * processFail()}, and {@code recoverExecution()}.
+ */
+ @Test
+ void testIOMetricsVisibleToListenersDuringStateTransition() throws
Exception {
+ final JobVertexID jid1 = new JobVertexID();
+ final JobVertexID jid2 = new JobVertexID();
+ final JobVertexID jid3 = new JobVertexID();
+
+ JobVertex v1 = new JobVertex("v1", jid1);
+ JobVertex v2 = new JobVertex("v2", jid2);
+ JobVertex v3 = new JobVertex("v3", jid3);
+
+ SchedulerBase scheduler = setupScheduler(v1, 1, v2, 1, v3, 1);
+ DefaultExecutionGraph graph = (DefaultExecutionGraph)
scheduler.getExecutionGraph();
+
+ Iterator<Execution> executionIter =
graph.getRegisteredExecutions().values().iterator();
+ Execution finishedExecution = executionIter.next();
+ Execution failedExecution = executionIter.next();
+ Execution recoveredExecution = executionIter.next();
+
+ // The listener receives an ExecutionAttemptID, not the Execution object. Keep a stable
+ // map to resolve executions (markFinished deregisters them after the transition).
+ Map<ExecutionAttemptID, Execution> executionsById = new HashMap<>();
+ executionsById.put(finishedExecution.getAttemptId(),
finishedExecution);
+ executionsById.put(failedExecution.getAttemptId(), failedExecution);
+ executionsById.put(recoveredExecution.getAttemptId(),
recoveredExecution);
+
+ Map<ExecutionAttemptID, IOMetrics> metricsSeenByListener = new
HashMap<>();
+ graph.registerExecutionStateUpdateListener(
+ (attemptId, previousState, newState) -> {
+ if (newState.isTerminal()) {
+ metricsSeenByListener.put(
+ attemptId,
executionsById.get(attemptId).getIOMetrics());
+ }
+ });
+
+ IOMetrics ioMetrics = new IOMetrics(10, 20, 30, 40, 100, 200, 300);
+
+ // markFinished() only transitions from a running state.
+ finishedExecution.transitionState(ExecutionState.RUNNING);
+ finishedExecution.markFinished(Collections.emptyMap(), ioMetrics);
+
+ // markFailed() with fromSchedulerNg=true to reach the FAILED transition.
+ failedExecution.markFailed(
+ new Exception("test failure"),
+ false,
+ Collections.emptyMap(),
+ ioMetrics,
+ false,
+ true);
+
+ // recoverExecution() transitions directly to FINISHED during JM failover recovery.
+ recoveredExecution.recoverExecution(
+ recoveredExecution.getAttemptId(),
+ new LocalTaskManagerLocation(),
+ Collections.emptyMap(),
+ ioMetrics);
+
+ for (Map.Entry<ExecutionAttemptID, Execution> entry :
executionsById.entrySet()) {
+ IOMetrics captured = metricsSeenByListener.get(entry.getKey());
+ assertThat(captured)
Review Comment:
Done. The helper now captures `getUserAccumulators()` inside the listener
alongside the IOMetrics and asserts both. Used a non-empty accumulator map so
the check is meaningful.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]