1996fanrui commented on code in PR #24003:
URL: https://github.com/apache/flink/pull/24003#discussion_r1455296666
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##########
@@ -183,6 +194,55 @@ void testNonRecoverableFailureHandlingResult() throws
Exception {
assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
}
+ /** Test isNewAttempt of {@link FailureHandlingResult} is expected. */
+ @Test
+ void testNewAttemptAndNumberOfRestarts() throws Exception {
Review Comment:
> The comment is obsolete because it could be expressed through the test
method name(s) (to reduce redundant information in the code).
Removed the comment.
> Additionally, what's your opinion on splitting this big test up into
smaller test methods. I see the benefit of having one big one. Because you can
check the number of restarts at the same time. But having more specific test
methods usually helps with improving readability of the test code.. WDYT?
This test covers the whole flow: it checks that NumberOfRestarts is as expected
when a series of exceptions occurs, and that the exception merging logic works well.
I added some comments inside the test to make it easier to understand.
Please take another look.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java:
##########
@@ -206,6 +214,11 @@ public boolean isGlobalFailure() {
return globalFailure;
}
+ /** @return Whether this failure is a new attempt. */
+ public boolean isNewAttempt() {
Review Comment:
Renaming it to `isRootCause` makes sense to me. Updated.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureRateRestartBackoffTimeStrategy.java:
##########
@@ -79,11 +79,12 @@ public long getBackoffTime() {
}
@Override
- public void notifyFailure(Throwable cause) {
+ public boolean notifyFailure(Throwable cause) {
if (isFailureTimestampsQueueFull()) {
failureTimestamps.remove();
}
failureTimestamps.add(clock.absoluteTimeMillis());
+ return true;
Review Comment:
See the last comment.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java:
##########
@@ -170,11 +176,15 @@ public RootExceptionHistoryEntry(
CompletableFuture<Map<String, String>> failureLabels,
@Nullable String failingTaskName,
@Nullable TaskManagerLocation taskManagerLocation,
- Iterable<ExceptionHistoryEntry> concurrentExceptions) {
+ Collection<ExceptionHistoryEntry> concurrentExceptions) {
super(cause, timestamp, failureLabels, failingTaskName,
taskManagerLocation);
this.concurrentExceptions = concurrentExceptions;
}
+ public void addConcurrentExceptions(Iterable<Execution>
concurrentlyExecutions) {
Review Comment:
Good catch, added `@NotThreadSafe`.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java:
##########
@@ -81,10 +81,14 @@ void testFromFailureHandlingResultSnapshot() throws
ExecutionException, Interrup
final CompletableFuture<Map<String, String>> rootFailureLabels =
CompletableFuture.completedFuture(Collections.singletonMap("key", "value"));
- final Throwable concurrentException = new
IllegalStateException("Expected other failure");
- final ExecutionVertex concurrentlyFailedExecutionVertex =
extractExecutionVertex(1);
- final long concurrentExceptionTimestamp =
- triggerFailure(concurrentlyFailedExecutionVertex,
concurrentException);
+ final Throwable concurrentException1 = new
IllegalStateException("Expected other failure1");
+ final ExecutionVertex concurrentlyFailedExecutionVertex1 =
extractExecutionVertex(1);
+ Predicate<ExceptionHistoryEntry> exception1Predicate =
+ getExceptionHistoryEntryPredicate(
+ concurrentException1,
concurrentlyFailedExecutionVertex1);
+
+ final Throwable concurrentException2 = new
IllegalStateException("Expected other failure2");
+ final ExecutionVertex concurrentlyFailedExecutionVertex2 =
extractExecutionVertex(2);
Review Comment:
My thought is to move the code down instead of creating a new test method,
because `testAddConecurrentExceptions` will use all the code of this test. WDYT?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java:
##########
@@ -61,8 +61,9 @@ public long getBackoffTime() {
}
@Override
- public void notifyFailure(Throwable cause) {
+ public boolean notifyFailure(Throwable cause) {
currentRestartAttempt++;
+ return true;
Review Comment:
Only the exponential-delay restart strategy merges exceptions.
The background is [FLIP-364](https://cwiki.apache.org/confluence/x/uJqzDw).
Before FLIP-364, each exception was considered a new attempt for all
restart strategies.
FLIP-364 supports merging multiple exceptions into one attempt for the
`exponential-delay restart strategy`, and each attempt corresponds to one
group of merged exceptions.
This means that when a Flink job restarts 5 times, users see 5 exceptions in
the WebUI.
We don't need to worry about the other restart strategies, since
`exponential-delay restart strategy` has been set as the default `restart
strategy`.
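To illustrate the boolean contract of `notifyFailure`, here is a rough sketch of how a merging strategy could return `false` for a failure that is folded into the current attempt. This is not the actual Flink implementation: the class name, the `mergeWindowMillis` parameter, and the explicit `nowMillis` argument are all assumptions made for the example.

```java
/**
 * Hypothetical sketch only. It is NOT Flink's exponential-delay strategy;
 * the class name, mergeWindowMillis and nowMillis are illustrative assumptions.
 */
final class MergingRestartStrategySketch {
    // Assumed parameter: failures this close to the start of the current
    // attempt are merged into it.
    private final long mergeWindowMillis;
    private boolean hasAttempt = false;
    private long attemptStartMillis = 0L;
    private int numberOfRestarts = 0;

    MergingRestartStrategySketch(long mergeWindowMillis) {
        this.mergeWindowMillis = mergeWindowMillis;
    }

    /**
     * @return true if the failure starts a new attempt, false if it is merged
     *     into the current attempt.
     */
    boolean notifyFailure(Throwable cause, long nowMillis) {
        if (hasAttempt && nowMillis - attemptStartMillis < mergeWindowMillis) {
            // Merged: the restart counter is not incremented.
            return false;
        }
        hasAttempt = true;
        attemptStartMillis = nowMillis;
        numberOfRestarts++;
        return true;
    }

    int getNumberOfRestarts() {
        return numberOfRestarts;
    }
}
```

In this sketch, the fixed-delay and failure-rate strategies correspond to the degenerate case where every failure is a new attempt, which is why they simply `return true`.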