XComp commented on code in PR #24003:
URL: https://github.com/apache/flink/pull/24003#discussion_r1458870055


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##########
@@ -183,6 +194,59 @@ void testNonRecoverableFailureHandlingResult() throws 
Exception {
         assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
     }
 
+    @Test
+    void testNewAttemptAndNumberOfRestarts() throws Exception {
+        final Set<ExecutionVertexID> tasksToRestart =
+                Collections.singleton(new ExecutionVertexID(new JobVertexID(), 
0));
+        failoverStrategy.setTasksToRestart(tasksToRestart);
+
+        Execution execution =
+                
FailureHandlingResultTest.createExecution(EXECUTOR_RESOURCE.getExecutor());
+        final Throwable error = new Exception("expected test failure");
+
+        assertHandlerRootException(execution, error);
+
+        isNewAttempt.set(false);
+        assertHandlerConcurrentException(execution, error);
+        assertHandlerConcurrentException(execution, error);
+
+        isNewAttempt.set(true);
+        assertHandlerRootException(execution, error);
+        assertHandlerRootException(execution, error);
+
+        isNewAttempt.set(false);
+        assertHandlerConcurrentException(execution, error);
+        assertHandlerConcurrentException(execution, error);
+    }
+
+    private void assertHandlerRootException(Execution execution, Throwable 
error) {
+        final long originalNumberOfRestarts = 
executionFailureHandler.getNumberOfRestarts();
+        FailureHandlingResult result =
+                executionFailureHandler.getFailureHandlingResult(
+                        execution, error, System.currentTimeMillis());
+        assertThat(result.isRootCause())
+                .as(
+                        "The FailureHandlingResult shouldn be the root cause 
if exception is new attempt.")

Review Comment:
   ```suggestion
                           "The FailureHandlingResult should be the root cause 
if exception is new attempt.")
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##########
@@ -183,6 +194,59 @@ void testNonRecoverableFailureHandlingResult() throws 
Exception {
         assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
     }
 
+    @Test
+    void testNewAttemptAndNumberOfRestarts() throws Exception {
+        final Set<ExecutionVertexID> tasksToRestart =
+                Collections.singleton(new ExecutionVertexID(new JobVertexID(), 
0));
+        failoverStrategy.setTasksToRestart(tasksToRestart);
+
+        Execution execution =
+                
FailureHandlingResultTest.createExecution(EXECUTOR_RESOURCE.getExecutor());
+        final Throwable error = new Exception("expected test failure");
+
+        assertHandlerRootException(execution, error);
+
+        isNewAttempt.set(false);
+        assertHandlerConcurrentException(execution, error);
+        assertHandlerConcurrentException(execution, error);
+
+        isNewAttempt.set(true);
+        assertHandlerRootException(execution, error);
+        assertHandlerRootException(execution, error);
+
+        isNewAttempt.set(false);
+        assertHandlerConcurrentException(execution, error);
+        assertHandlerConcurrentException(execution, error);
+    }
+
+    private void assertHandlerRootException(Execution execution, Throwable 
error) {
+        final long originalNumberOfRestarts = 
executionFailureHandler.getNumberOfRestarts();
+        FailureHandlingResult result =
+                executionFailureHandler.getFailureHandlingResult(
+                        execution, error, System.currentTimeMillis());
+        assertThat(result.isRootCause())
+                .as(
+                        "The FailureHandlingResult shouldn be the root cause 
if exception is new attempt.")
+                .isTrue();
+        assertThat(executionFailureHandler.getNumberOfRestarts())
+                .as("The numberOfRestarts should be increased when it's a root 
exception.")
+                .isEqualTo(originalNumberOfRestarts + 1);
+    }
+
+    private void assertHandlerConcurrentException(Execution execution, 
Throwable error) {

Review Comment:
   ```suggestion
       private void testHandlingConcurrentException(Execution execution, 
Throwable error) {
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##########
@@ -183,6 +194,59 @@ void testNonRecoverableFailureHandlingResult() throws 
Exception {
         assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
     }
 
+    @Test
+    void testNewAttemptAndNumberOfRestarts() throws Exception {
+        final Set<ExecutionVertexID> tasksToRestart =
+                Collections.singleton(new ExecutionVertexID(new JobVertexID(), 
0));
+        failoverStrategy.setTasksToRestart(tasksToRestart);
+
+        Execution execution =
+                
FailureHandlingResultTest.createExecution(EXECUTOR_RESOURCE.getExecutor());
+        final Throwable error = new Exception("expected test failure");
+
+        assertHandlerRootException(execution, error);
+
+        isNewAttempt.set(false);
+        assertHandlerConcurrentException(execution, error);
+        assertHandlerConcurrentException(execution, error);
+
+        isNewAttempt.set(true);
+        assertHandlerRootException(execution, error);
+        assertHandlerRootException(execution, error);
+
+        isNewAttempt.set(false);
+        assertHandlerConcurrentException(execution, error);
+        assertHandlerConcurrentException(execution, error);
+    }
+
+    private void assertHandlerRootException(Execution execution, Throwable 
error) {

Review Comment:
   ```suggestion
       private void testHandlingRootException(Execution execution, Throwable 
error) {
   ```
   `assert` indicates that we're only checking the result of a preceding 
operation. But this method also runs the actual test code. So, I'd 
suggest renaming the method.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java:
##########
@@ -81,10 +81,14 @@ void testFromFailureHandlingResultSnapshot() throws 
ExecutionException, Interrup
         final CompletableFuture<Map<String, String>> rootFailureLabels =
                 
CompletableFuture.completedFuture(Collections.singletonMap("key", "value"));
 
-        final Throwable concurrentException = new 
IllegalStateException("Expected other failure");
-        final ExecutionVertex concurrentlyFailedExecutionVertex = 
extractExecutionVertex(1);
-        final long concurrentExceptionTimestamp =
-                triggerFailure(concurrentlyFailedExecutionVertex, 
concurrentException);
+        final Throwable concurrentException1 = new 
IllegalStateException("Expected other failure1");
+        final ExecutionVertex concurrentlyFailedExecutionVertex1 = 
extractExecutionVertex(1);
+        Predicate<ExceptionHistoryEntry> exception1Predicate =
+                getExceptionHistoryEntryPredicate(
+                        concurrentException1, 
concurrentlyFailedExecutionVertex1);
+
+        final Throwable concurrentException2 = new 
IllegalStateException("Expected other failure2");
+        final ExecutionVertex concurrentlyFailedExecutionVertex2 = 
extractExecutionVertex(2);

Review Comment:
   I'm fine with keeping it like that.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java:
##########
@@ -61,8 +61,9 @@ public long getBackoffTime() {
     }
 
     @Override
-    public void notifyFailure(Throwable cause) {
+    public boolean notifyFailure(Throwable cause) {
         currentRestartAttempt++;
+        return true;

Review Comment:
   Thanks for the clarification. That makes sense.
   
   > After FLIP-360 discussion, I found exponential-delay restart strategy can 
replace other restart strategies directly if users set the 
restart-strategy.exponential-delay.backoff-multiplier = 1
   
   I guess, you meant 
[FLIP-364](https://cwiki.apache.org/confluence/display/FLINK/FLIP-364%3A+Improve+the+exponential-delay+restart-strategy)
 here? ;-) You're right, we could deprecate the class. But there is still a 
benefit of keeping the configuration parameter (as an alias that configures the 
`ExponentialDelayRestartBackoffTimeStrategy` with multiplier=1) to make it 
easier to configure this behavior. 
   
   What about deprecating that class as part of this PR and creating a 
follow-up Jira issue for replacing the strategy?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to