This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a9383fd4d51 [FLINK-31890][runtime] Introduce DefaultScheduler failure 
enrichment/labeling
a9383fd4d51 is described below

commit a9383fd4d51b1161292628145e2f427f574a07d4
Author: Panagiotis Garefalakis <[email protected]>
AuthorDate: Mon May 1 20:22:14 2023 -0700

    [FLINK-31890][runtime] Introduce DefaultScheduler failure 
enrichment/labeling
    
    * Introduce async task failure labeling as part of 
ExecutionFailureHandler#handleFailure (both for local and global failures)
    * Introduce two fields to ExceptionHistoryEntry: a transient 
CompletableFuture<Map<String, String>> failureLabelsFuture as well as a 
Map<String, String> failureLabels -- the failureLabels are set as soon as 
failureLabelsFuture is completed
    * Extend ExceptionHistoryEntry, FailureHandlingResult, 
FailureHandlingResultSnapshot to expose labels as part of ExceptionHistory
    * Extend existing tests (e.g., DefaultSchedulerTest, 
FailureHandlingResultTest) to validate functionality
---
 .../flink/core/failure/TestingFailureEnricher.java |  51 ++++++++++
 .../failover/flip1/ExecutionFailureHandler.java    |  40 +++++++-
 .../failover/flip1/FailureHandlingResult.java      |  31 +++++-
 .../runtime/failure/FailureEnricherUtils.java      |  12 ++-
 .../DefaultSlotPoolServiceSchedulerFactory.java    |   4 +
 .../apache/flink/runtime/jobmaster/JobMaster.java  |   7 ++
 .../jobmaster/SlotPoolServiceSchedulerFactory.java |   3 +
 .../factories/DefaultJobMasterServiceFactory.java  |   2 +
 .../flink/runtime/scheduler/DefaultScheduler.java  |  34 ++++++-
 .../runtime/scheduler/DefaultSchedulerFactory.java |   4 +
 .../flink/runtime/scheduler/SchedulerBase.java     |  18 +++-
 .../runtime/scheduler/SchedulerNGFactory.java      |   3 +
 .../adaptive/AdaptiveSchedulerFactory.java         |   3 +
 .../adaptive/StateWithExecutionGraph.java          |   5 +-
 .../adaptivebatch/AdaptiveBatchScheduler.java      |   7 +-
 .../AdaptiveBatchSchedulerFactory.java             |   5 +
 .../adaptivebatch/SpeculativeScheduler.java        |   8 +-
 .../exceptionhistory/ExceptionHistoryEntry.java    |  49 +++++++++-
 .../FailureHandlingResultSnapshot.java             |  15 +++
 .../RootExceptionHistoryEntry.java                 |  35 +++++--
 .../flip1/ExecutionFailureHandlerTest.java         |  28 +++++-
 .../failover/flip1/FailureHandlingResultTest.java  |  24 +++--
 .../runtime/failure/FailureEnricherUtilsTest.java  |  19 +++-
 .../runtime/jobmaster/JobMasterSchedulerTest.java  |   3 +
 .../runtime/jobmaster/utils/JobMasterBuilder.java  |   6 ++
 .../rest/handler/job/JobExceptionsHandlerTest.java |  58 ++++++++---
 .../runtime/scheduler/DefaultSchedulerBuilder.java |  13 +++
 .../runtime/scheduler/DefaultSchedulerTest.java    | 108 ++++++++++++++++++++-
 .../scheduler/TestingSchedulerNGFactory.java       |   3 +
 .../ExceptionHistoryEntryMatcher.java              |  37 ++++++-
 .../ExceptionHistoryEntryTest.java                 |  22 ++++-
 .../ExceptionHistoryEntryTestingUtils.java         |  35 +++++++
 .../FailureHandlingResultSnapshotTest.java         |  23 ++++-
 .../RootExceptionHistoryEntryTest.java             |  18 +++-
 34 files changed, 671 insertions(+), 62 deletions(-)

diff --git 
a/flink-core/src/test/java/org/apache/flink/core/failure/TestingFailureEnricher.java
 
