This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d031f6008d7cc2cb7928adbf84ff89e7bccf9dde Author: Rui Fan <[email protected]> AuthorDate: Wed Jan 10 15:48:31 2024 +0800 [FLINK-33565][Scheduler] Correct the numberOfRestarts metric --- .../executiongraph/failover/ExecutionFailureHandler.java | 4 +++- .../executiongraph/failover/ExecutionFailureHandlerTest.java | 10 +++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java index 0ff7f673a55..aed330de522 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java @@ -163,7 +163,9 @@ public class ExecutionFailureHandler { boolean isNewAttempt = restartBackoffTimeStrategy.notifyFailure(cause); if (restartBackoffTimeStrategy.canRestart()) { - numberOfRestarts++; + if (isNewAttempt) { + numberOfRestarts++; + } return FailureHandlingResult.restartable( failedExecution, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java index 3102694be28..a9d3beada8a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java @@ -195,7 +195,7 @@ class ExecutionFailureHandlerTest { } @Test - void testNewAttempt() throws Exception { + void testNewAttemptAndNumberOfRestarts() throws Exception { final Set<ExecutionVertexID> tasksToRestart = Collections.singleton(new ExecutionVertexID(new JobVertexID(), 0)); failoverStrategy.setTasksToRestart(tasksToRestart); @@ -220,6 +220,7 @@ class ExecutionFailureHandlerTest { } private void testHandlingRootException(Execution execution, Throwable error) { + final long originalNumberOfRestarts = executionFailureHandler.getNumberOfRestarts(); FailureHandlingResult result = executionFailureHandler.getFailureHandlingResult( execution, error, System.currentTimeMillis()); @@ -227,9 +228,13 @@ class ExecutionFailureHandlerTest { .as( "The FailureHandlingResult should be the root cause if exception is new attempt.") .isTrue(); + assertThat(executionFailureHandler.getNumberOfRestarts()) + .as("The numberOfRestarts should be increased when it's a root exception.") + .isEqualTo(originalNumberOfRestarts + 1); } private void testHandlingConcurrentException(Execution execution, Throwable error) { + final long originalNumberOfRestarts = executionFailureHandler.getNumberOfRestarts(); FailureHandlingResult result = executionFailureHandler.getFailureHandlingResult( execution, error, System.currentTimeMillis()); @@ -237,6 +242,9 @@ class ExecutionFailureHandlerTest { .as( "The FailureHandlingResult shouldn't be the root cause if exception isn't new attempt.") .isFalse(); + assertThat(executionFailureHandler.getNumberOfRestarts()) + .as("The numberOfRestarts shouldn't be increased when it isn't a root exception.") + .isEqualTo(originalNumberOfRestarts); } /** Tests the check for unrecoverable error. */
