XComp opened a new pull request #15640:
URL: https://github.com/apache/flink/pull/15640
## What is the purpose of the change

This commit fixes a case where multiple failures occur close to each other. In that case, the `DefaultScheduler`'s [restart logic](https://github.com/apache/flink/blob/768c04a501602cf706cd56df1e258e5a292c8da4/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java#L258) runs for each of these failures, and these runs compete with each other. If multiple failures refer to the same `Execution`, the restart triggered by handling one failure might already have cleaned up that failure. This leads to an `IllegalArgumentException` when archiving the next failure that refers to the same `Execution` as its root cause. The issue was that the code relied on the `ExecutionVertexID`s provided by the `FailureHandlingResult` instead of the actual `Execution`s.

## Brief change log

The new implementation relies on the `Execution`s that were present when the failure was handled. For this purpose, `FailureHandlingResultSnapshot` is introduced. It extracts the `Execution` information from the `ExecutionGraph` while the failure is being processed.

Additionally, instead of relying on `ExecutionVertex.getTaskNameWithSubtaskIndex()` to collect the task name, the new implementation uses `Execution.getVertexWithAttempt()`. This lets us rely solely on the `Execution`, without the extra dependency on the `ExecutionVertex`.

The new implementation also removes the `add` method from `RootExceptionHistoryEntry`, which makes instantiation cleaner. As part of this effort, `ExceptionHistoryEntryExtractor` was replaced by factory methods such as `RootExceptionHistoryEntry.fromFailureHandlingResultSnapshot`.
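
To make the race easier to follow, here is a minimal, self-contained Java sketch. It is not Flink code: `SketchExecution`, `SketchGraph`, `SketchFailureSnapshot`, and `SketchRootEntry` are hypothetical, heavily simplified stand-ins for `Execution`, `ExecutionGraph`, `FailureHandlingResultSnapshot`, and `RootExceptionHistoryEntry`. It assumes a restart simply replaces every current attempt with a fresh one, and it contrasts resolving a failure lazily by vertex ID (which fails after a competing restart) with capturing the failed `Execution` objects eagerly while the failure is handled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for an Execution: one attempt of one subtask.
final class SketchExecution {
    final String taskNameWithSubtask; // e.g. "map (1/2)"
    final int attemptNumber;
    final Throwable failure; // null if this attempt has not failed

    SketchExecution(String taskNameWithSubtask, int attemptNumber, Throwable failure) {
        this.taskNameWithSubtask = taskNameWithSubtask;
        this.attemptNumber = attemptNumber;
        this.failure = failure;
    }

    // Same idea as Execution.getVertexWithAttempt(): the name comes from the attempt itself,
    // with no lookup through the vertex. The string format here is only illustrative.
    String getVertexWithAttempt() {
        return taskNameWithSubtask + " - execution #" + attemptNumber;
    }
}

// Hypothetical stand-in for the ExecutionGraph: each vertex ID points to its current attempt.
final class SketchGraph {
    final Map<String, SketchExecution> currentAttempts = new HashMap<>();

    // Assumed restart model: every current attempt is replaced by a fresh, non-failed one.
    void restartAll() {
        currentAttempts.replaceAll((id, exec) -> new SketchExecution(id, exec.attemptNumber + 1, null));
    }

    // The problematic pattern: resolving the failed attempt lazily via its vertex ID.
    // After a competing restart, the current attempt no longer carries the failure.
    static SketchExecution lookupFailedById(SketchGraph graph, String vertexId) {
        SketchExecution exec = graph.currentAttempts.get(vertexId);
        if (exec == null || exec.failure == null) {
            throw new IllegalArgumentException("No failed attempt for " + vertexId);
        }
        return exec;
    }
}

// Captures the failed attempts eagerly, while the failure is being handled.
final class SketchFailureSnapshot {
    final List<SketchExecution> failedExecutions;

    private SketchFailureSnapshot(List<SketchExecution> failedExecutions) {
        this.failedExecutions = failedExecutions;
    }

    static SketchFailureSnapshot create(SketchGraph graph, List<String> affectedVertexIds) {
        List<SketchExecution> failed = new ArrayList<>();
        for (String id : affectedVertexIds) {
            SketchExecution exec = graph.currentAttempts.get(id);
            // Keep only attempts that actually failed, holding the attempt object itself.
            if (exec != null && exec.failure != null) {
                failed.add(exec);
            }
        }
        return new SketchFailureSnapshot(Collections.unmodifiableList(failed));
    }
}

// Immutable history entry built in one step by a factory method, with no add() method.
final class SketchRootEntry {
    final List<String> failedTasks;

    private SketchRootEntry(List<String> failedTasks) {
        this.failedTasks = failedTasks;
    }

    static SketchRootEntry fromSnapshot(SketchFailureSnapshot snapshot) {
        List<String> names = new ArrayList<>();
        for (SketchExecution exec : snapshot.failedExecutions) {
            names.add(exec.getVertexWithAttempt());
        }
        return new SketchRootEntry(Collections.unmodifiableList(names));
    }
}

final class ConcurrentRestartSketch {
    public static void main(String[] args) {
        SketchGraph graph = new SketchGraph();
        graph.currentAttempts.put("map (1/2)", new SketchExecution("map (1/2)", 0, new RuntimeException("boom")));
        graph.currentAttempts.put("map (2/2)", new SketchExecution("map (2/2)", 0, null));
        // Take the snapshot first, then let a competing restart reset the graph.
        SketchFailureSnapshot snapshot =
                SketchFailureSnapshot.create(graph, Arrays.asList("map (1/2)", "map (2/2)"));
        graph.restartAll();
        // Archiving from the snapshot still sees the original failed attempt.
        System.out.println(SketchRootEntry.fromSnapshot(snapshot).failedTasks);
    }
}
```

Running `ConcurrentRestartSketch` prints `[map (1/2) - execution #0]`, whereas calling `SketchGraph.lookupFailedById(graph, "map (1/2)")` after `restartAll()` throws an `IllegalArgumentException`, which loosely mirrors the failure mode described above. The snapshot avoids the race because it holds references to the failed attempts rather than IDs that a concurrent restart can re-point.
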
## Verifying this change

* `DefaultSchedulerTest.testExceptionHistoryConcurrentRestart` is added to cover [FLINK-22276](https://issues.apache.org/jira/browse/FLINK-22276).
* `FailureHandlingResultSnapshotTest` is added to test the filtering of failed `Execution`s.
* `ExceptionHistoryEntryTest` and `RootExceptionHistoryEntryTest` are introduced to replace the test coverage previously provided by `ExceptionHistoryEntryExtractorTest` (except for the subtask ID tests, which are no longer necessary).
* `JobExceptionsHandlerTest` is only adapted to the refactorings in this PR.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