b/flink-core/src/test/java/org/apache/flink/core/failure/TestingFailureEnricher.java
new file mode 100644
index 00000000000..d6a940871a0
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/core/failure/TestingFailureEnricher.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.failure;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/** A {@link FailureEnricher} for tests that records seen Throwables and returns fixed labels. */
+public class TestingFailureEnricher implements FailureEnricher {
+
+    final Set<Throwable> seenThrowables = new HashSet<>();
+    final Map<String, String> failureLabels = 
Collections.singletonMap("failKey", "failValue");
+
+    @Override
+    public Set<String> getOutputKeys() {
+        return Collections.singleton("failKey");
+    }
+
+    @Override
+    public CompletableFuture<Map<String, String>> processFailure(Throwable 
cause, Context context) {
+        seenThrowables.add(cause);
+        return CompletableFuture.completedFuture(failureLabels);
+    }
+
+    public Set<Throwable> getSeenThrowables() {
+        return seenThrowables;
+    }
+
+    public Map<String, String> getFailureLabels() {
+        return failureLabels;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
index 127fe18ee6b..cee213c7488 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
@@ -17,8 +17,12 @@
 
 package org.apache.flink.runtime.executiongraph.failover.flip1;
 
+import org.apache.flink.core.failure.FailureEnricher;
+import org.apache.flink.core.failure.FailureEnricher.Context;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.failure.FailureEnricherUtils;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
@@ -28,8 +32,11 @@ import org.apache.flink.util.IterableUtils;
 
 import javax.annotation.Nullable;
 
+import java.util.Collection;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -51,6 +58,11 @@ public class ExecutionFailureHandler {
     /** Number of all restarts happened since this job is submitted. */
     private long numberOfRestarts;
 
+    private final Context taskFailureCtx;
+    private final Context globalFailureCtx;
+    private final Collection<FailureEnricher> failureEnrichers;
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+
     /**
      * Creates the handler to deal with task failures.
      *
@@ -58,15 +70,27 @@ public class ExecutionFailureHandler {
      * @param failoverStrategy helps to decide tasks to restart on task 
failures
      * @param restartBackoffTimeStrategy helps to decide whether to restart 
failed tasks and the
      *     restarting delay
+     * @param mainThreadExecutor the main thread executor of the job master
+     * @param failureEnrichers a collection of {@link FailureEnricher} that 
enrich failures
+     * @param taskFailureCtx Task failure Context used by FailureEnrichers
+     * @param globalFailureCtx Global failure Context used by FailureEnrichers
      */
     public ExecutionFailureHandler(
             final SchedulingTopology schedulingTopology,
             final FailoverStrategy failoverStrategy,
-            final RestartBackoffTimeStrategy restartBackoffTimeStrategy) {
+            final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+            final ComponentMainThreadExecutor mainThreadExecutor,
+            final Collection<FailureEnricher> failureEnrichers,
+            final Context taskFailureCtx,
+            final Context globalFailureCtx) {
 
         this.schedulingTopology = checkNotNull(schedulingTopology);
         this.failoverStrategy = checkNotNull(failoverStrategy);
         this.restartBackoffTimeStrategy = 
checkNotNull(restartBackoffTimeStrategy);
+        this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
+        this.failureEnrichers = checkNotNull(failureEnrichers);
+        this.taskFailureCtx = taskFailureCtx;
+        this.globalFailureCtx = globalFailureCtx;
     }
 
     /**
@@ -109,6 +133,14 @@ public class ExecutionFailureHandler {
                 true);
     }
 
+    private CompletableFuture<Map<String, String>> labelFailure(Throwable 
cause, boolean isGlobal) {
+        if (failureEnrichers.isEmpty()) {
+            return FailureEnricherUtils.EMPTY_FAILURE_LABELS;
+        }
+        final Context ctx = isGlobal ? globalFailureCtx : taskFailureCtx;
+        return FailureEnricherUtils.labelFailure(cause, ctx, 
mainThreadExecutor, failureEnrichers);
+    }
+
     private FailureHandlingResult handleFailure(
             @Nullable final Execution failedExecution,
             final Throwable cause,
@@ -116,11 +148,15 @@ public class ExecutionFailureHandler {
             final Set<ExecutionVertexID> verticesToRestart,
             final boolean globalFailure) {
 
+        final CompletableFuture<Map<String, String>> failureLabels =
+                labelFailure(cause, globalFailure);
+
         if (isUnrecoverableError(cause)) {
             return FailureHandlingResult.unrecoverable(
                     failedExecution,
                     new JobException("The failure is not recoverable", cause),
                     timestamp,
+                    failureLabels,
                     globalFailure);
         }
 
@@ -132,6 +168,7 @@ public class ExecutionFailureHandler {
                     failedExecution,
                     cause,
                     timestamp,
+                    failureLabels,
                     verticesToRestart,
                     restartBackoffTimeStrategy.getBackoffTime(),
                     globalFailure);
@@ -141,6 +178,7 @@ public class ExecutionFailureHandler {
                     new JobException(
                             "Recovery is suppressed by " + 
restartBackoffTimeStrategy, cause),
                     timestamp,
+                    failureLabels,
                     globalFailure);
         }
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
index 828fbe243e7..ede19bbde15 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
@@ -24,8 +24,10 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -55,6 +57,9 @@ public class FailureHandlingResult {
     /** Failure reason. {@code @Nullable} because of FLINK-21376. */
     @Nullable private final Throwable error;
 
+    /** Future Map of string labels characterizing the failure. */
+    private final CompletableFuture<Map<String, String>> failureLabels;
+
     /** Failure timestamp. */
     private final long timestamp;
 
@@ -68,6 +73,7 @@ public class FailureHandlingResult {
      *     {@code null} as a value indicates that the failure was issued by 
Flink itself.
      * @param cause the exception that caused this failure.
      * @param timestamp the time the failure was handled.
+     * @param failureLabels future map of string labels characterizing the failure.
      * @param verticesToRestart containing task vertices to restart to recover 
from the failure.
      *     {@code null} indicates that the failure is not restartable.
      * @param restartDelayMS indicate a delay before conducting the restart
@@ -76,6 +82,7 @@ public class FailureHandlingResult {
             @Nullable Execution failedExecution,
             @Nullable Throwable cause,
             long timestamp,
+            CompletableFuture<Map<String, String>> failureLabels,
             @Nullable Set<ExecutionVertexID> verticesToRestart,
             long restartDelayMS,
             boolean globalFailure) {
@@ -85,6 +92,7 @@ public class FailureHandlingResult {
         this.restartDelayMS = restartDelayMS;
         this.failedExecution = failedExecution;
         this.error = cause;
+        this.failureLabels = failureLabels;
         this.timestamp = timestamp;
         this.globalFailure = globalFailure;
     }
@@ -96,16 +104,20 @@ public class FailureHandlingResult {
      *     {@code null} as a value indicates that the failure was issued by 
Flink itself.
      * @param error reason why the failure is not recoverable
      * @param timestamp the time the failure was handled.
+     * @param failureLabels future map of labels characterizing the failure as produced by the
+     *     FailureEnrichers
      */
     private FailureHandlingResult(
             @Nullable Execution failedExecution,
             @Nonnull Throwable error,
             long timestamp,
+            CompletableFuture<Map<String, String>> failureLabels,
             boolean globalFailure) {
         this.verticesToRestart = null;
         this.restartDelayMS = -1;
         this.failedExecution = failedExecution;
         this.error = checkNotNull(error);
+        this.failureLabels = failureLabels;
         this.timestamp = timestamp;
         this.globalFailure = globalFailure;
     }
@@ -159,6 +171,15 @@ public class FailureHandlingResult {
         return error;
     }
 
+    /**
+     * Returns the labels future associated with the failure.
+     *
+     * @return the CompletableFuture Map of String labels
+     */
+    public CompletableFuture<Map<String, String>> getFailureLabels() {
+        return failureLabels;
+    }
+
     /**
      * Returns the time of the failure.
      *
@@ -195,6 +216,8 @@ public class FailureHandlingResult {
      *     {@code null} as a value indicates that the failure was issued by 
Flink itself.
      * @param cause The reason of the failure.
      * @param timestamp The time of the failure.
+     * @param failureLabels Future map of labels characterizing the failure produced by the
+     *     FailureEnrichers.
      * @param verticesToRestart containing task vertices to restart to recover 
from the failure.
      *     {@code null} indicates that the failure is not restartable.
      * @param restartDelayMS indicate a delay before conducting the restart
@@ -204,6 +227,7 @@ public class FailureHandlingResult {
             @Nullable Execution failedExecution,
             @Nullable Throwable cause,
             long timestamp,
+            CompletableFuture<Map<String, String>> failureLabels,
             @Nullable Set<ExecutionVertexID> verticesToRestart,
             long restartDelayMS,
             boolean globalFailure) {
@@ -211,6 +235,7 @@ public class FailureHandlingResult {
                 failedExecution,
                 cause,
                 timestamp,
+                failureLabels,
                 verticesToRestart,
                 restartDelayMS,
                 globalFailure);
@@ -226,13 +251,17 @@ public class FailureHandlingResult {
      *     {@code null} as a value indicates that the failure was issued by 
Flink itself.
      * @param error reason why the failure is not recoverable
      * @param timestamp The time of the failure.
+     * @param failureLabels Future map of labels characterizing the failure produced by the
+     *     FailureEnrichers.
      * @return result indicating the failure is not recoverable
      */
     public static FailureHandlingResult unrecoverable(
             @Nullable Execution failedExecution,
             @Nonnull Throwable error,
             long timestamp,
+            CompletableFuture<Map<String, String>> failureLabels,
             boolean globalFailure) {
-        return new FailureHandlingResult(failedExecution, error, timestamp, 
globalFailure);
+        return new FailureHandlingResult(
+                failedExecution, error, timestamp, failureLabels, 
globalFailure);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
index e9a111405a7..cf05e0a40cb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
@@ -42,6 +42,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -49,6 +50,10 @@ import java.util.stream.Collectors;
 public class FailureEnricherUtils {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FailureEnricherUtils.class);
+
+    public static final CompletableFuture<Map<String, String>> 
EMPTY_FAILURE_LABELS =
+            CompletableFuture.completedFuture(Collections.emptyMap());
+
     // regex pattern to split the defined failure enrichers
     private static final Pattern enricherListPattern = 
Pattern.compile("\\s*,\\s*");
     static final String MERGE_EXCEPTION_MSG =
@@ -170,12 +175,14 @@ public class FailureEnricherUtils {
      *
      * @param cause the Throwable to label
      * @param context the context of the Throwable
+     * @param mainThreadExecutor the executor to complete the enricher 
labeling on
      * @param failureEnrichers a collection of FailureEnrichers to enrich the 
context with
      * @return a CompletableFuture that will complete with a map of labels
      */
     public static CompletableFuture<Map<String, String>> labelFailure(
             final Throwable cause,
             final Context context,
+            final Executor mainThreadExecutor,
             final Collection<FailureEnricher> failureEnrichers) {
         // list of CompletableFutures to enrich failure with labels from each 
enricher
         final Collection<CompletableFuture<Map<String, String>>> enrichFutures 
= new ArrayList<>();
@@ -204,7 +211,7 @@ public class FailureEnricherUtils {
         }
         // combine all CompletableFutures into a single CompletableFuture 
containing a Map of labels
         return FutureUtils.combineAll(enrichFutures)
-                .thenApply(
+                .thenApplyAsync(
                         labelsToMerge -> {
                             final Map<String, String> mergedLabels = new 
HashMap<>();
                             for (Map<String, String> labels : labelsToMerge) {
@@ -223,6 +230,7 @@ public class FailureEnricherUtils {
                                                         }));
                             }
                             return mergedLabels;
-                        });
+                        },
+                        mainThreadExecutor);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
index 8373c302c75..ec4146e7679 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -57,6 +58,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
+import java.util.Collection;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -114,6 +116,7 @@ public final class DefaultSlotPoolServiceSchedulerFactory
             ComponentMainThreadExecutor mainThreadExecutor,
             FatalErrorHandler fatalErrorHandler,
             JobStatusListener jobStatusListener,
+            Collection<FailureEnricher> failureEnrichers,
             BlocklistOperations blocklistOperations)
             throws Exception {
         return schedulerNGFactory.createInstance(
@@ -136,6 +139,7 @@ public final class DefaultSlotPoolServiceSchedulerFactory
                 mainThreadExecutor,
                 fatalErrorHandler,
                 jobStatusListener,
+                failureEnrichers,
                 blocklistOperations);
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index c40f13c685c..755129605d2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobWriter;
@@ -206,6 +207,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId>
 
     private final ExecutionDeploymentTracker executionDeploymentTracker;
     private final ExecutionDeploymentReconciler executionDeploymentReconciler;
+    private final Collection<FailureEnricher> failureEnrichers;
 
     // -------- Mutable fields ---------
 
@@ -243,6 +245,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId>
             ExecutionDeploymentTracker executionDeploymentTracker,
             ExecutionDeploymentReconciler.Factory 
executionDeploymentReconcilerFactory,
             BlocklistHandler.Factory blocklistHandlerFactory,
+            Collection<FailureEnricher> failureEnrichers,
             long initializationTimestamp)
             throws Exception {
 
@@ -345,6 +348,9 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId>
 
         this.jobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
         this.jobStatusListener = new JobManagerJobStatusListener();
+
+        this.failureEnrichers = checkNotNull(failureEnrichers);
+
         this.schedulerNG =
                 createScheduler(
                         slotPoolServiceSchedulerFactory,
@@ -389,6 +395,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId>
                         getMainThreadExecutor(),
                         fatalErrorHandler,
                         jobStatusListener,
+                        failureEnrichers,
                         blocklistHandler::addNewBlockedNodes);
 
         return scheduler;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java
index df8c40c86c4..02a610988da 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.shuffle.ShuffleMaster;
 
 import org.slf4j.Logger;
 
+import java.util.Collection;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -87,6 +89,7 @@ public interface SlotPoolServiceSchedulerFactory {
             ComponentMainThreadExecutor mainThreadExecutor,
             FatalErrorHandler fatalErrorHandler,
             JobStatusListener jobStatusListener,
+            Collection<FailureEnricher> failureEnrichers,
             BlocklistOperations blocklistOperations)
             throws Exception;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
index ed5c7903634..c65c11bd2a7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.util.function.FunctionUtils;
 
+import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -122,6 +123,7 @@ public class DefaultJobMasterServiceFactory implements 
JobMasterServiceFactory {
                         DefaultExecutionDeploymentReconciler::new,
                         BlocklistUtils.loadBlocklistHandlerFactory(
                                 jobMasterConfiguration.getConfiguration()),
+                        Collections.emptySet(),
                         initializationTimestamp);
 
         jobMaster.start();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 7127c0f9986..5fdde343a10 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -22,6 +22,8 @@ package org.apache.flink.runtime.scheduler;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failure.FailureEnricher;
+import org.apache.flink.core.failure.FailureEnricher.Context;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -36,6 +38,7 @@ import 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHa
 import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.failure.DefaultFailureEnricherContext;
 import org.apache.flink.runtime.io.network.partition.PartitionException;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -126,6 +129,7 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
             long initializationTimestamp,
             final ComponentMainThreadExecutor mainThreadExecutor,
             final JobStatusListener jobStatusListener,
+            final Collection<FailureEnricher> failureEnrichers,
             final ExecutionGraphFactory executionGraphFactory,
             final ShuffleMaster<?> shuffleMaster,
             final Time rpcTimeout,
@@ -167,9 +171,32 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
                 jobGraph.getName(),
                 jobGraph.getJobID());
 
+        final Context taskFailureCtx =
+                DefaultFailureEnricherContext.forTaskFailure(
+                        jobGraph.getJobID(),
+                        jobGraph.getName(),
+                        jobManagerJobMetricGroup,
+                        ioExecutor,
+                        userCodeLoader);
+
+        final Context globalFailureCtx =
+                DefaultFailureEnricherContext.forGlobalFailure(
+                        jobGraph.getJobID(),
+                        jobGraph.getName(),
+                        jobManagerJobMetricGroup,
+                        ioExecutor,
+                        userCodeLoader);
+
         this.executionFailureHandler =
                 new ExecutionFailureHandler(
-                        getSchedulingTopology(), failoverStrategy, 
restartBackoffTimeStrategy);
+                        getSchedulingTopology(),
+                        failoverStrategy,
+                        restartBackoffTimeStrategy,
+                        mainThreadExecutor,
+                        failureEnrichers,
+                        taskFailureCtx,
+                        globalFailureCtx);
+
         this.schedulingStrategy =
                 schedulingStrategyFactory.createInstance(this, 
getSchedulingTopology());
 
@@ -306,7 +333,10 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
         if (failureHandlingResult.canRestart()) {
             restartTasksWithDelay(failureHandlingResult);
         } else {
-            failJob(failureHandlingResult.getError(), 
failureHandlingResult.getTimestamp());
+            failJob(
+                    failureHandlingResult.getError(),
+                    failureHandlingResult.getTimestamp(),
+                    failureHandlingResult.getFailureLabels());
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index 315bb2911ff..25c623e1846 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -44,6 +45,7 @@ import 
org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
 
 import org.slf4j.Logger;
 
+import java.util.Collection;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -74,6 +76,7 @@ public class DefaultSchedulerFactory implements 
SchedulerNGFactory {
             final ComponentMainThreadExecutor mainThreadExecutor,
             final FatalErrorHandler fatalErrorHandler,
             final JobStatusListener jobStatusListener,
+            final Collection<FailureEnricher> failureEnrichers,
             final BlocklistOperations blocklistOperations)
             throws Exception {
 
@@ -146,6 +149,7 @@ public class DefaultSchedulerFactory implements 
SchedulerNGFactory {
                     }
                     jobStatusListener.jobStatusChanges(jobId, jobStatus, 
timestamp);
                 },
+                failureEnrichers,
                 executionGraphFactory,
                 shuffleMaster,
                 rpcTimeout,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 63cd10f6fa5..86587a0b934 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -527,11 +527,12 @@ public abstract class SchedulerBase implements 
SchedulerNG, CheckpointScheduling
         return mainThreadExecutor;
     }
 
-    protected void failJob(Throwable cause, long timestamp) {
+    protected void failJob(
+            Throwable cause, long timestamp, CompletableFuture<Map<String, 
String>> failureLabels) {
         incrementVersionsOfAllVertices();
         cancelAllPendingSlotRequestsInternal();
         executionGraph.failJob(cause, timestamp);
-        getJobTerminationFuture().thenRun(() -> archiveGlobalFailure(cause));
+        getJobTerminationFuture().thenRun(() -> archiveGlobalFailure(cause, 
failureLabels));
     }
 
     protected final SchedulingTopology getSchedulingTopology() {
@@ -677,19 +678,25 @@ public abstract class SchedulerBase implements 
SchedulerNG, CheckpointScheduling
         return executionGraph.getTerminationFuture();
     }
 
-    protected final void archiveGlobalFailure(Throwable failure) {
+    protected final void archiveGlobalFailure(
+            Throwable failure, CompletableFuture<Map<String, String>> 
failureLabels) {
         archiveGlobalFailure(
                 failure,
                 executionGraph.getStatusTimestamp(JobStatus.FAILED),
+                failureLabels,
                 
StreamSupport.stream(executionGraph.getAllExecutionVertices().spliterator(), 
false)
                         .map(ExecutionVertex::getCurrentExecutionAttempt)
                         .collect(Collectors.toSet()));
     }
 
     private void archiveGlobalFailure(
-            Throwable failure, long timestamp, Iterable<Execution> executions) 
{
+            Throwable failure,
+            long timestamp,
+            CompletableFuture<Map<String, String>> failureLabels,
+            Iterable<Execution> executions) {
         exceptionHistory.add(
-                RootExceptionHistoryEntry.fromGlobalFailure(failure, 
timestamp, executions));
+                RootExceptionHistoryEntry.fromGlobalFailure(
+                        failure, timestamp, failureLabels, executions));
         log.debug("Archive global failure.", failure);
     }
 
@@ -712,6 +719,7 @@ public abstract class SchedulerBase implements SchedulerNG, 
CheckpointScheduling
             archiveGlobalFailure(
                     failureHandlingResult.getRootCause(),
                     failureHandlingResult.getTimestamp(),
+                    failureHandlingResult.getFailureLabels(),
                     failureHandlingResult.getConcurrentlyFailedExecution());
         }
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
index d3809b6b353..31b44c5aeca 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
@@ -22,6 +22,7 @@ package org.apache.flink.runtime.scheduler;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -37,6 +38,7 @@ import org.apache.flink.runtime.shuffle.ShuffleMaster;
 
 import org.slf4j.Logger;
 
+import java.util.Collection;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -63,6 +65,7 @@ public interface SchedulerNGFactory {
             ComponentMainThreadExecutor mainThreadExecutor,
             FatalErrorHandler fatalErrorHandler,
             JobStatusListener jobStatusListener,
+            Collection<FailureEnricher> failureEnrichers,
             BlocklistOperations blocklistOperations)
             throws Exception;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
index 1d6d91d0476..f91605fd5fd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.scheduler.adaptive;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -47,6 +48,7 @@ import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.slf4j.Logger;
 
 import java.time.Duration;
+import java.util.Collection;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -83,6 +85,7 @@ public class AdaptiveSchedulerFactory implements 
SchedulerNGFactory {
             ComponentMainThreadExecutor mainThreadExecutor,
             FatalErrorHandler fatalErrorHandler,
             JobStatusListener jobStatusListener,
+            Collection<FailureEnricher> failureEnrichers,
             BlocklistOperations blocklistOperations)
             throws Exception {
         final DeclarativeSlotPool declarativeSlotPool =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
index 953f36a2573..7789e167f0e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.failure.FailureEnricherUtils;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -359,7 +360,9 @@ abstract class StateWithExecutionGraph implements State {
             final String taskName = 
maybeTaskName.orElseThrow(NoSuchElementException::new);
             final ExecutionState currentState = execution.getState();
             if (currentState == desiredState) {
-                failureCollection.add(ExceptionHistoryEntry.create(execution, 
taskName));
+                failureCollection.add(
+                        ExceptionHistoryEntry.create(
+                                execution, taskName, 
FailureEnricherUtils.EMPTY_FAILURE_LABELS));
                 onFailure(
                         ErrorInfo.handleMissingThrowable(
                                 
taskExecutionStateTransition.getError(userCodeClassLoader)));
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
index d6ddce977a4..afe64c27728 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
@@ -42,6 +43,7 @@ import 
org.apache.flink.runtime.executiongraph.ParallelismAndInputInfos;
 import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
 import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.failure.FailureEnricherUtils;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -70,6 +72,7 @@ import org.apache.flink.util.concurrent.ScheduledExecutor;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -121,6 +124,7 @@ public class AdaptiveBatchScheduler extends 
DefaultScheduler {
             long initializationTimestamp,
             final ComponentMainThreadExecutor mainThreadExecutor,
             final JobStatusListener jobStatusListener,
+            final Collection<FailureEnricher> failureEnrichers,
             final ExecutionGraphFactory executionGraphFactory,
             final ShuffleMaster<?> shuffleMaster,
             final Time rpcTimeout,
@@ -150,6 +154,7 @@ public class AdaptiveBatchScheduler extends 
DefaultScheduler {
                 initializationTimestamp,
                 mainThreadExecutor,
                 jobStatusListener,
+                failureEnrichers,
                 executionGraphFactory,
                 shuffleMaster,
                 rpcTimeout,
@@ -295,7 +300,7 @@ public class AdaptiveBatchScheduler extends 
DefaultScheduler {
             }
         } catch (JobException ex) {
             log.error("Unexpected error occurred when initializing 
ExecutionJobVertex", ex);
-            failJob(ex, System.currentTimeMillis());
+            failJob(ex, System.currentTimeMillis(), 
FailureEnricherUtils.EMPTY_FAILURE_LABELS);
         }
 
         if (newlyInitializedJobVertices.size() > 0) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
index 3746b549ed1..8417b7b93ac 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import 
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -77,6 +78,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
@@ -113,6 +115,7 @@ public class AdaptiveBatchSchedulerFactory implements 
SchedulerNGFactory {
             ComponentMainThreadExecutor mainThreadExecutor,
             FatalErrorHandler fatalErrorHandler,
             JobStatusListener jobStatusListener,
+            Collection<FailureEnricher> failureEnrichers,
             BlocklistOperations blocklistOperations)
             throws Exception {
 
@@ -206,6 +209,7 @@ public class AdaptiveBatchSchedulerFactory implements 
SchedulerNGFactory {
                     initializationTimestamp,
                     mainThreadExecutor,
                     jobStatusListener,
+                    failureEnrichers,
                     executionGraphFactory,
                     shuffleMaster,
                     rpcTimeout,
@@ -237,6 +241,7 @@ public class AdaptiveBatchSchedulerFactory implements 
SchedulerNGFactory {
                     initializationTimestamp,
                     mainThreadExecutor,
                     jobStatusListener,
+                    failureEnrichers,
                     executionGraphFactory,
                     shuffleMaster,
                     rpcTimeout,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
index 4a5afad9007..e5c79a31076 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.BatchExecutionOptions;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
@@ -120,6 +121,7 @@ public class SpeculativeScheduler extends 
AdaptiveBatchScheduler
             long initializationTimestamp,
             final ComponentMainThreadExecutor mainThreadExecutor,
             final JobStatusListener jobStatusListener,
+            final Collection<FailureEnricher> failureEnrichers,
             final ExecutionGraphFactory executionGraphFactory,
             final ShuffleMaster<?> shuffleMaster,
             final Time rpcTimeout,
@@ -150,6 +152,7 @@ public class SpeculativeScheduler extends 
AdaptiveBatchScheduler
                 initializationTimestamp,
                 mainThreadExecutor,
                 jobStatusListener,
+                failureEnrichers,
                 executionGraphFactory,
                 shuffleMaster,
                 rpcTimeout,
@@ -285,7 +288,10 @@ public class SpeculativeScheduler extends 
AdaptiveBatchScheduler
             archiveFromFailureHandlingResult(
                     
createFailureHandlingResultSnapshot(failureHandlingResult));
         } else {
-            failJob(error, failureHandlingResult.getTimestamp());
+            failJob(
+                    error,
+                    failureHandlingResult.getTimestamp(),
+                    failureHandlingResult.getFailureLabels());
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java
index 7d9429af0c6..4c393481b4d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java
@@ -23,13 +23,18 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.failure.FailureEnricherUtils;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
 import java.util.StringJoiner;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * {@code ExceptionHistoryEntry} collects information about a single failure 
that triggered the
@@ -41,18 +46,25 @@ public class ExceptionHistoryEntry extends ErrorInfo {
 
     @Nullable private final String failingTaskName;
     @Nullable private final ArchivedTaskManagerLocation taskManagerLocation;
+    private final transient CompletableFuture<Map<String, String>> 
failureLabelsFuture;
+    /** Labels associated with the failure, set as soon as failureLabelsFuture 
is completed. */
+    private Map<String, String> failureLabels;
 
     /**
      * Creates an {@code ExceptionHistoryEntry} based on the provided {@code 
Execution}.
      *
      * @param failedExecution the failed {@code Execution}.
      * @param taskName the name of the task.
+     * @param failureLabels the labels associated with the failure.
      * @return The {@code ExceptionHistoryEntry}.
      * @throws NullPointerException if {@code null} is passed as one of the 
parameters.
      * @throws IllegalArgumentException if the passed {@code Execution} does 
not provide a {@link
      *     Execution#getFailureInfo() failureInfo}.
      */
-    public static ExceptionHistoryEntry create(AccessExecution 
failedExecution, String taskName) {
+    public static ExceptionHistoryEntry create(
+            AccessExecution failedExecution,
+            String taskName,
+            CompletableFuture<Map<String, String>> failureLabels) {
         Preconditions.checkNotNull(failedExecution, "No Execution is 
specified.");
         Preconditions.checkNotNull(taskName, "No task name is specified.");
         Preconditions.checkArgument(
@@ -63,6 +75,7 @@ public class ExceptionHistoryEntry extends ErrorInfo {
         return new ExceptionHistoryEntry(
                 failure.getException(),
                 failure.getTimestamp(),
+                failureLabels,
                 taskName,
                 failedExecution.getAssignedResourceLocation());
     }
@@ -70,7 +83,11 @@ public class ExceptionHistoryEntry extends ErrorInfo {
     /** Creates an {@code ExceptionHistoryEntry} that is not based on an 
{@code Execution}. */
     public static ExceptionHistoryEntry createGlobal(Throwable cause) {
         return new ExceptionHistoryEntry(
-                cause, System.currentTimeMillis(), null, 
(ArchivedTaskManagerLocation) null);
+                cause,
+                System.currentTimeMillis(),
+                FailureEnricherUtils.EMPTY_FAILURE_LABELS,
+                null,
+                (ArchivedTaskManagerLocation) null);
     }
 
     /**
@@ -78,6 +95,7 @@ public class ExceptionHistoryEntry extends ErrorInfo {
      *
      * @param cause The reason for the failure.
      * @param timestamp The time the failure was caught.
+     * @param failureLabels The labels associated with the failure.
      * @param failingTaskName The name of the task that failed.
      * @param taskManagerLocation The host the task was running on.
      * @throws NullPointerException if {@code cause} is {@code null}.
@@ -87,11 +105,13 @@ public class ExceptionHistoryEntry extends ErrorInfo {
     protected ExceptionHistoryEntry(
             Throwable cause,
             long timestamp,
+            CompletableFuture<Map<String, String>> failureLabels,
             @Nullable String failingTaskName,
             @Nullable TaskManagerLocation taskManagerLocation) {
         this(
                 cause,
                 timestamp,
+                failureLabels,
                 failingTaskName,
                 
ArchivedTaskManagerLocation.fromTaskManagerLocation(taskManagerLocation));
     }
@@ -99,11 +119,17 @@ public class ExceptionHistoryEntry extends ErrorInfo {
     private ExceptionHistoryEntry(
             Throwable cause,
             long timestamp,
+            CompletableFuture<Map<String, String>> failureLabels,
             @Nullable String failingTaskName,
             @Nullable ArchivedTaskManagerLocation taskManagerLocation) {
         super(cause, timestamp);
         this.failingTaskName = failingTaskName;
         this.taskManagerLocation = taskManagerLocation;
+        this.failureLabelsFuture =
+                Preconditions.checkNotNull(failureLabels)
+                        .thenApply(
+                                labelMap ->
+                                        this.failureLabels = 
Collections.unmodifiableMap(labelMap));
     }
 
     public boolean isGlobal() {
@@ -120,6 +146,25 @@ public class ExceptionHistoryEntry extends ErrorInfo {
         return taskManagerLocation;
     }
 
+    /**
+     * Returns the labels associated with the failure that is set as soon as 
failureLabelsFuture is
+     * completed. When failureLabelsFuture is not completed, it returns an 
empty map.
+     *
+     * @return Map of failure labels
+     */
+    public Map<String, String> getFailureLabels() {
+        return 
Optional.ofNullable(failureLabels).orElse(Collections.emptyMap());
+    }
+
+    /**
+     * Returns the labels future associated with the failure.
+     *
+     * @return CompletableFuture of Map failure labels
+     */
+    public CompletableFuture<Map<String, String>> getFailureLabelsFuture() {
+        return failureLabelsFuture;
+    }
+
     /**
      * {@code ArchivedTaskManagerLocation} represents a archived (static) 
version of a {@link
      * TaskManagerLocation}. It overcomes the issue with {@link 
TaskManagerLocation#inetAddress}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
index 337509e26e8..9fdd21d8982 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java
@@ -30,8 +30,10 @@ import javax.annotation.Nullable;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -43,6 +45,7 @@ public class FailureHandlingResultSnapshot {
 
     @Nullable private final Execution rootCauseExecution;
     private final Throwable rootCause;
+    private final CompletableFuture<Map<String, String>> failureLabels;
     private final long timestamp;
     private final Set<Execution> concurrentlyFailedExecutions;
 
@@ -80,6 +83,7 @@ public class FailureHandlingResultSnapshot {
                 rootCauseExecution,
                 
ErrorInfo.handleMissingThrowable(failureHandlingResult.getError()),
                 failureHandlingResult.getTimestamp(),
+                failureHandlingResult.getFailureLabels(),
                 concurrentlyFailedExecutions);
     }
 
@@ -88,6 +92,7 @@ public class FailureHandlingResultSnapshot {
             @Nullable Execution rootCauseExecution,
             Throwable rootCause,
             long timestamp,
+            CompletableFuture<Map<String, String>> failureLabels,
             Set<Execution> concurrentlyFailedExecutions) {
         Preconditions.checkArgument(
                 rootCauseExecution == null
@@ -95,6 +100,7 @@ public class FailureHandlingResultSnapshot {
                 "The rootCauseExecution should not be part of the 
concurrentlyFailedExecutions map.");
 
         this.rootCauseExecution = rootCauseExecution;
+        this.failureLabels = failureLabels;
         this.rootCause = Preconditions.checkNotNull(rootCause);
         this.timestamp = timestamp;
         this.concurrentlyFailedExecutions =
@@ -120,6 +126,15 @@ public class FailureHandlingResultSnapshot {
         return rootCause;
     }
 
+    /**
+     * Returns the labels future associated with the failure.
+     *
+     * @return the CompletableFuture map of String labels
+     */
+    public CompletableFuture<Map<String, String>> getFailureLabels() {
+        return failureLabels;
+    }
+
     /**
      * The time the failure occurred.
      *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java
index 0ff9bd1a148..cfbad29716a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java
@@ -21,12 +21,15 @@ package org.apache.flink.runtime.scheduler.exceptionhistory;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.failure.FailureEnricherUtils;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
 import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -63,6 +66,7 @@ public class RootExceptionHistoryEntry extends 
ExceptionHistoryEntry {
         return createRootExceptionHistoryEntry(
                 snapshot.getRootCause(),
                 snapshot.getTimestamp(),
+                snapshot.getFailureLabels(),
                 failingTaskName,
                 taskManagerLocation,
                 snapshot.getConcurrentlyFailedExecution());
@@ -75,6 +79,7 @@ public class RootExceptionHistoryEntry extends 
ExceptionHistoryEntry {
      *
      * @param cause The reason for the failure.
      * @param timestamp The time the failure was caught.
+     * @param failureLabels Map of string labels associated with the failure.
      * @param executions The {@link Execution} instances that shall be 
analyzed for failures.
      * @return The {@code RootExceptionHistoryEntry} instance.
      * @throws NullPointerException if {@code failure} is {@code null}.
@@ -82,14 +87,23 @@ public class RootExceptionHistoryEntry extends 
ExceptionHistoryEntry {
      *     0}.
      */
     public static RootExceptionHistoryEntry fromGlobalFailure(
-            Throwable cause, long timestamp, Iterable<Execution> executions) {
-        return createRootExceptionHistoryEntry(cause, timestamp, null, null, 
executions);
+            Throwable cause,
+            long timestamp,
+            CompletableFuture<Map<String, String>> failureLabels,
+            Iterable<Execution> executions) {
+        return createRootExceptionHistoryEntry(
+                cause, timestamp, failureLabels, null, null, executions);
     }
 
     public static RootExceptionHistoryEntry fromExceptionHistoryEntry(
             ExceptionHistoryEntry entry, Iterable<ExceptionHistoryEntry> 
entries) {
         return new RootExceptionHistoryEntry(
-                entry.getException(), entry.getTimestamp(), null, null, 
entries);
+                entry.getException(),
+                entry.getTimestamp(),
+                entry.getFailureLabelsFuture(),
+                null,
+                null,
+                entries);
     }
 
     /**
@@ -107,18 +121,23 @@ public class RootExceptionHistoryEntry extends 
ExceptionHistoryEntry {
     public static RootExceptionHistoryEntry fromGlobalFailure(ErrorInfo 
errorInfo) {
         Preconditions.checkNotNull(errorInfo, "errorInfo");
         return fromGlobalFailure(
-                errorInfo.getException(), errorInfo.getTimestamp(), 
Collections.emptyList());
+                errorInfo.getException(),
+                errorInfo.getTimestamp(),
+                FailureEnricherUtils.EMPTY_FAILURE_LABELS,
+                Collections.emptyList());
     }
 
     private static RootExceptionHistoryEntry createRootExceptionHistoryEntry(
             Throwable cause,
             long timestamp,
+            CompletableFuture<Map<String, String>> failureLabels,
             @Nullable String failingTaskName,
             @Nullable TaskManagerLocation taskManagerLocation,
             Iterable<Execution> executions) {
         return new RootExceptionHistoryEntry(
                 cause,
                 timestamp,
+                failureLabels,
                 failingTaskName,
                 taskManagerLocation,
                 StreamSupport.stream(executions.spliterator(), false)
@@ -126,7 +145,9 @@ public class RootExceptionHistoryEntry extends 
ExceptionHistoryEntry {
                         .map(
                                 execution ->
                                         ExceptionHistoryEntry.create(
-                                                execution, 
execution.getVertexWithAttempt()))
+                                                execution,
+                                                
execution.getVertexWithAttempt(),
+                                                
FailureEnricherUtils.EMPTY_FAILURE_LABELS))
                         .collect(Collectors.toList()));
     }
 
@@ -135,6 +156,7 @@ public class RootExceptionHistoryEntry extends 
ExceptionHistoryEntry {
      *
      * @param cause The reason for the failure.
      * @param timestamp The time the failure was caught.
+     * @param failureLabels labels associated with the failure.
      * @param failingTaskName The name of the task that failed.
      * @param taskManagerLocation The host the task was running on.
      * @throws NullPointerException if {@code cause} is {@code null}.
@@ -145,10 +167,11 @@ public class RootExceptionHistoryEntry extends ExceptionHistoryEntry {
     public RootExceptionHistoryEntry(
             Throwable cause,
             long timestamp,
+            CompletableFuture<Map<String, String>> failureLabels,
             @Nullable String failingTaskName,
             @Nullable TaskManagerLocation taskManagerLocation,
             Iterable<ExceptionHistoryEntry> concurrentExceptions) {
-        super(cause, timestamp, failingTaskName, taskManagerLocation);
+        super(cause, timestamp, failureLabels, failingTaskName, taskManagerLocation);
         this.concurrentExceptions = concurrentExceptions;
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java
index 2c4406fa263..c3df9922c5b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.executiongraph.failover.flip1;
 
+import org.apache.flink.core.failure.TestingFailureEnricher;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -35,6 +37,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.Collections;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 
@@ -59,6 +62,8 @@ class ExecutionFailureHandlerTest {
 
     private ExecutionFailureHandler executionFailureHandler;
 
+    private TestingFailureEnricher testingFailureEnricher;
+
     @BeforeEach
     void setUp() {
         TestingSchedulingTopology topology = new TestingSchedulingTopology();
@@ -66,10 +71,17 @@ class ExecutionFailureHandlerTest {
         schedulingTopology = topology;
 
         failoverStrategy = new TestFailoverStrategy();
+        testingFailureEnricher = new TestingFailureEnricher();
         backoffTimeStrategy = new TestRestartBackoffTimeStrategy(true, RESTART_DELAY_MS);
         executionFailureHandler =
                 new ExecutionFailureHandler(
-                        schedulingTopology, failoverStrategy, backoffTimeStrategy);
+                        schedulingTopology,
+                        failoverStrategy,
+                        backoffTimeStrategy,
+                        ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                        Collections.singleton(testingFailureEnricher),
+                        null,
+                        null);
     }
 
     /** Tests the case that task restarting is accepted. */
@@ -94,6 +106,9 @@ class ExecutionFailureHandlerTest {
         assertThat(result.getVerticesToRestart()).isEqualTo(tasksToRestart);
         assertThat(result.getError()).isSameAs(cause);
         assertThat(result.getTimestamp()).isEqualTo(timestamp);
+        assertThat(testingFailureEnricher.getSeenThrowables()).containsExactly(cause);
+        assertThat(result.getFailureLabels().get())
+                .isEqualTo(testingFailureEnricher.getFailureLabels());
         assertThat(executionFailureHandler.getNumberOfRestarts()).isEqualTo(1);
     }
 
@@ -116,6 +131,9 @@ class ExecutionFailureHandlerTest {
         assertThat(result.getFailedExecution().get()).isSameAs(execution);
         assertThat(result.getError()).hasCause(error);
         assertThat(result.getTimestamp()).isEqualTo(timestamp);
+        assertThat(testingFailureEnricher.getSeenThrowables()).containsExactly(error);
+        assertThat(result.getFailureLabels().get())
+                .isEqualTo(testingFailureEnricher.getFailureLabels());
         assertThat(ExecutionFailureHandler.isUnrecoverableError(result.getError())).isFalse();
 
         assertThatThrownBy(result::getVerticesToRestart)
@@ -147,6 +165,9 @@ class ExecutionFailureHandlerTest {
         assertThat(result.getFailedExecution().get()).isSameAs(execution);
         assertThat(result.getError()).isNotNull();
         assertThat(ExecutionFailureHandler.isUnrecoverableError(result.getError())).isTrue();
+        assertThat(testingFailureEnricher.getSeenThrowables()).containsExactly(error);
+        assertThat(result.getFailureLabels().get())
+                .isEqualTo(testingFailureEnricher.getFailureLabels());
         assertThat(result.getTimestamp()).isEqualTo(timestamp);
 
         assertThatThrownBy(result::getVerticesToRestart)
@@ -180,7 +201,7 @@ class ExecutionFailureHandlerTest {
     }
 
     @Test
-    void testGlobalFailureHandling() {
+    void testGlobalFailureHandling() throws ExecutionException, InterruptedException {
         final Throwable error = new Exception("Expected test failure");
         final long timestamp = System.currentTimeMillis();
         final FailureHandlingResult result =
@@ -193,6 +214,9 @@ class ExecutionFailureHandlerTest {
                                 .collect(Collectors.toSet()));
         assertThat(result.getError()).isSameAs(error);
         assertThat(result.getTimestamp()).isEqualTo(timestamp);
+        assertThat(testingFailureEnricher.getSeenThrowables()).containsExactly(error);
+        assertThat(result.getFailureLabels().get())
+                .isEqualTo(testingFailureEnricher.getFailureLabels());
     }
 
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java
index a5df025989b..064aac9147e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java
@@ -27,8 +27,11 @@ import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionGraph;
@@ -53,15 +56,19 @@ class FailureHandlingResultTest {
         Set<ExecutionVertexID> tasks = new HashSet<>();
         tasks.add(execution.getVertex().getID());
 
-        long delay = 1234;
-        Throwable error = new RuntimeException();
-        long timestamp = System.currentTimeMillis();
+        final long delay = 1234;
+        final Throwable error = new RuntimeException();
+        final long timestamp = System.currentTimeMillis();
+        final CompletableFuture<Map<String, String>> failureLabels =
+                CompletableFuture.completedFuture(Collections.singletonMap("key", "value"));
         FailureHandlingResult result =
-                FailureHandlingResult.restartable(execution, error, timestamp, tasks, delay, false);
+                FailureHandlingResult.restartable(
+                        execution, error, timestamp, failureLabels, tasks, delay, false);
 
         assertThat(result.canRestart()).isTrue();
         assertThat(delay).isEqualTo(result.getRestartDelayMS());
         assertThat(tasks).isEqualTo(result.getVerticesToRestart());
+        assertThat(result.getFailureLabels()).isEqualTo(failureLabels);
         assertThat(result.getError()).isSameAs(error);
         assertThat(result.getTimestamp()).isEqualTo(timestamp);
         assertThat(result.getFailedExecution()).isPresent();
@@ -72,14 +79,17 @@ class FailureHandlingResultTest {
     @Test
     void testRestartingSuppressedFailureHandlingResultWithNoCausingExecutionVertexId() {
         // create a FailureHandlingResult with error
-        Throwable error = new Exception("test error");
-        long timestamp = System.currentTimeMillis();
+        final Throwable error = new Exception("test error");
+        final long timestamp = System.currentTimeMillis();
+        final CompletableFuture<Map<String, String>> failureLabels =
+                CompletableFuture.completedFuture(Collections.singletonMap("key", "value"));
         FailureHandlingResult result =
-                FailureHandlingResult.unrecoverable(null, error, timestamp, false);
+                FailureHandlingResult.unrecoverable(null, error, timestamp, failureLabels, false);
 
         assertThat(result.canRestart()).isFalse();
         assertThat(result.getError()).isSameAs(error);
         assertThat(result.getTimestamp()).isEqualTo(timestamp);
+        assertThat(result.getFailureLabels()).isEqualTo(failureLabels);
         assertThat(result.getFailedExecution()).isNotPresent();
 
         assertThatThrownBy(result::getVerticesToRestart)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java
index d86b0e06e37..8eedf2d0be2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/failure/FailureEnricherUtilsTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.core.failure.FailureEnricherFactory;
 import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.core.plugin.TestingPluginManager;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.util.TestLoggerExtension;
 
 import org.apache.commons.collections.IteratorUtils;
@@ -163,7 +164,11 @@ class FailureEnricherUtilsTest {
         failureEnrichers.add(validEnricher);
 
         final CompletableFuture<Map<String, String>> result =
-                FailureEnricherUtils.labelFailure(cause, null, failureEnrichers);
+                FailureEnricherUtils.labelFailure(
+                        cause,
+                        null,
+                        ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                        failureEnrichers);
 
         assertThatFuture(result)
                 .eventuallySucceeds()
@@ -188,7 +193,11 @@ class FailureEnricherUtilsTest {
         failureEnrichers.add(invalidEnricher);
 
         final CompletableFuture<Map<String, String>> result =
-                FailureEnricherUtils.labelFailure(cause, null, failureEnrichers);
+                FailureEnricherUtils.labelFailure(
+                        cause,
+                        null,
+                        ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                        failureEnrichers);
         // Ignoring labels
         assertThatFuture(result).eventuallySucceeds().satisfies(labels -> labels.isEmpty());
     }
@@ -208,7 +217,11 @@ class FailureEnricherUtilsTest {
                 };
 
         final CompletableFuture<Map<String, String>> result =
-                FailureEnricherUtils.labelFailure(cause, null, enrichers);
+                FailureEnricherUtils.labelFailure(
+                        cause,
+                        null,
+                        ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                        enrichers);
 
         try {
             result.get();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
index 14bc3760792..f5dd1e649da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -46,6 +47,7 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.slf4j.Logger;
 
+import java.util.Collection;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -121,6 +123,7 @@ public class JobMasterSchedulerTest extends TestLogger {
                 ComponentMainThreadExecutor mainThreadExecutor,
                 FatalErrorHandler fatalErrorHandler,
                 JobStatusListener jobStatusListener,
+                Collection<FailureEnricher> failureEnrichers,
                 BlocklistOperations blocklistOperations) {
             return TestingSchedulerNG.newBuilder()
                     .setStartSchedulingRunnable(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
index 5c558449143..a6d30b41ff8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.jobmaster.utils;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.blocklist.BlocklistHandler;
 import org.apache.flink.runtime.blocklist.NoOpBlocklistHandler;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
@@ -49,6 +50,8 @@ import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.shuffle.ShuffleTestUtils;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 
 /** A builder for the {@link JobMaster}. */
@@ -84,6 +87,8 @@ public class JobMasterBuilder {
 
     private FatalErrorHandler fatalErrorHandler = error -> {};
 
+    private Collection<FailureEnricher> failureEnrichers = Collections.emptySet();
+
     private ExecutionDeploymentTracker executionDeploymentTracker =
             new DefaultExecutionDeploymentTracker();
     private ExecutionDeploymentReconciler.Factory executionDeploymentReconcilerFactory =
@@ -211,6 +216,7 @@ public class JobMasterBuilder {
                 executionDeploymentTracker,
                 executionDeploymentReconcilerFactory,
                 blocklistHandlerFactory,
+                failureEnrichers,
                 System.currentTimeMillis());
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
index 7fe27b6c2f4..021d13314b3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.executiongraph.ExecutionHistory;
+import org.apache.flink.runtime.failure.FailureEnricherUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.HandlerRequestException;
@@ -67,6 +68,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
@@ -110,7 +112,8 @@ public class JobExceptionsHandlerTest extends TestLogger {
     }
 
     @Test
-    public void testOnlyRootCause() throws HandlerRequestException {
+    public void testOnlyRootCause()
+            throws HandlerRequestException, ExecutionException, InterruptedException {
         final Throwable rootCause = new RuntimeException("root cause");
         final long rootCauseTimestamp = System.currentTimeMillis();
 
@@ -132,7 +135,8 @@ public class JobExceptionsHandlerTest extends TestLogger {
     }
 
     @Test
-    public void testOnlyExceptionHistory() throws HandlerRequestException {
+    public void testOnlyExceptionHistory()
+            throws HandlerRequestException, ExecutionException, InterruptedException {
         final RuntimeException rootThrowable = new RuntimeException("exception #0");
         final long rootTimestamp = System.currentTimeMillis();
         final RootExceptionHistoryEntry rootEntry = fromGlobalFailure(rootThrowable, rootTimestamp);
@@ -152,13 +156,15 @@ public class JobExceptionsHandlerTest extends TestLogger {
     }
 
     @Test
-    public void testWithExceptionHistory() throws HandlerRequestException {
+    public void testWithExceptionHistory()
+            throws HandlerRequestException, ExecutionException, InterruptedException {
         final RootExceptionHistoryEntry rootCause =
                 fromGlobalFailure(new RuntimeException("exception #0"), System.currentTimeMillis());
         final RootExceptionHistoryEntry otherFailure =
                 new RootExceptionHistoryEntry(
                         new RuntimeException("exception #1"),
                         System.currentTimeMillis(),
+                        CompletableFuture.completedFuture(Collections.singletonMap("key", "value")),
                         "task name",
                         new LocalTaskManagerLocation(),
                         Collections.emptySet());
@@ -178,6 +184,7 @@ public class JobExceptionsHandlerTest extends TestLogger {
                         historyContainsJobExceptionInfo(
                                 otherFailure.getException(),
                                 otherFailure.getTimestamp(),
+                                otherFailure.getFailureLabelsFuture(),
                                 otherFailure.getFailingTaskName(),
                                 JobExceptionsHandler.toString(
                                         otherFailure.getTaskManagerLocation()),
@@ -188,11 +195,12 @@ public class JobExceptionsHandlerTest extends TestLogger {
 
     @Test
     public void testWithLocalExceptionHistoryEntryNotHavingATaskManagerInformationAvailable()
-            throws HandlerRequestException {
+            throws HandlerRequestException, ExecutionException, InterruptedException {
         final RootExceptionHistoryEntry failure =
                 new RootExceptionHistoryEntry(
                         new RuntimeException("exception #1"),
                         System.currentTimeMillis(),
+                        CompletableFuture.completedFuture(Collections.singletonMap("key", "value")),
                         "task name",
                         null,
                         Collections.emptySet());
@@ -209,6 +217,7 @@ public class JobExceptionsHandlerTest extends TestLogger {
                         historyContainsJobExceptionInfo(
                                 failure.getException(),
                                 failure.getTimestamp(),
+                                failure.getFailureLabelsFuture(),
                                 failure.getFailingTaskName(),
                                 JobExceptionsHandler.toString(failure.getTaskManagerLocation()),
                                 JobExceptionsHandler.toTaskManagerId(
@@ -217,13 +226,14 @@ public class JobExceptionsHandlerTest extends TestLogger {
 
     @Test
     public void testWithExceptionHistoryWithTruncationThroughParameter()
-            throws HandlerRequestException {
+            throws HandlerRequestException, ExecutionException, InterruptedException {
         final RootExceptionHistoryEntry rootCause =
                 fromGlobalFailure(new RuntimeException("exception #0"), System.currentTimeMillis());
         final RootExceptionHistoryEntry otherFailure =
                 new RootExceptionHistoryEntry(
                         new RuntimeException("exception #1"),
                         System.currentTimeMillis(),
+                        CompletableFuture.completedFuture(Collections.singletonMap("key", "value")),
                         "task name",
                         new LocalTaskManagerLocation(),
                         Collections.emptySet());
@@ -318,6 +328,7 @@ public class JobExceptionsHandlerTest extends TestLogger {
                         new RootExceptionHistoryEntry(
                                 failureCause,
                                 failureTimestamp,
+                                FailureEnricherUtils.EMPTY_FAILURE_LABELS,
                                 "test task #1",
                                 new LocalTaskManagerLocation(),
                                 Collections.emptySet()));
@@ -421,7 +432,13 @@ public class JobExceptionsHandlerTest extends TestLogger {
     }
 
     private static RootExceptionHistoryEntry fromGlobalFailure(Throwable cause, long timestamp) {
-        return new RootExceptionHistoryEntry(cause, timestamp, null, null, Collections.emptySet());
+        return new RootExceptionHistoryEntry(
+                cause,
+                timestamp,
+                FailureEnricherUtils.EMPTY_FAILURE_LABELS,
+                null,
+                null,
+                Collections.emptySet());
     }
 
     // -------- factory methods for instantiating new Matchers --------
@@ -430,14 +447,17 @@ public class JobExceptionsHandlerTest extends TestLogger {
     private static Matcher<RootExceptionInfo> historyContainsJobExceptionInfo(
             Throwable expectedFailureCause,
             long expectedFailureTimestamp,
+            CompletableFuture<Map<String, String>> expectedFailureLabels,
             String expectedTaskNameWithSubtaskId,
             String expectedTaskManagerLocation,
             String expectedTaskManagerId,
-            Matcher<ExceptionInfo>... concurrentExceptionMatchers) {
+            Matcher<ExceptionInfo>... concurrentExceptionMatchers)
+            throws ExecutionException, InterruptedException {
         return new RootExceptionInfoMatcher(
                 matchesFailure(
                         expectedFailureCause,
                         expectedFailureTimestamp,
+                        expectedFailureLabels,
                         expectedTaskNameWithSubtaskId,
                         expectedTaskManagerLocation,
                         expectedTaskManagerId),
@@ -448,21 +468,31 @@ public class JobExceptionsHandlerTest extends TestLogger {
     private static Matcher<RootExceptionInfo> historyContainsGlobalFailure(
             Throwable expectedFailureCause,
             long expectedFailureTimestamp,
-            Matcher<ExceptionInfo>... concurrentExceptionMatchers) {
+            Matcher<ExceptionInfo>... concurrentExceptionMatchers)
+            throws ExecutionException, InterruptedException {
         return new RootExceptionInfoMatcher(
-                matchesFailure(expectedFailureCause, expectedFailureTimestamp, null, null, null),
+                matchesFailure(
+                        expectedFailureCause,
+                        expectedFailureTimestamp,
+                        FailureEnricherUtils.EMPTY_FAILURE_LABELS,
+                        null,
+                        null,
+                        null),
                 concurrentExceptionMatchers);
     }
 
     private static Matcher<ExceptionInfo> matchesFailure(
             Throwable expectedFailureCause,
             long expectedFailureTimestamp,
+            CompletableFuture<Map<String, String>> expectedFailureLabels,
             String expectedTaskNameWithSubtaskId,
             String expectedTaskManagerLocation,
-            String expectedTaskManagerId) {
+            String expectedTaskManagerId)
+            throws ExecutionException, InterruptedException {
         return new ExceptionInfoMatcher(
                 expectedFailureCause,
                 expectedFailureTimestamp,
+                expectedFailureLabels,
                 expectedTaskNameWithSubtaskId,
                 expectedTaskManagerLocation,
                 expectedTaskManagerId);
@@ -519,6 +549,7 @@ public class JobExceptionsHandlerTest extends TestLogger {
 
         private final Throwable expectedException;
         private final long expectedTimestamp;
+        private final Map<String, String> expectedFailureLabels;
         private final String expectedTaskName;
         private final String expectedLocation;
         private final String expectedTaskManagerId;
@@ -526,11 +557,14 @@ public class JobExceptionsHandlerTest extends TestLogger {
         private ExceptionInfoMatcher(
                 Throwable expectedException,
                 long expectedTimestamp,
+                CompletableFuture<Map<String, String>> expectedFailureLabels,
                 String expectedTaskName,
                 String expectedLocation,
-                String expectedTaskManagerId) {
+                String expectedTaskManagerId)
+                throws ExecutionException, InterruptedException {
             this.expectedException = deserializeSerializedThrowable(expectedException);
             this.expectedTimestamp = expectedTimestamp;
+            this.expectedFailureLabels = expectedFailureLabels.get();
             this.expectedTaskName = expectedTaskName;
             this.expectedLocation = expectedLocation;
             this.expectedTaskManagerId = expectedTaskManagerId;
@@ -545,6 +579,8 @@ public class JobExceptionsHandlerTest extends TestLogger {
                     .appendText(getExpectedStacktrace())
                     .appendText(", timestamp=")
                     .appendText(String.valueOf(expectedTimestamp))
+                    .appendText(", failureLabels=")
+                    .appendText(String.valueOf(expectedFailureLabels))
                     .appendText(", taskName=")
                     .appendText(expectedTaskName)
                     .appendText(", location=")
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java
index 56c7b29ebd3..38df9476854 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.BatchExecutionOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.blocklist.BlocklistOperations;
@@ -62,7 +63,9 @@ import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Function;
@@ -102,6 +105,7 @@ public class DefaultSchedulerBuilder {
     private ExecutionSlotAllocatorFactory executionSlotAllocatorFactory =
             new TestExecutionSlotAllocatorFactory();
     private JobStatusListener jobStatusListener = (ignoredA, ignoredB, ignoredC) -> {};
+    private Collection<FailureEnricher> failureEnrichers = new HashSet<>();
     private ExecutionDeployer.Factory executionDeployerFactory =
             new DefaultExecutionDeployer.Factory();
     private VertexParallelismAndInputInfosDecider vertexParallelismAndInputInfosDecider =
@@ -246,6 +250,12 @@ public class DefaultSchedulerBuilder {
         return this;
     }
 
+    public DefaultSchedulerBuilder setFailureEnrichers(
+            Collection<FailureEnricher> failureEnrichers) {
+        this.failureEnrichers = failureEnrichers;
+        return this;
+    }
+
     public DefaultSchedulerBuilder setExecutionDeployerFactory(
             ExecutionDeployer.Factory executionDeployerFactory) {
         this.executionDeployerFactory = executionDeployerFactory;
@@ -301,6 +311,7 @@ public class DefaultSchedulerBuilder {
                 System.currentTimeMillis(),
                 mainThreadExecutor,
                 jobStatusListener,
+                failureEnrichers,
                 createExecutionGraphFactory(false),
                 shuffleMaster,
                 rpcTimeout,
@@ -329,6 +340,7 @@ public class DefaultSchedulerBuilder {
                 System.currentTimeMillis(),
                 mainThreadExecutor,
                 jobStatusListener,
+                failureEnrichers,
                 createExecutionGraphFactory(true),
                 shuffleMaster,
                 rpcTimeout,
@@ -360,6 +372,7 @@ public class DefaultSchedulerBuilder {
                 System.currentTimeMillis(),
                 mainThreadExecutor,
                 jobStatusListener,
+                failureEnrichers,
                 createExecutionGraphFactory(true, new SpeculativeExecutionJobVertex.Factory()),
                 shuffleMaster,
                 rpcTimeout,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index bc6df6ee0a8..5e60f34cdd8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.failure.FailureEnricher;
+import org.apache.flink.core.failure.TestingFailureEnricher;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.core.testutils.ScheduledTask;
 import org.apache.flink.metrics.Gauge;
@@ -57,6 +59,7 @@ import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRe
 import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.executiongraph.utils.TestFailoverStrategyFactory;
+import org.apache.flink.runtime.failure.FailureEnricherUtils;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
@@ -111,6 +114,7 @@ import org.slf4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -969,7 +973,10 @@ public class DefaultSchedulerTest extends TestLogger {
         final ExecutionVertexVersion executionVertexVersion =
                 
executionVertexVersioner.getExecutionVertexVersion(onlyExecutionVertexId);
 
-        scheduler.failJob(new FlinkException("Test failure."), 
System.currentTimeMillis());
+        scheduler.failJob(
+                new FlinkException("Test failure."),
+                System.currentTimeMillis(),
+                FailureEnricherUtils.EMPTY_FAILURE_LABELS);
 
         
assertThat(executionVertexVersioner.isModified(executionVertexVersion)).isTrue();
     }
@@ -1297,6 +1304,73 @@ public class DefaultSchedulerTest extends TestLogger {
                 .isTrue();
     }
 
+    /** Verify DefaultScheduler propagates Task failure labels as generated by 
Failure Enrichers. */
+    @Test
+    void testTaskFailureWithFailureEnricherLabels() {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+        final TestingFailureEnricher testingFailureEnricher = new 
TestingFailureEnricher();
+        final DefaultScheduler scheduler =
+                createSchedulerAndStartScheduling(
+                        jobGraph, 
Collections.singleton(testingFailureEnricher));
+
+        final ExecutionAttemptID firstAttempt =
+                Iterables.getOnlyElement(
+                                scheduler
+                                        .requestJob()
+                                        .getArchivedExecutionGraph()
+                                        .getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final RuntimeException firstException = new RuntimeException("First 
exception");
+        final long firstFailTimestamp = initiateFailure(scheduler, 
firstAttempt, firstException);
+        taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+        final ArchivedExecutionVertex executionVertex =
+                Iterables.getOnlyElement(
+                        scheduler
+                                .requestJob()
+                                .getArchivedExecutionGraph()
+                                .getAllExecutionVertices());
+
+        // Make sure FailureEnricher is triggered
+        assertThat(testingFailureEnricher.getSeenThrowables().stream().map(t 
-> t.getMessage()))
+                .contains(firstException.getMessage());
+        // And failure labels are part of ExceptionHistory
+        assertThat(scheduler.getExceptionHistory())
+                .map(entry -> entry.getFailureLabelsFuture().get())
+                .contains(testingFailureEnricher.getFailureLabels());
+
+        assertThat(scheduler.getExceptionHistory())
+                .anySatisfy(
+                        e ->
+                                
ExceptionHistoryEntryTestingUtils.matchesFailure(
+                                        e,
+                                        firstException,
+                                        firstFailTimestamp,
+                                        
testingFailureEnricher.getFailureLabels()));
+
+        final RuntimeException anotherException = new 
RuntimeException("Another exception");
+        final long anotherFailTimestamp =
+                initiateFailure(
+                        scheduler,
+                        
executionVertex.getCurrentExecutionAttempt().getAttemptId(),
+                        anotherException);
+
+        taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+        assertThat(testingFailureEnricher.getSeenThrowables().stream().map(t 
-> t.getMessage()))
+                .contains(anotherException.getMessage());
+
+        assertThat(scheduler.getExceptionHistory())
+                .anySatisfy(
+                        e ->
+                                
ExceptionHistoryEntryTestingUtils.matchesFailure(
+                                        e,
+                                        anotherException,
+                                        anotherFailTimestamp,
+                                        
testingFailureEnricher.getFailureLabels()));
+    }
+
     @Test
     void testExceptionHistoryWithPreDeployFailure() {
         // disable auto-completing slot requests to simulate timeout
@@ -1934,6 +2008,14 @@ public class DefaultSchedulerTest extends TestLogger {
         return sortedVertices.get(0);
     }
 
+    private DefaultScheduler createSchedulerAndStartScheduling(
+            final JobGraph jobGraph, final Collection<FailureEnricher> 
failureEnrichers) {
+        return createSchedulerAndStartScheduling(
+                jobGraph,
+                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                failureEnrichers);
+    }
+
     private DefaultScheduler createSchedulerAndStartScheduling(final JobGraph 
jobGraph) {
         return createSchedulerAndStartScheduling(
                 jobGraph, 
ComponentMainThreadExecutorServiceAdapter.forMainThread());
@@ -1941,10 +2023,18 @@ public class DefaultSchedulerTest extends TestLogger {
 
     private DefaultScheduler createSchedulerAndStartScheduling(
             final JobGraph jobGraph, final ComponentMainThreadExecutor 
mainThreadExecutor) {
+        return createSchedulerAndStartScheduling(
+                jobGraph, mainThreadExecutor, Collections.emptySet());
+    }
+
+    private DefaultScheduler createSchedulerAndStartScheduling(
+            final JobGraph jobGraph,
+            final ComponentMainThreadExecutor mainThreadExecutor,
+            final Collection<FailureEnricher> failureEnrichers) {
 
         try {
             final DefaultScheduler scheduler =
-                    createSchedulerBuilder(jobGraph, 
mainThreadExecutor).build();
+                    createSchedulerBuilder(jobGraph, mainThreadExecutor, 
failureEnrichers).build();
             mainThreadExecutor.execute(scheduler::startScheduling);
             return scheduler;
         } catch (Exception e) {
@@ -1957,7 +2047,7 @@ public class DefaultSchedulerTest extends TestLogger {
             final ComponentMainThreadExecutor mainThreadExecutor,
             final SchedulingStrategyFactory schedulingStrategyFactory)
             throws Exception {
-        return createSchedulerBuilder(jobGraph, mainThreadExecutor)
+        return createSchedulerBuilder(jobGraph, mainThreadExecutor, 
Collections.emptySet())
                 .setSchedulingStrategyFactory(schedulingStrategyFactory)
                 .build();
     }
@@ -1968,7 +2058,7 @@ public class DefaultSchedulerTest extends TestLogger {
             final SchedulingStrategyFactory schedulingStrategyFactory,
             final FailoverStrategy.Factory failoverStrategyFactory)
             throws Exception {
-        return createSchedulerBuilder(jobGraph, mainThreadExecutor)
+        return createSchedulerBuilder(jobGraph, mainThreadExecutor, 
Collections.emptySet())
                 .setSchedulingStrategyFactory(schedulingStrategyFactory)
                 .setFailoverStrategyFactory(failoverStrategyFactory)
                 .build();
@@ -1981,7 +2071,7 @@ public class DefaultSchedulerTest extends TestLogger {
             final FailoverStrategy.Factory failoverStrategyFactory,
             final ScheduledExecutor delayExecutor)
             throws Exception {
-        return createSchedulerBuilder(jobGraph, mainThreadExecutor)
+        return createSchedulerBuilder(jobGraph, mainThreadExecutor, 
Collections.emptySet())
                 .setDelayExecutor(delayExecutor)
                 .setSchedulingStrategyFactory(schedulingStrategyFactory)
                 .setFailoverStrategyFactory(failoverStrategyFactory)
@@ -1990,6 +2080,13 @@ public class DefaultSchedulerTest extends TestLogger {
 
     private DefaultSchedulerBuilder createSchedulerBuilder(
             final JobGraph jobGraph, final ComponentMainThreadExecutor 
mainThreadExecutor) {
+        return createSchedulerBuilder(jobGraph, mainThreadExecutor, 
Collections.emptySet());
+    }
+
+    private DefaultSchedulerBuilder createSchedulerBuilder(
+            final JobGraph jobGraph,
+            final ComponentMainThreadExecutor mainThreadExecutor,
+            final Collection<FailureEnricher> failureEnrichers) {
         return new DefaultSchedulerBuilder(
                         jobGraph,
                         mainThreadExecutor,
@@ -2004,6 +2101,7 @@ public class DefaultSchedulerTest extends TestLogger {
                 .setExecutionOperations(testExecutionOperations)
                 .setExecutionVertexVersioner(executionVertexVersioner)
                 
.setExecutionSlotAllocatorFactory(executionSlotAllocatorFactory)
+                .setFailureEnrichers(failureEnrichers)
                 .setShuffleMaster(shuffleMaster)
                 .setPartitionTracker(partitionTracker)
                 .setRpcTimeout(timeout);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
index 53b31232a97..7108ac3b4aa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.scheduler;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -36,6 +37,7 @@ import org.apache.flink.runtime.shuffle.ShuffleMaster;
 
 import org.slf4j.Logger;
 
+import java.util.Collection;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -71,6 +73,7 @@ public class TestingSchedulerNGFactory implements 
SchedulerNGFactory {
             ComponentMainThreadExecutor mainThreadExecutor,
             FatalErrorHandler fatalErrorHandler,
             JobStatusListener jobStatusListener,
+            Collection<FailureEnricher> failureEnrichers,
             BlocklistOperations blocklistOperations)
             throws Exception {
         return schedulerNG;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryMatcher.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryMatcher.java
index d7665057d05..66fd9e3afd7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryMatcher.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryMatcher.java
@@ -25,12 +25,18 @@ import org.hamcrest.Description;
 import org.hamcrest.Matcher;
 import org.hamcrest.TypeSafeDiagnosingMatcher;
 
+import java.util.Collections;
+import java.util.Map;
+
 /** Matches {@link ExceptionHistoryEntry} instances. */
 public class ExceptionHistoryEntryMatcher extends 
TypeSafeDiagnosingMatcher<ExceptionHistoryEntry> {
 
     public static Matcher<ExceptionHistoryEntry> matchesGlobalFailure(
-            Throwable expectedException, long expectedTimestamp) {
-        return matchesFailure(expectedException, expectedTimestamp, null, 
null);
+            Throwable expectedException,
+            long expectedTimestamp,
+            Map<String, String> expectedFailureLabels) {
+        return matchesFailure(
+                expectedException, expectedTimestamp, expectedFailureLabels, 
null, null);
     }
 
     public static Matcher<ExceptionHistoryEntry> matchesFailure(
@@ -41,22 +47,40 @@ public class ExceptionHistoryEntryMatcher extends 
TypeSafeDiagnosingMatcher<Exce
         return new ExceptionHistoryEntryMatcher(
                 expectedException,
                 expectedTimestamp,
+                Collections.emptyMap(),
+                expectedTaskName,
+                expectedTaskManagerLocation);
+    }
+
+    public static Matcher<ExceptionHistoryEntry> matchesFailure(
+            Throwable expectedException,
+            long expectedTimestamp,
+            Map<String, String> expectedFailureLabels,
+            String expectedTaskName,
+            TaskManagerLocation expectedTaskManagerLocation) {
+        return new ExceptionHistoryEntryMatcher(
+                expectedException,
+                expectedTimestamp,
+                expectedFailureLabels,
                 expectedTaskName,
                 expectedTaskManagerLocation);
     }
 
     private final Throwable expectedException;
     private final long expectedTimestamp;
+    private final Map<String, String> expectedFailureLabels;
     private final String expectedTaskName;
     private final ArchivedTaskManagerLocationMatcher 
taskManagerLocationMatcher;
 
     public ExceptionHistoryEntryMatcher(
             Throwable expectedException,
             long expectedTimestamp,
+            Map<String, String> expectedFailureLabels,
             String expectedTaskName,
             TaskManagerLocation expectedTaskManagerLocation) {
         this.expectedException = expectedException;
         this.expectedTimestamp = expectedTimestamp;
+        this.expectedFailureLabels = expectedFailureLabels;
         this.expectedTaskName = expectedTaskName;
         this.taskManagerLocationMatcher =
                 new 
ArchivedTaskManagerLocationMatcher(expectedTaskManagerLocation);
@@ -87,6 +111,13 @@ public class ExceptionHistoryEntryMatcher extends 
TypeSafeDiagnosingMatcher<Exce
             match = false;
         }
 
+        if (!exceptionHistoryEntry.getFailureLabels().equals(expectedFailureLabels)) {
+            description
+                    .appendText(" actualFailureLabels=")
+                    .appendText(String.valueOf(exceptionHistoryEntry.getFailureLabels()));
+            match = false;
+        }
+
         if (exceptionHistoryEntry.getFailingTaskName() == null) {
             if (expectedTaskName != null) {
                 description.appendText(" actualTaskName=null");
@@ -113,6 +144,8 @@ public class ExceptionHistoryEntryMatcher extends 
TypeSafeDiagnosingMatcher<Exce
                 
.appendText(ExceptionUtils.stringifyException(expectedException))
                 .appendText(" expectedTimestamp=")
                 .appendText(String.valueOf(expectedTimestamp))
+                .appendText(" expectedFailureLabels=")
+                .appendText(String.valueOf(expectedFailureLabels))
                 .appendText(" expectedTaskName=")
                 .appendText(expectedTaskName)
                 .appendText(" expectedTaskManagerLocation=");
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTest.java
index ff49166a5a2..ac36d14f1c5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTest.java
@@ -20,12 +20,17 @@ package org.apache.flink.runtime.scheduler.exceptionhistory;
 
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.failure.FailureEnricherUtils;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
 import static 
org.apache.flink.runtime.scheduler.exceptionhistory.ArchivedTaskManagerLocationMatcher.isArchivedTaskManagerLocation;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
@@ -45,8 +50,11 @@ public class ExceptionHistoryEntryTest extends TestLogger {
                         .withTaskManagerLocation(taskManagerLocation)
                         .build();
         final String taskName = "task name";
+        final Map<String, String> failureLabels = 
Collections.singletonMap("key", "value");
 
-        final ExceptionHistoryEntry entry = 
ExceptionHistoryEntry.create(execution, taskName);
+        final ExceptionHistoryEntry entry =
+                ExceptionHistoryEntry.create(
+                        execution, taskName, 
CompletableFuture.completedFuture(failureLabels));
 
         assertThat(
                 
entry.getException().deserializeError(ClassLoader.getSystemClassLoader()),
@@ -56,6 +64,7 @@ public class ExceptionHistoryEntryTest extends TestLogger {
         assertThat(
                 entry.getTaskManagerLocation(), 
isArchivedTaskManagerLocation(taskManagerLocation));
         assertThat(entry.isGlobal(), is(false));
+        assertThat(entry.getFailureLabels(), is(failureLabels));
     }
 
     @Test(expected = IllegalArgumentException.class)
@@ -64,12 +73,13 @@ public class ExceptionHistoryEntryTest extends TestLogger {
                 TestingAccessExecution.newBuilder()
                         .withTaskManagerLocation(new 
LocalTaskManagerLocation())
                         .build(),
-                "task name");
+                "task name",
+                FailureEnricherUtils.EMPTY_FAILURE_LABELS);
     }
 
     @Test(expected = NullPointerException.class)
     public void testNullExecution() {
-        ExceptionHistoryEntry.create(null, "task name");
+        ExceptionHistoryEntry.create(null, "task name", 
FailureEnricherUtils.EMPTY_FAILURE_LABELS);
     }
 
     @Test(expected = NullPointerException.class)
@@ -82,7 +92,8 @@ public class ExceptionHistoryEntryTest extends TestLogger {
                                         System.currentTimeMillis()))
                         .withTaskManagerLocation(new 
LocalTaskManagerLocation())
                         .build(),
-                null);
+                null,
+                FailureEnricherUtils.EMPTY_FAILURE_LABELS);
     }
 
     @Test
@@ -97,7 +108,8 @@ public class ExceptionHistoryEntryTest extends TestLogger {
                                 .withTaskManagerLocation(null)
                                 .withErrorInfo(new ErrorInfo(failure, 
timestamp))
                                 .build(),
-                        taskName);
+                        taskName,
+                        FailureEnricherUtils.EMPTY_FAILURE_LABELS);
 
         assertThat(
                 
entry.getException().deserializeError(ClassLoader.getSystemClassLoader()),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTestingUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTestingUtils.java
index 08fad7e96b9..7e1cb14d0a6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTestingUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTestingUtils.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.scheduler.exceptionhistory;
 
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.Objects;
 
 import static 
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry.ArchivedTaskManagerLocation.fromTaskManagerLocation;
@@ -36,6 +38,34 @@ public class ExceptionHistoryEntryTestingUtils {
     }
 
     public static boolean matchesFailure(
+            ExceptionHistoryEntry exceptionHistoryEntry,
+            Throwable expectedException,
+            long expectedTimestamp,
+            Map<String, String> expectedFailureLabels) {
+        return matchesInternal(
+                exceptionHistoryEntry,
+                expectedException,
+                expectedTimestamp,
+                null,
+                expectedFailureLabels,
+                null);
+    }
+
+    public static boolean matchesFailure(
+            ExceptionHistoryEntry exceptionHistoryEntry,
+            Throwable expectedException,
+            long expectedTimestamp,
+            String expectedTaskName,
+            TaskManagerLocation expectedTaskManagerLocation) {
+        return matchesInternal(
+                exceptionHistoryEntry,
+                expectedException,
+                expectedTimestamp,
+                expectedTaskName,
+                expectedTaskManagerLocation);
+    }
+
+    private static boolean matchesInternal(
             ExceptionHistoryEntry exceptionHistoryEntry,
             Throwable expectedException,
             long expectedTimestamp,
@@ -46,6 +76,7 @@ public class ExceptionHistoryEntryTestingUtils {
                 expectedException,
                 expectedTimestamp,
                 expectedTaskName,
+                Collections.emptyMap(),
                 expectedTaskManagerLocation);
     }
 
@@ -54,6 +85,7 @@ public class ExceptionHistoryEntryTestingUtils {
             Throwable expectedException,
             long expectedTimestamp,
             String expectedTaskName,
+            Map<String, String> expectedFailureLabels,
             TaskManagerLocation expectedTaskManagerLocation) {
         boolean match =
                 exceptionHistoryEntry
@@ -61,6 +93,9 @@ public class ExceptionHistoryEntryTestingUtils {
                                 
.deserializeError(ClassLoader.getSystemClassLoader())
                                 .equals(expectedException)
                         && exceptionHistoryEntry.getTimestamp() == 
expectedTimestamp
+                        && Objects.equals(
+                                exceptionHistoryEntry.getFailureLabels(),
+                                expectedFailureLabels)
                         && Objects.equals(
                                 exceptionHistoryEntry.getFailingTaskName(), expectedTaskName);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshotTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshotTest.java
index 55dfaa392c0..43733e048c8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshotTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshotTest.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import 
org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import org.apache.flink.runtime.failure.FailureEnricherUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
@@ -47,6 +48,8 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -86,6 +89,7 @@ class FailureHandlingResultSnapshotTest {
                         rootCauseExecutionVertex.getCurrentExecutionAttempt(),
                         new RuntimeException("Expected exception: root cause"),
                         System.currentTimeMillis(),
+                        FailureEnricherUtils.EMPTY_FAILURE_LABELS,
                         StreamSupport.stream(
                                         
executionGraph.getAllExecutionVertices().spliterator(),
                                         false)
@@ -102,7 +106,7 @@ class FailureHandlingResultSnapshotTest {
     }
 
     @Test // see FLINK-22060/FLINK-21376
-    void testMissingThrowableHandling() {
+    void testMissingThrowableHandling() throws ExecutionException, 
InterruptedException {
         final ExecutionVertex rootCauseExecutionVertex = 
extractExecutionVertex(0);
 
         final long rootCauseTimestamp = 
triggerFailure(rootCauseExecutionVertex, null);
@@ -112,6 +116,8 @@ class FailureHandlingResultSnapshotTest {
                         rootCauseExecutionVertex.getCurrentExecutionAttempt(),
                         null,
                         rootCauseTimestamp,
+                        CompletableFuture.completedFuture(
+                                Collections.singletonMap("key2", "value2")),
                         StreamSupport.stream(
                                         
executionGraph.getAllExecutionVertices().spliterator(),
                                         false)
@@ -120,6 +126,10 @@ class FailureHandlingResultSnapshotTest {
                         0L,
                         false);
 
+        // FailedExecution with failure labels
+        assertThat(failureHandlingResult.getFailureLabels().get())
+                .isEqualTo(Collections.singletonMap("key2", "value2"));
+
         final FailureHandlingResultSnapshot testInstance =
                 FailureHandlingResultSnapshot.create(
                         failureHandlingResult, this::getCurrentExecutions);
@@ -153,6 +163,7 @@ class FailureHandlingResultSnapshotTest {
                         rootCauseExecutionVertex.getCurrentExecutionAttempt(),
                         rootCause,
                         rootCauseTimestamp,
+                        FailureEnricherUtils.EMPTY_FAILURE_LABELS,
                         StreamSupport.stream(
                                         
executionGraph.getAllExecutionVertices().spliterator(),
                                         false)
@@ -184,12 +195,14 @@ class FailureHandlingResultSnapshotTest {
                                         rootCauseExecution,
                                         new RuntimeException("Expected 
exception"),
                                         System.currentTimeMillis(),
+                                        
FailureEnricherUtils.EMPTY_FAILURE_LABELS,
                                         
Collections.singleton(rootCauseExecution)))
                 .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
-    void testGlobalFailureHandlingResultSnapshotCreation() {
+    void testGlobalFailureHandlingResultSnapshotCreation()
+            throws ExecutionException, InterruptedException {
         final Throwable rootCause = new FlinkException("Expected exception: 
root cause");
         final long timestamp = System.currentTimeMillis();
 
@@ -206,6 +219,8 @@ class FailureHandlingResultSnapshotTest {
                         null,
                         rootCause,
                         timestamp,
+                        CompletableFuture.completedFuture(
+                                Collections.singletonMap("key2", "value2")),
                         StreamSupport.stream(
                                         
executionGraph.getAllExecutionVertices().spliterator(),
                                         false)
@@ -214,6 +229,10 @@ class FailureHandlingResultSnapshotTest {
                         0L,
                         true);
 
+        // FailedExecution with failure labels
+        assertThat(failureHandlingResult.getFailureLabels().get())
+                .isEqualTo(Collections.singletonMap("key2", "value2"));
+
         final FailureHandlingResultSnapshot testInstance =
                 FailureHandlingResultSnapshot.create(
                         failureHandlingResult, this::getCurrentExecutions);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java
index 21954fdee09..0c6ed9f9578 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntryTest.java
@@ -43,6 +43,9 @@ import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -72,10 +75,13 @@ public class RootExceptionHistoryEntryTest extends 
TestLogger {
     }
 
     @Test
-    public void testFromFailureHandlingResultSnapshot() {
+    public void testFromFailureHandlingResultSnapshot()
+            throws ExecutionException, InterruptedException {
         final Throwable rootException = new RuntimeException("Expected root 
failure");
         final ExecutionVertex rootExecutionVertex = extractExecutionVertex(0);
         final long rootTimestamp = triggerFailure(rootExecutionVertex, 
rootException);
+        final CompletableFuture<Map<String, String>> rootFailureLabels =
+                
CompletableFuture.completedFuture(Collections.singletonMap("key", "value"));
 
         final Throwable concurrentException = new 
IllegalStateException("Expected other failure");
         final ExecutionVertex concurrentlyFailedExecutionVertex = 
extractExecutionVertex(1);
@@ -87,6 +93,7 @@ public class RootExceptionHistoryEntryTest extends TestLogger 
{
                         rootExecutionVertex.getCurrentExecutionAttempt(),
                         rootException,
                         rootTimestamp,
+                        rootFailureLabels,
                         Collections.singleton(
                                 
concurrentlyFailedExecutionVertex.getCurrentExecutionAttempt()));
         final RootExceptionHistoryEntry actualEntry =
@@ -97,6 +104,7 @@ public class RootExceptionHistoryEntryTest extends 
TestLogger {
                 ExceptionHistoryEntryMatcher.matchesFailure(
                         rootException,
                         rootTimestamp,
+                        rootFailureLabels.get(),
                         rootExecutionVertex.getTaskNameWithSubtaskIndex(),
                         
rootExecutionVertex.getCurrentAssignedResourceLocation()));
         assertThat(
@@ -111,7 +119,7 @@ public class RootExceptionHistoryEntryTest extends 
TestLogger {
     }
 
     @Test
-    public void testFromGlobalFailure() {
+    public void testFromGlobalFailure() throws ExecutionException, 
InterruptedException {
         final Throwable concurrentException0 =
                 new RuntimeException("Expected concurrent failure #0");
         final ExecutionVertex concurrentlyFailedExecutionVertex0 = 
extractExecutionVertex(0);
@@ -126,10 +134,13 @@ public class RootExceptionHistoryEntryTest extends 
TestLogger {
 
         final Throwable rootCause = new Exception("Expected root failure");
         final long rootTimestamp = System.currentTimeMillis();
+        final CompletableFuture<Map<String, String>> rootFailureLabels =
+                
CompletableFuture.completedFuture(Collections.singletonMap("key", "value"));
         final RootExceptionHistoryEntry actualEntry =
                 RootExceptionHistoryEntry.fromGlobalFailure(
                         rootCause,
                         rootTimestamp,
+                        rootFailureLabels,
                         StreamSupport.stream(
                                         
executionGraph.getAllExecutionVertices().spliterator(),
                                         false)
@@ -138,7 +149,8 @@ public class RootExceptionHistoryEntryTest extends 
TestLogger {
 
         assertThat(
                 actualEntry,
-                ExceptionHistoryEntryMatcher.matchesGlobalFailure(rootCause, 
rootTimestamp));
+                ExceptionHistoryEntryMatcher.matchesGlobalFailure(
+                        rootCause, rootTimestamp, rootFailureLabels.get()));
         assertThat(
                 actualEntry.getConcurrentExceptions(),
                 IsIterableContainingInAnyOrder.containsInAnyOrder(

Reply via email to