XComp opened a new pull request #15640:
URL: https://github.com/apache/flink/pull/15640


   ## What is the purpose of the change
   
   This commit fixes a case where multiple failures occur in close succession.
   In that case, the `DefaultScheduler`'s [restart logic](https://github.com/apache/flink/blob/768c04a501602cf706cd56df1e258e5a292c8da4/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java#L258)
   is triggered for each of these failures. If multiple failures refer to the
   same `Execution`, the restart triggered by handling one failure may already
   have cleaned up the failure information. This leads to an
   `IllegalArgumentException` when archiving the next failure that refers to
   the same `Execution` as the root cause. The issue was that the code relied
   on the `ExecutionVertexID`s provided by the `FailureHandlingResult` instead
   of the actual `Execution`s.
   
   
   ## Brief change log
   
   The new implementation relies on the `Execution`s that were present when the
   failure was handled. To that end, `FailureHandlingResultSnapshot` is
   introduced. It extracts the `Execution` information from the
   `ExecutionGraph` while the failure is being processed.
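
The difference between a late lookup by ID and an eager snapshot can be illustrated with a minimal sketch. All types below (`Attempt`, `Graph`, `Snapshot`) are hypothetical, simplified stand-ins and not Flink's actual classes; the sketch only assumes that a restart replaces the failed attempt of a vertex with a fresh one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model; none of these types are Flink's actual classes. */
final class SnapshotSketch {

    /** Hypothetical, immutable stand-in for one execution attempt of a vertex. */
    static final class Attempt {
        final String vertexId;
        final int attemptNumber;
        final String failureCause; // null while the attempt has not failed

        Attempt(String vertexId, int attemptNumber, String failureCause) {
            this.vertexId = vertexId;
            this.attemptNumber = attemptNumber;
            this.failureCause = failureCause;
        }
    }

    /** Hypothetical graph that maps each vertex ID to its current attempt. */
    static final class Graph {
        private final Map<String, Attempt> current = new HashMap<>();

        void deploy(String vertexId) {
            current.put(vertexId, new Attempt(vertexId, 0, null));
        }

        void fail(String vertexId, String cause) {
            Attempt a = current.get(vertexId);
            current.put(vertexId, new Attempt(vertexId, a.attemptNumber, cause));
        }

        /** A restart replaces the failed attempt with a fresh one. */
        void restart(String vertexId) {
            Attempt a = current.get(vertexId);
            current.put(vertexId, new Attempt(vertexId, a.attemptNumber + 1, null));
        }

        Attempt currentAttempt(String vertexId) {
            return current.get(vertexId);
        }
    }

    /** Captures the failed attempts eagerly, while the failure is handled. */
    static final class Snapshot {
        final List<Attempt> failedAttempts = new ArrayList<>();

        static Snapshot create(Graph graph, List<String> vertexIds) {
            Snapshot snapshot = new Snapshot();
            for (String id : vertexIds) {
                Attempt attempt = graph.currentAttempt(id);
                if (attempt != null && attempt.failureCause != null) {
                    snapshot.failedAttempts.add(attempt);
                }
            }
            return snapshot;
        }
    }

    /** Late lookup by ID: throws if a restart already replaced the attempt. */
    static String archiveByVertexId(Graph graph, String vertexId) {
        Attempt attempt = graph.currentAttempt(vertexId);
        if (attempt == null || attempt.failureCause == null) {
            throw new IllegalArgumentException("No failure recorded for " + vertexId);
        }
        return attempt.failureCause;
    }

    /** Archiving from the snapshot does not depend on the graph's current state. */
    static List<String> archiveFromSnapshot(Snapshot snapshot) {
        List<String> causes = new ArrayList<>();
        for (Attempt attempt : snapshot.failedAttempts) {
            causes.add(attempt.failureCause);
        }
        return causes;
    }

    public static void main(String[] args) {
        Graph graph = new Graph();
        graph.deploy("v1");
        graph.fail("v1", "first failure");

        Snapshot snapshot = Snapshot.create(graph, Arrays.asList("v1"));
        graph.restart("v1"); // a competing restart replaces the failed attempt

        System.out.println("from snapshot: " + archiveFromSnapshot(snapshot));
        try {
            archiveByVertexId(graph, "v1");
        } catch (IllegalArgumentException e) {
            System.out.println("late lookup failed: " + e.getMessage());
        }
    }
}
```

In this model the snapshot keeps a reference to the failed attempt itself, so archiving still succeeds after the restart, whereas the ID-based lookup only finds the fresh attempt without failure information.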
   
   Additionally, instead of calling
   `ExecutionVertex.getTaskNameWithSubtaskIndex()` to collect the task name,
   the new implementation relies on `Execution.getVertexWithAttempt()`. This
   lets us rely solely on the `Execution`, without the extra dependency on the
   `ExecutionVertex`.
   
   The new implementation also removes the `add` method from
   `RootExceptionHistoryEntry`, which makes the instantiation cleaner. As part
   of this effort, `ExceptionHistoryEntryExtractor` was replaced by the factory
   methods `RootExceptionHistoryEntry.fromFailureHandlingResultSnapshot` and
   `RootExceptionHistoryEntry.fromFailureHandlingResultSnapshot`.
   
   
   ## Verifying this change
   
   * `DefaultSchedulerTest.testExceptionHistoryConcurrentRestart` was added to
     cover [FLINK-22276](https://issues.apache.org/jira/browse/FLINK-22276)
   * `FailureHandlingResultSnapshotTest` was added to test the filtering of
     failed `Execution`s
   * `ExceptionHistoryEntryTest` and `RootExceptionHistoryEntryTest` were
     introduced to replace the test coverage previously provided by
     `ExceptionHistoryEntryExtractorTest` (except for the subtask ID tests,
     which are no longer necessary)
   * `JobExceptionsHandlerTest` was only adapted to reflect the refactorings
     made in this PR
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable
   



