XComp commented on code in PR #24003:
URL: https://github.com/apache/flink/pull/24003#discussion_r1458870055
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##########
@@ -183,6 +194,59 @@ void testNonRecoverableFailureHandlingResult() throws
Exception {
assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
}
+ @Test
+ void testNewAttemptAndNumberOfRestarts() throws Exception {
+ final Set<ExecutionVertexID> tasksToRestart =
+ Collections.singleton(new ExecutionVertexID(new JobVertexID(),
0));
+ failoverStrategy.setTasksToRestart(tasksToRestart);
+
+ Execution execution =
+
FailureHandlingResultTest.createExecution(EXECUTOR_RESOURCE.getExecutor());
+ final Throwable error = new Exception("expected test failure");
+
+ assertHandlerRootException(execution, error);
+
+ isNewAttempt.set(false);
+ assertHandlerConcurrentException(execution, error);
+ assertHandlerConcurrentException(execution, error);
+
+ isNewAttempt.set(true);
+ assertHandlerRootException(execution, error);
+ assertHandlerRootException(execution, error);
+
+ isNewAttempt.set(false);
+ assertHandlerConcurrentException(execution, error);
+ assertHandlerConcurrentException(execution, error);
+ }
+
+ private void assertHandlerRootException(Execution execution, Throwable
error) {
+ final long originalNumberOfRestarts =
executionFailureHandler.getNumberOfRestarts();
+ FailureHandlingResult result =
+ executionFailureHandler.getFailureHandlingResult(
+ execution, error, System.currentTimeMillis());
+ assertThat(result.isRootCause())
+ .as(
+ "The FailureHandlingResult shouldn be the root cause
if exception is new attempt.")
Review Comment:
```suggestion
"The FailureHandlingResult should be the root cause
if exception is new attempt.")
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##########
@@ -183,6 +194,59 @@ void testNonRecoverableFailureHandlingResult() throws
Exception {
assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
}
+ @Test
+ void testNewAttemptAndNumberOfRestarts() throws Exception {
+ final Set<ExecutionVertexID> tasksToRestart =
+ Collections.singleton(new ExecutionVertexID(new JobVertexID(),
0));
+ failoverStrategy.setTasksToRestart(tasksToRestart);
+
+ Execution execution =
+
FailureHandlingResultTest.createExecution(EXECUTOR_RESOURCE.getExecutor());
+ final Throwable error = new Exception("expected test failure");
+
+ assertHandlerRootException(execution, error);
+
+ isNewAttempt.set(false);
+ assertHandlerConcurrentException(execution, error);
+ assertHandlerConcurrentException(execution, error);
+
+ isNewAttempt.set(true);
+ assertHandlerRootException(execution, error);
+ assertHandlerRootException(execution, error);
+
+ isNewAttempt.set(false);
+ assertHandlerConcurrentException(execution, error);
+ assertHandlerConcurrentException(execution, error);
+ }
+
+ private void assertHandlerRootException(Execution execution, Throwable
error) {
+ final long originalNumberOfRestarts =
executionFailureHandler.getNumberOfRestarts();
+ FailureHandlingResult result =
+ executionFailureHandler.getFailureHandlingResult(
+ execution, error, System.currentTimeMillis());
+ assertThat(result.isRootCause())
+ .as(
+ "The FailureHandlingResult shouldn be the root cause
if exception is new attempt.")
+ .isTrue();
+ assertThat(executionFailureHandler.getNumberOfRestarts())
+ .as("The numberOfRestarts should be increased when it's a root
exception.")
+ .isEqualTo(originalNumberOfRestarts + 1);
+ }
+
+ private void assertHandlerConcurrentException(Execution execution,
Throwable error) {
Review Comment:
```suggestion
private void testHandlingConcurrentException(Execution execution,
Throwable error) {
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##########
@@ -183,6 +194,59 @@ void testNonRecoverableFailureHandlingResult() throws
Exception {
assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
}
+ @Test
+ void testNewAttemptAndNumberOfRestarts() throws Exception {
+ final Set<ExecutionVertexID> tasksToRestart =
+ Collections.singleton(new ExecutionVertexID(new JobVertexID(),
0));
+ failoverStrategy.setTasksToRestart(tasksToRestart);
+
+ Execution execution =
+
FailureHandlingResultTest.createExecution(EXECUTOR_RESOURCE.getExecutor());
+ final Throwable error = new Exception("expected test failure");
+
+ assertHandlerRootException(execution, error);
+
+ isNewAttempt.set(false);
+ assertHandlerConcurrentException(execution, error);
+ assertHandlerConcurrentException(execution, error);
+
+ isNewAttempt.set(true);
+ assertHandlerRootException(execution, error);
+ assertHandlerRootException(execution, error);
+
+ isNewAttempt.set(false);
+ assertHandlerConcurrentException(execution, error);
+ assertHandlerConcurrentException(execution, error);
+ }
+
+ private void assertHandlerRootException(Execution execution, Throwable
error) {
Review Comment:
```suggestion
private void testHandlingRootException(Execution execution, Throwable
error) {
```
`assert` indicates that we're only checking the value of a preceding
operation. But we're also running the actual test code in this method. So, I'd
suggest renaming the method.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java:
##########
@@ -81,10 +81,14 @@ void testFromFailureHandlingResultSnapshot() throws
ExecutionException, Interrup
final CompletableFuture<Map<String, String>> rootFailureLabels =
CompletableFuture.completedFuture(Collections.singletonMap("key", "value"));
- final Throwable concurrentException = new
IllegalStateException("Expected other failure");
- final ExecutionVertex concurrentlyFailedExecutionVertex =
extractExecutionVertex(1);
- final long concurrentExceptionTimestamp =
- triggerFailure(concurrentlyFailedExecutionVertex,
concurrentException);
+ final Throwable concurrentException1 = new
IllegalStateException("Expected other failure1");
+ final ExecutionVertex concurrentlyFailedExecutionVertex1 =
extractExecutionVertex(1);
+ Predicate<ExceptionHistoryEntry> exception1Predicate =
+ getExceptionHistoryEntryPredicate(
+ concurrentException1,
concurrentlyFailedExecutionVertex1);
+
+ final Throwable concurrentException2 = new
IllegalStateException("Expected other failure2");
+ final ExecutionVertex concurrentlyFailedExecutionVertex2 =
extractExecutionVertex(2);
Review Comment:
Fine with me to keep it like that.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java:
##########
@@ -61,8 +61,9 @@ public long getBackoffTime() {
}
@Override
- public void notifyFailure(Throwable cause) {
+ public boolean notifyFailure(Throwable cause) {
currentRestartAttempt++;
+ return true;
Review Comment:
Thanks for the clarification. That makes sense.
> After FLIP-360 discussion, I found exponential-delay restart strategy can
> replace other restart strategies directly if users set the
> restart-strategy.exponential-delay.backoff-multiplier = 1

I guess you meant
[FLIP-364](https://cwiki.apache.org/confluence/display/FLINK/FLIP-364%3A+Improve+the+exponential-delay+restart-strategy)
here? ;-) You're right, we could deprecate the class. But there is still a
benefit of keeping the configuration parameter (as an alias that configures the
`ExponentialDelayRestartBackoffTimeStrategy` with multiplier=1) to make it
easier to configure this behavior.
What about deprecating that class as part of this PR and coming up with a
follow-up Jira issue that replaces the strategy?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]