zhuzhurk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1197573074


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java:
##########
@@ -51,22 +58,39 @@ public class ExecutionFailureHandler {
     /** Number of all restarts happened since this job is submitted. */
     private long numberOfRestarts;
 
+    private final Context taskFailureCtx;
+    private final Context globalFailureCtx;
+    private final Collection<FailureEnricher> failureEnrichers;
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+
     /**
      * Creates the handler to deal with task failures.
      *
      * @param schedulingTopology contains the topology info for failover
     * @param failoverStrategy helps to decide tasks to restart on task failures
     * @param restartBackoffTimeStrategy helps to decide whether to restart failed tasks and the
      *     restarting delay
+     * @param mainThreadExecutor the main thread executor of the job master
+     * @param failureEnrichers a collection of {@link FailureEnricher} that enrich failures
+     * @param taskFailureCtx Task failure Context used by FailureEnrichers
+     * @param globalFailureCtx Global failure Context used by FailureEnrichers
      */
     public ExecutionFailureHandler(
             final SchedulingTopology schedulingTopology,
             final FailoverStrategy failoverStrategy,
-            final RestartBackoffTimeStrategy restartBackoffTimeStrategy) {
+            final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+            final ComponentMainThreadExecutor mainThreadExecutor,
+            final Collection<FailureEnricher> failureEnrichers,
+            final Context taskFailureCtx,
+            final Context globalFailureCtx) {
 
         this.schedulingTopology = checkNotNull(schedulingTopology);
         this.failoverStrategy = checkNotNull(failoverStrategy);
         this.restartBackoffTimeStrategy = checkNotNull(restartBackoffTimeStrategy);
+        this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
+        this.failureEnrichers = checkNotNull(failureEnrichers);
+        this.taskFailureCtx = taskFailureCtx;

Review Comment:
   We actually never expect this context to be null in production.
   I see that it is not checked for non-null because it can be `null` in tests. Usually we would introduce a `DummyFailureEnricherContext` for this purpose.
   But since there is a follow-up task, FLINK-32114, I think we can refine it there (maybe by introducing `FailureEnricherContextFactory` and `DummyFailureEnricherContextFactory`?).
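   One way the factory idea could look, as a minimal sketch: `FailureEnricherContextFactory` and `DummyFailureEnricherContextFactory` are the names floated in the comment, but their methods, the trimmed-down `FailureEnricherContext` interface, and `FailureHandlerSketch` are hypothetical stand-ins, not Flink's actual API. `Objects.requireNonNull` replaces Flink's `checkNotNull` only to keep the sketch self-contained. The point is that tests pass a dummy factory instead of `null`, so the handler can null-check both contexts unconditionally.

```java
import java.util.Objects;

/** Hypothetical, trimmed-down stand-in for the enricher context; not Flink's real interface. */
interface FailureEnricherContext {
    /** Whether this context describes a global failure (true) or a task failure (false). */
    boolean isGlobalFailure();
}

/** Factory along the lines suggested for FLINK-32114; method names are assumptions. */
interface FailureEnricherContextFactory {
    FailureEnricherContext createTaskFailureContext();

    FailureEnricherContext createGlobalFailureContext();
}

/** Test-only factory that hands out no-op contexts instead of null. */
final class DummyFailureEnricherContextFactory implements FailureEnricherContextFactory {
    static final FailureEnricherContext TASK = () -> false;
    static final FailureEnricherContext GLOBAL = () -> true;

    @Override
    public FailureEnricherContext createTaskFailureContext() {
        return TASK;
    }

    @Override
    public FailureEnricherContext createGlobalFailureContext() {
        return GLOBAL;
    }
}

/** Simplified handler: with a factory, both contexts are always checked for non-null. */
final class FailureHandlerSketch {
    private final FailureEnricherContext taskFailureCtx;
    private final FailureEnricherContext globalFailureCtx;

    FailureHandlerSketch(FailureEnricherContextFactory contextFactory) {
        Objects.requireNonNull(contextFactory, "contextFactory");
        // Production and test code both go through the factory, so null never sneaks in.
        this.taskFailureCtx =
                Objects.requireNonNull(contextFactory.createTaskFailureContext(), "taskFailureCtx");
        this.globalFailureCtx =
                Objects.requireNonNull(
                        contextFactory.createGlobalFailureContext(), "globalFailureCtx");
    }

    FailureEnricherContext taskFailureCtx() {
        return taskFailureCtx;
    }

    FailureEnricherContext globalFailureCtx() {
        return globalFailureCtx;
    }
}
```

   A test would then construct the handler with `new FailureHandlerSketch(new DummyFailureEnricherContextFactory())` rather than passing `null` contexts.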



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java:
##########
@@ -107,26 +121,33 @@ public static RootExceptionHistoryEntry fromExceptionHistoryEntry(
     public static RootExceptionHistoryEntry fromGlobalFailure(ErrorInfo errorInfo) {
         Preconditions.checkNotNull(errorInfo, "errorInfo");
         return fromGlobalFailure(
-                errorInfo.getException(), errorInfo.getTimestamp(), Collections.emptyList());
+                errorInfo.getException(),
+                errorInfo.getTimestamp(),
+                FailureEnricherUtils.EMPTY_FAILURE_LABELS,
+                Collections.emptyList());
     }
 
     private static RootExceptionHistoryEntry createRootExceptionHistoryEntry(
             Throwable cause,
             long timestamp,
+            CompletableFuture<Map<String, String>> failureLabels,
             @Nullable String failingTaskName,
             @Nullable TaskManagerLocation taskManagerLocation,
             Iterable<Execution> executions) {
         return new RootExceptionHistoryEntry(
                 cause,
                 timestamp,
+                failureLabels,
                 failingTaskName,
                 taskManagerLocation,
                 StreamSupport.stream(executions.spliterator(), false)
                         .filter(execution -> execution.getFailureInfo().isPresent())
                         .map(
                                 execution ->
                                         ExceptionHistoryEntry.create(
-                                                execution, execution.getVertexWithAttempt()))
+                                                execution,
+                                                execution.getVertexWithAttempt(),
+                                                failureLabels))

Review Comment:
   These concurrent failures can be quite different from the root one, so assigning them the same failure labels may be confusing, e.g., labeling a concurrent Flink system error as `USER` just because the root failure is a `USER` error.
   Given that the concurrent failures are not root causes, their labels are not that critical. Therefore I think it's acceptable to assign empty labels to them.
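   A minimal sketch of the suggested behavior, assuming simplified stand-in types: `HistoryEntry`, `RootEntrySketch`, and `EMPTY_LABELS` are hypothetical and only mirror the shape of `ExceptionHistoryEntry`, `RootExceptionHistoryEntry`, and `FailureEnricherUtils.EMPTY_FAILURE_LABELS` from the diff. Only the root entry receives the enriched labels; every concurrent entry gets an already-completed empty map.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified exception-history entry; not Flink's ExceptionHistoryEntry. */
final class HistoryEntry {
    final String failureName;
    final CompletableFuture<Map<String, String>> failureLabels;

    HistoryEntry(String failureName, CompletableFuture<Map<String, String>> failureLabels) {
        this.failureName = failureName;
        this.failureLabels = failureLabels;
    }
}

/** Hypothetical root entry holding the root failure plus its concurrent failures. */
final class RootEntrySketch {
    /** Assumed shape of an "empty labels" constant: an already-completed empty map. */
    static final CompletableFuture<Map<String, String>> EMPTY_LABELS =
            CompletableFuture.completedFuture(Collections.emptyMap());

    final HistoryEntry root;
    final List<HistoryEntry> concurrent;

    private RootEntrySketch(HistoryEntry root, List<HistoryEntry> concurrent) {
        this.root = root;
        this.concurrent = concurrent;
    }

    /**
     * Only the root failure carries the enriched labels. Concurrent failures get empty labels,
     * so e.g. a concurrent system error is not tagged with the root failure's USER label.
     */
    static RootEntrySketch create(
            String rootFailure,
            CompletableFuture<Map<String, String>> rootLabels,
            List<String> concurrentFailures) {
        List<HistoryEntry> concurrent = new ArrayList<>();
        for (String failure : concurrentFailures) {
            concurrent.add(new HistoryEntry(failure, EMPTY_LABELS));
        }
        return new RootEntrySketch(new HistoryEntry(rootFailure, rootLabels), concurrent);
    }
}
```

   Both root and concurrent entries still expose the same `CompletableFuture<Map<String, String>>` type, so consumers of the exception history do not need to special-case concurrent failures.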



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
