XComp commented on code in PR #24003:
URL: https://github.com/apache/flink/pull/24003#discussion_r1444596677
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java:
##########
@@ -66,6 +66,9 @@ public class FailureHandlingResult {
/** True if the original failure was a global failure. */
private final boolean globalFailure;
+ /** Tue if current failure is a new attempt. */
Review Comment:
```suggestion
/** True if current failure is a new attempt. */
```
nit
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java:
##########
@@ -85,7 +89,9 @@ private FailureHandlingResult(
CompletableFuture<Map<String, String>> failureLabels,
@Nullable Set<ExecutionVertexID> verticesToRestart,
long restartDelayMS,
- boolean globalFailure) {
+ boolean globalFailure,
+ boolean isNewAttempt) {
+ this.isNewAttempt = isNewAttempt;
Review Comment:
nit: usually, contributors kind of stick to the order of the parameter list
when setting the fields within the constructor.
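Something along these lines (just a sketch; the preceding parameters and assignments are elided here):
```java
private FailureHandlingResult(
        // ... preceding parameters elided ...
        boolean globalFailure,
        boolean isNewAttempt) {
    // ... assignments for the preceding parameters ...
    this.globalFailure = globalFailure;
    // assigned last, matching its position in the parameter list
    this.isNewAttempt = isNewAttempt;
}
```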
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java:
##########
@@ -367,17 +367,13 @@ private void restartTasksWithDelay(final FailureHandlingResult failureHandlingRe
final CompletableFuture<?> cancelFuture =
cancelTasksAsync(verticesToRestart);
- final FailureHandlingResultSnapshot failureHandlingResultSnapshot =
- createFailureHandlingResultSnapshot(failureHandlingResult);
+ archiveFromFailureHandlingResult(
+ createFailureHandlingResultSnapshot(failureHandlingResult));
delayExecutor.schedule(
() ->
FutureUtils.assertNoException(
cancelFuture.thenRunAsync(
- () -> {
- archiveFromFailureHandlingResult(
Review Comment:
> The first commit is refactoring, actually, I don't know why archiving exception when restarting task instead of immediately. It means, when one task failure, we can see the exception history after flink restart this task. So the first commit is only a refactoring. It archives exceptions into the exception history immediately when they occur, instead of archiving them when restarting.

I guess the motivation was to be able to collect all concurrent exceptions that happen before triggering the restart. But you're right - it doesn't make a difference because we already create the `FailureHandlingResultSnapshot` earlier (before scheduling the restart). I'm just wondering whether we should have created the snapshot in the scheduled task rather than before scheduling it, to capture any concurrent exceptions that happen before the restart is triggered. :thinking:
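For illustration, that could look roughly like the sketch below (based on the hunk above; the executor argument `getMainThreadExecutor()` and the restart call are assumptions, since they're not shown in this excerpt):
```java
// Sketch only: create the snapshot once cancellation has completed, so that
// concurrent failures arriving in the meantime are still captured.
cancelFuture.thenRunAsync(
        () -> {
            archiveFromFailureHandlingResult(
                    createFailureHandlingResultSnapshot(failureHandlingResult));
            // ... trigger the actual restart here ...
        },
        getMainThreadExecutor());
```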
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##########
@@ -703,27 +707,41 @@ private void archiveGlobalFailure(
long timestamp,
CompletableFuture<Map<String, String>> failureLabels,
Iterable<Execution> executions) {
- exceptionHistory.add(
+ latestRootExceptionEntry =
RootExceptionHistoryEntry.fromGlobalFailure(
- failure, timestamp, failureLabels, executions));
+ failure, timestamp, failureLabels, executions);
+ exceptionHistory.add(latestRootExceptionEntry);
log.debug("Archive global failure.", failure);
}
protected final void archiveFromFailureHandlingResult(
FailureHandlingResultSnapshot failureHandlingResult) {
+ // ALl exceptions as the ConcurrentExceptions when it's not a new attempt.
Review Comment:
```suggestion
// handle all subsequent exceptions as the concurrent exceptions when it's not a new attempt.
```
nit: the verb was missing. Additionally, `ConcurrentExceptions` reads as if it referred to some class. We could use natural-language casing here, in my opinion.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##########
@@ -703,27 +707,41 @@ private void archiveGlobalFailure(
long timestamp,
CompletableFuture<Map<String, String>> failureLabels,
Iterable<Execution> executions) {
- exceptionHistory.add(
+ latestRootExceptionEntry =
RootExceptionHistoryEntry.fromGlobalFailure(
- failure, timestamp, failureLabels, executions));
+ failure, timestamp, failureLabels, executions);
+ exceptionHistory.add(latestRootExceptionEntry);
log.debug("Archive global failure.", failure);
}
protected final void archiveFromFailureHandlingResult(
FailureHandlingResultSnapshot failureHandlingResult) {
+ // ALl exceptions as the ConcurrentExceptions when it's not a new attempt.
+ if (!failureHandlingResult.isNewAttempt()) {
+ checkState(latestRootExceptionEntry != null, "It should have old failure.");
+ List<Execution> concurrentlyExecutions = new LinkedList<>();
+ failureHandlingResult.getRootCauseExecution().ifPresent(concurrentlyExecutions::add);
+ concurrentlyExecutions.addAll(failureHandlingResult.getConcurrentlyFailedExecution());
Review Comment:
nit: Here we copy a `Set` (the new return type of `getConcurrentlyFailedExecution`) into a `LinkedList`. Either we switch to `ArrayList` to benefit from the sized `addAll` call, or we revert the return type of `getConcurrentlyFailedExecution` back to `Iterable` and benefit from the cheap single-element `add` of `LinkedList`. WDYT? :thinking:
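The `ArrayList` variant could look roughly like this (sketch only; variable names are placeholders):
```java
// Sketch: pre-size for the root cause plus the concurrently failed executions
// and let addAll copy the Set in one pass.
Set<Execution> concurrentlyFailed = failureHandlingResult.getConcurrentlyFailedExecution();
List<Execution> concurrentExecutions = new ArrayList<>(concurrentlyFailed.size() + 1);
failureHandlingResult.getRootCauseExecution().ifPresent(concurrentExecutions::add);
concurrentExecutions.addAll(concurrentlyFailed);
```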
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java:
##########
@@ -150,7 +154,11 @@ public long getTimestamp() {
*
* @return The concurrently failed {@code Executions}.
*/
- public Iterable<Execution> getConcurrentlyFailedExecution() {
+ public Set<Execution> getConcurrentlyFailedExecution() {
Review Comment:
I'm not sure whether this change is necessary. See my other related comment.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureHandlingResult.java:
##########
@@ -206,6 +214,11 @@ public boolean isGlobalFailure() {
return globalFailure;
}
+ /** @return Whether this failure is a new attempt. */
+ public boolean isNewAttempt() {
Review Comment:
The term "attempt" seems to be a bit ambigious in the context of the
`FailreHandlingResult`. WDYT? :thinking:
But I cannot come up with a better proposal, either. :shrug:
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java:
##########
@@ -96,7 +98,7 @@ public static RootExceptionHistoryEntry fromGlobalFailure(
}
public static RootExceptionHistoryEntry fromExceptionHistoryEntry(
- ExceptionHistoryEntry entry, Iterable<ExceptionHistoryEntry> entries) {
+ ExceptionHistoryEntry entry, List<ExceptionHistoryEntry> entries) {
Review Comment:
`List` is too restrictive here, AFAIS
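e.g. keeping the broader type (sketch only):
```java
// Sketch: keep the parameter type as broad as callers need, e.g. the
// previous Iterable (or Collection, if size access is required).
public static RootExceptionHistoryEntry fromExceptionHistoryEntry(
        ExceptionHistoryEntry entry, Iterable<ExceptionHistoryEntry> entries) {
    // ... unchanged body ...
}
```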
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##########
@@ -703,27 +707,41 @@ private void archiveGlobalFailure(
long timestamp,
CompletableFuture<Map<String, String>> failureLabels,
Iterable<Execution> executions) {
- exceptionHistory.add(
+ latestRootExceptionEntry =
RootExceptionHistoryEntry.fromGlobalFailure(
- failure, timestamp, failureLabels, executions));
+ failure, timestamp, failureLabels, executions);
+ exceptionHistory.add(latestRootExceptionEntry);
log.debug("Archive global failure.", failure);
}
protected final void archiveFromFailureHandlingResult(
FailureHandlingResultSnapshot failureHandlingResult) {
+ // ALl exceptions as the ConcurrentExceptions when it's not a new attempt.
+ if (!failureHandlingResult.isNewAttempt()) {
+ checkState(latestRootExceptionEntry != null, "It should have old failure.");
Review Comment:
```suggestion
checkState(latestRootExceptionEntry != null, "A root exception
entry should exist if failureHandlingResult wasn't generated as part of a new
error handling cycle.");
```
Another nit: using "it" (or any other pronoun) in code causes ambiguity in a lot of cases. We might want to be more explicit when documenting code.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java:
##########
@@ -140,15 +142,20 @@ private static RootExceptionHistoryEntry createRootExceptionHistoryEntry(
failureLabels,
failingTaskName,
taskManagerLocation,
- StreamSupport.stream(executions.spliterator(), false)
- .filter(execution -> execution.getFailureInfo().isPresent())
- .map(
- execution ->
- ExceptionHistoryEntry.create(
- execution,
- execution.getVertexWithAttempt(),
- FailureEnricherUtils.EMPTY_FAILURE_LABELS))
- .collect(Collectors.toList()));
+ createExceptionHistoryEntries(executions));
+ }
+
+ public static List<ExceptionHistoryEntry> createExceptionHistoryEntries(
Review Comment:
The initial idea was to keep the exception history entries immutable. This change adds the `addConcurrentExceptions` method (which is ok, I guess, because the scheduler runs in the main thread and we don't have to be that strict about immutability in this case). But we don't need to expose `createExceptionHistoryEntries` here. Instead, we could move the logic into `addConcurrentExceptions` and call `addConcurrentExceptions` within `createRootExceptionHistoryEntry` on the newly created instance. WDYT?
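Roughly something like this (a sketch only; the constructor arguments, the `addConcurrentExceptions` signature, and the `concurrentExceptions` field name are assumptions based on this PR):
```java
private static RootExceptionHistoryEntry createRootExceptionHistoryEntry(
        /* ... existing parameters ... */ Iterable<Execution> executions) {
    final RootExceptionHistoryEntry entry =
            new RootExceptionHistoryEntry(/* ... existing arguments, empty entry list ... */);
    // reuse the mutator instead of exposing createExceptionHistoryEntries
    entry.addConcurrentExceptions(executions);
    return entry;
}

// addConcurrentExceptions would then own the Execution -> ExceptionHistoryEntry mapping:
public void addConcurrentExceptions(Iterable<Execution> executions) {
    for (Execution execution : executions) {
        if (execution.getFailureInfo().isPresent()) {
            concurrentExceptions.add(
                    ExceptionHistoryEntry.create(
                            execution,
                            execution.getVertexWithAttempt(),
                            FailureEnricherUtils.EMPTY_FAILURE_LABELS));
        }
    }
}
```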
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]