XComp commented on code in PR #24003:
URL: https://github.com/apache/flink/pull/24003#discussion_r1455102444
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java:
##########
@@ -170,11 +176,15 @@ public RootExceptionHistoryEntry(
CompletableFuture<Map<String, String>> failureLabels,
@Nullable String failingTaskName,
@Nullable TaskManagerLocation taskManagerLocation,
- Iterable<ExceptionHistoryEntry> concurrentExceptions) {
+ Collection<ExceptionHistoryEntry> concurrentExceptions) {
super(cause, timestamp, failureLabels, failingTaskName,
taskManagerLocation);
this.concurrentExceptions = concurrentExceptions;
}
+ public void addConcurrentExceptions(Iterable<Execution>
concurrentlyExecutions) {
Review Comment:
The class was previously meant to be immutable. This method is changing that
property. I'm wondering whether we should add `@NotThreadSafe` to the class to
make that specific change more explicit.
The change itself should still be fine because failure handling happens in the
`JobMaster`'s main thread, which is how we avoid concurrency issues.
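To illustrate the thread-confinement argument, here is a minimal sketch of a mutable entry that is only safe because a single thread touches it. The class `ThreadConfinedEntry`, its `String` payload, and the explicit owner check are all hypothetical and are not Flink code; they only show what a "not thread-safe, but confined to one thread" contract could look like.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a history entry that became mutable.
// It is NOT thread-safe; correctness relies on single-thread confinement.
final class ThreadConfinedEntry {
    private final Thread owner;
    private final List<String> concurrentExceptions = new ArrayList<>();

    ThreadConfinedEntry(Thread owner) {
        this.owner = owner;
    }

    // Mutating method: only the owning thread may call it.
    void addConcurrentException(String description) {
        checkRunningInOwnerThread();
        concurrentExceptions.add(description);
    }

    // Read-only view so callers cannot mutate the list behind our back.
    List<String> getConcurrentExceptions() {
        return Collections.unmodifiableList(concurrentExceptions);
    }

    private void checkRunningInOwnerThread() {
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException("Entry accessed from a foreign thread.");
        }
    }
}
```

An annotation such as `@NotThreadSafe` documents the same contract without enforcing it at runtime; the explicit check above is just one way to make a violation visible.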
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java:
##########
@@ -61,8 +61,9 @@ public long getBackoffTime() {
}
@Override
- public void notifyFailure(Throwable cause) {
+ public boolean notifyFailure(Throwable cause) {
currentRestartAttempt++;
+ return true;
Review Comment:
Aren't we also collecting exceptions here? Because the restart might happen
with some delay. :thinking:
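To make the question concrete, here is a rough sketch of a delay-based strategy whose `notifyFailure` returns `false` for failures that arrive while a restart is still pending. The class `DelayedRestartSketch`, the explicit `nowMillis` parameter, and the decision to count only new cycles as restart attempts are assumptions for illustration; this is not the PR's implementation.

```java
// Hypothetical sketch: failures arriving before the delayed restart fires
// are merged into the pending failure-handling cycle.
final class DelayedRestartSketch {
    private final long delayMillis;
    private boolean restartPending;
    private long pendingRestartAt;
    private int restartAttempts;

    DelayedRestartSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    /**
     * @return true if this failure starts a new failure-handling cycle,
     *     false if it is merged into a cycle whose restart has not happened yet.
     */
    boolean notifyFailure(long nowMillis) {
        if (restartPending && nowMillis < pendingRestartAt) {
            // The restart triggered by an earlier failure is still outstanding.
            return false;
        }
        restartPending = true;
        pendingRestartAt = nowMillis + delayMillis;
        restartAttempts++;
        return true;
    }

    int getRestartAttempts() {
        return restartAttempts;
    }
}
```

With a delay of 100 ms, failures at t=0 and t=50 would belong to the same cycle, and a failure at t=100 would start the next one.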
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java:
##########
@@ -150,7 +154,17 @@ public long getTimestamp() {
*
* @return The concurrently failed {@code Executions}.
*/
- public Iterable<Execution> getConcurrentlyFailedExecution() {
+ public Set<Execution> getConcurrentlyFailedExecution() {
return Collections.unmodifiableSet(concurrentlyFailedExecutions);
}
+
+ /**
+ * @return Whether the current failure is a new attempt. True means that
the current failure is
Review Comment:
```suggestion
* @return True means that the current failure is
```
nit: The first sentence does not add any more information. You could also go
with the proposal I shared in `FailureHandlingResult` if you think it's a
reasonable change.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##########
@@ -71,7 +74,10 @@ void setUp() {
failoverStrategy = new TestFailoverStrategy();
testingFailureEnricher = new TestingFailureEnricher();
- backoffTimeStrategy = new TestRestartBackoffTimeStrategy(true,
RESTART_DELAY_MS);
+ isNewAttempt = new AtomicBoolean(true);
+ backoffTimeStrategy =
+ new TestRestartBackoffTimeStrategy(
+ true, RESTART_DELAY_MS, () -> isNewAttempt.get());
Review Comment:
```suggestion
true, RESTART_DELAY_MS, isNewAttempt::get);
```
nit
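For reference, a small sketch showing that the lambda and the bound method reference behave the same way. It assumes the constructor parameter is a `Supplier<Boolean>`; the actual parameter type of `TestRestartBackoffTimeStrategy` is not visible in this diff, and the class name `MethodReferenceSketch` is made up.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical helper comparing the two equivalent spellings.
final class MethodReferenceSketch {
    // Original form from the diff: explicit lambda.
    static Supplier<Boolean> asLambda(AtomicBoolean flag) {
        return () -> flag.get();
    }

    // Suggested form: bound method reference on the same instance.
    static Supplier<Boolean> asMethodReference(AtomicBoolean flag) {
        return flag::get;
    }
}
```

Both suppliers read the flag lazily on each call, so later `set(...)` calls on the `AtomicBoolean` are visible through either one.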
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategy.java:
##########
@@ -38,8 +38,11 @@ public interface RestartBackoffTimeStrategy {
* Notify the strategy about the task failure cause.
*
* @param cause of the task failure
+ * @return Whether the current failure is a new attempt. True means that
the current failure is
+ * a new attempt, false means that there has been a failure before and
has not been tried
+ * yet, and the current failure will be merged into the previous
attempt.
Review Comment:
```suggestion
* @return True means that the current failure is the first one after
the most-recent failure
* handling happened, false means that there has been a failure
before that was not handled
* yet, and the current failure will be considered in a combined
failure handling effort.
```
nit: just another proposal to remove the "attempt" from the description.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureRateRestartBackoffTimeStrategy.java:
##########
@@ -79,11 +79,12 @@ public long getBackoffTime() {
}
@Override
- public void notifyFailure(Throwable cause) {
+ public boolean notifyFailure(Throwable cause) {
if (isFailureTimestampsQueueFull()) {
failureTimestamps.remove();
}
failureTimestamps.add(clock.absoluteTimeMillis());
+ return true;
Review Comment:
Same here: isn't this strategy also collecting failures before doing the
restart? In that case, not every failure would be a "first attempt".
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##########
@@ -171,6 +179,9 @@ void testNonRecoverableFailureHandlingResult() throws
Exception {
assertThat(result.getFailureLabels().get())
.isEqualTo(testingFailureEnricher.getFailureLabels());
assertThat(result.getTimestamp()).isEqualTo(timestamp);
+ // NonRecoverableFailure is new attempt even if
RestartBackoffTimeStrategy consider it's not
+ // new attempt.
+ assertThat(result.isNewAttempt()).isTrue();
Review Comment:
```suggestion
assertThat(result.isNewAttempt())
.as(
"A NonRecoverableFailure should be a new attempt even if RestartBackoffTimeStrategy considers it not a new attempt.")
.isTrue();
```
just another nitty hint: For tests, we don't need to add comments but could
rather use a descriptive assertion message. That serves the same purpose as
the comment but also makes the test execution more descriptive.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##########
@@ -708,27 +712,43 @@ private void archiveGlobalFailure(
long timestamp,
CompletableFuture<Map<String, String>> failureLabels,
Iterable<Execution> executions) {
- exceptionHistory.add(
+ latestRootExceptionEntry =
RootExceptionHistoryEntry.fromGlobalFailure(
- failure, timestamp, failureLabels, executions));
+ failure, timestamp, failureLabels, executions);
+ exceptionHistory.add(latestRootExceptionEntry);
log.debug("Archive global failure.", failure);
}
protected final void archiveFromFailureHandlingResult(
FailureHandlingResultSnapshot failureHandlingResult) {
+ // Handle all subsequent exceptions as the concurrent exceptions when
it's not a new
+ // attempt.
+ if (!failureHandlingResult.isNewAttempt()) {
+ checkState(
+ latestRootExceptionEntry != null,
+ "A root exception entry should exist if
failureHandlingResult wasn't "
+ + "generated as part of a new error handling
cycle.");
+ List<Execution> concurrentlyExecutions = new ArrayList<>();
+
failureHandlingResult.getRootCauseExecution().ifPresent(concurrentlyExecutions::add);
+
concurrentlyExecutions.addAll(failureHandlingResult.getConcurrentlyFailedExecution());
+
+
latestRootExceptionEntry.addConcurrentExceptions(concurrentlyExecutions);
+ return;
+ }
+
if (failureHandlingResult.getRootCauseExecution().isPresent()) {
Review Comment:
```suggestion
} else if
(failureHandlingResult.getRootCauseExecution().isPresent()) {
```
nit: Just as a proposal to make it more obvious that we're handling three
exclusive cases in this method. If you go with that proposal, you might want to
move the comment into the if block.
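A simplified sketch of the resulting shape, with the three branches spelled out as one `if` / `else if` / `else` chain. The class `ArchiveBranchingSketch`, the `String` return values, and the assumption that the remaining branch covers a failure without a root-cause execution are illustrative only; the real method archives entries instead of returning labels.

```java
import java.util.Optional;

// Hypothetical sketch of the three mutually exclusive cases.
final class ArchiveBranchingSketch {
    static String classify(boolean isNewAttempt, Optional<String> rootCauseExecution) {
        if (!isNewAttempt) {
            // Subsequent failure: attach it to the latest root entry.
            return "merge-into-latest-root-entry";
        } else if (rootCauseExecution.isPresent()) {
            // New cycle with a known failing execution.
            return "archive-failure-of-" + rootCauseExecution.get();
        } else {
            // New cycle without a root-cause execution (assumed remaining case).
            return "archive-without-root-cause-execution";
        }
    }
}
```

Written this way, the early `return` in the first branch is no longer needed, since the chain itself guarantees that only one case runs.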
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java:
##########
@@ -183,6 +194,55 @@ void testNonRecoverableFailureHandlingResult() throws
Exception {
assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
}
+ /** Test isNewAttempt of {@link FailureHandlingResult} is expected. */
+ @Test
+ void testNewAttemptAndNumberOfRestarts() throws Exception {
Review Comment:
The comment is obsolete because it could be expressed through the test
method name(s) (to reduce redundant information in the code).
Additionally, what's your opinion on splitting this big test up into smaller
test methods? I see the benefit of having one big one, because you can check
the number of restarts at the same time. But having more specific test methods
usually helps with improving readability of the test code. WDYT?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java:
##########
@@ -206,6 +214,11 @@ public boolean isGlobalFailure() {
return globalFailure;
}
+ /** @return Whether this failure is a new attempt. */
+ public boolean isNewAttempt() {
Review Comment:
What about calling it `isRootCause` here analogously to the exception
history? ...or isFirstErrorInNewErrorHandlingCycle 🤔 Just as other more
descriptive proposals. Up to you, I'm not fighting for any version. :innocent:
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java:
##########
@@ -81,10 +81,14 @@ void testFromFailureHandlingResultSnapshot() throws
ExecutionException, Interrup
final CompletableFuture<Map<String, String>> rootFailureLabels =
CompletableFuture.completedFuture(Collections.singletonMap("key", "value"));
- final Throwable concurrentException = new
IllegalStateException("Expected other failure");
- final ExecutionVertex concurrentlyFailedExecutionVertex =
extractExecutionVertex(1);
- final long concurrentExceptionTimestamp =
- triggerFailure(concurrentlyFailedExecutionVertex,
concurrentException);
+ final Throwable concurrentException1 = new
IllegalStateException("Expected other failure1");
+ final ExecutionVertex concurrentlyFailedExecutionVertex1 =
extractExecutionVertex(1);
+ Predicate<ExceptionHistoryEntry> exception1Predicate =
+ getExceptionHistoryEntryPredicate(
+ concurrentException1,
concurrentlyFailedExecutionVertex1);
+
+ final Throwable concurrentException2 = new
IllegalStateException("Expected other failure2");
+ final ExecutionVertex concurrentlyFailedExecutionVertex2 =
extractExecutionVertex(2);
Review Comment:
Can you move these two lines further down to where they are actually used?
(Alternatively, you could move the Predicate initialization up.) The goal would
be to have the entire initialization in one place. Personally, I would
prefer moving the code down. That is less distracting for the reader, who
can then focus on the first concurrent exception handling at the beginning.
But up to you...
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java:
##########
@@ -81,10 +81,14 @@ void testFromFailureHandlingResultSnapshot() throws
ExecutionException, Interrup
final CompletableFuture<Map<String, String>> rootFailureLabels =
CompletableFuture.completedFuture(Collections.singletonMap("key", "value"));
- final Throwable concurrentException = new
IllegalStateException("Expected other failure");
- final ExecutionVertex concurrentlyFailedExecutionVertex =
extractExecutionVertex(1);
- final long concurrentExceptionTimestamp =
- triggerFailure(concurrentlyFailedExecutionVertex,
concurrentException);
+ final Throwable concurrentException1 = new
IllegalStateException("Expected other failure1");
+ final ExecutionVertex concurrentlyFailedExecutionVertex1 =
extractExecutionVertex(1);
+ Predicate<ExceptionHistoryEntry> exception1Predicate =
+ getExceptionHistoryEntryPredicate(
+ concurrentException1,
concurrentlyFailedExecutionVertex1);
+
+ final Throwable concurrentException2 = new
IllegalStateException("Expected other failure2");
+ final ExecutionVertex concurrentlyFailedExecutionVertex2 =
extractExecutionVertex(2);
Review Comment:
Thinking about it: Another solution would be to create a dedicated test
method `testAddConcurrentExceptions`. WDYT? :thinking:
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java:
##########
@@ -150,25 +164,24 @@ void testFromGlobalFailure() throws ExecutionException,
InterruptedException {
ExceptionHistoryEntryMatcher.matchesGlobalFailure(
rootCause, rootTimestamp,
rootFailureLabels.get()));
- final Predicate<ExceptionHistoryEntry> exception0Predicate =
- ExceptionHistoryEntryMatcher.matchesFailure(
- concurrentException0,
- concurrentExceptionTimestamp0,
-
concurrentlyFailedExecutionVertex0.getTaskNameWithSubtaskIndex(),
-
concurrentlyFailedExecutionVertex0.getCurrentAssignedResourceLocation());
- final Predicate<ExceptionHistoryEntry> exception1Predicate =
- ExceptionHistoryEntryMatcher.matchesFailure(
- concurrentException1,
- concurrentExceptionTimestamp1,
-
concurrentlyFailedExecutionVertex1.getTaskNameWithSubtaskIndex(),
-
concurrentlyFailedExecutionVertex1.getCurrentAssignedResourceLocation());
assertThat(actualEntry.getConcurrentExceptions())
.allMatch(
exceptionHistoryEntry ->
exception0Predicate.test(exceptionHistoryEntry)
||
exception1Predicate.test(exceptionHistoryEntry));
}
+ private Predicate<ExceptionHistoryEntry> getExceptionHistoryEntryPredicate(
Review Comment:
```suggestion
private Predicate<ExceptionHistoryEntry>
triggerFailureAndCreateEntryMatcher(
```
Based on the old name, I didn't expect the method to trigger the failure. We
might want to mention that in the method name because it's an essential part of
each test scenario that the failure is actually registered through a state
update in the `ExecutionGraph`.