This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 603181da811 [FLINK-31993][runtime] Initialize and pass down FailureEnrichers
603181da811 is described below

commit 603181da811edb47c0d573492639a381fbbedc28
Author: Panagiotis Garefalakis <[email protected]>
AuthorDate: Wed May 3 19:58:35 2023 -0700

    [FLINK-31993][runtime] Initialize and pass down FailureEnrichers
    
    * Init FailureEnrichers as part of ClusterEntrypoint
---
 .../flink/runtime/dispatcher/Dispatcher.java       |  4 ++
 .../runtime/dispatcher/DispatcherServices.java     | 15 +++++++-
 .../dispatcher/JobManagerRunnerFactory.java        |  4 ++
 .../JobMasterServiceLeadershipRunnerFactory.java   |  5 +++
 .../dispatcher/PartialDispatcherServices.java      | 12 +++++-
 ...atcherServicesWithJobPersistenceComponents.java |  7 +++-
 .../runtime/entrypoint/ClusterEntrypoint.java      |  7 ++++
 ...tDispatcherResourceManagerComponentFactory.java |  5 ++-
 .../DispatcherResourceManagerComponentFactory.java |  3 ++
 .../runtime/failure/FailureEnricherUtils.java      | 45 +++++++++-------------
 .../factories/DefaultJobMasterServiceFactory.java  |  8 +++-
 .../flink/runtime/minicluster/MiniCluster.java     |  1 +
 .../dispatcher/DispatcherResourceCleanupTest.java  |  6 +++
 .../flink/runtime/dispatcher/DispatcherTest.java   |  9 +++++
 .../ExecutionGraphInfoStoreTestUtils.java          |  1 +
 .../runtime/dispatcher/MiniDispatcherTest.java     |  4 +-
 .../runtime/dispatcher/TestingDispatcher.java      |  3 +-
 ...ingJobMasterServiceLeadershipRunnerFactory.java |  4 ++
 .../TestingPartialDispatcherServices.java          | 12 ++++--
 .../ZooKeeperDefaultDispatcherRunnerTest.java      |  4 +-
 .../runtime/minicluster/TestingMiniCluster.java    |  2 +
 .../recovery/ProcessFailureCancelingITCase.java    |  2 +
 22 files changed, 124 insertions(+), 39 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 09cb126ee0b..e06a6d5bfc4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -35,6 +35,7 @@ import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
@@ -156,6 +157,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
     private final BlobServer blobServer;
 
     private final FatalErrorHandler fatalErrorHandler;
+    private final Collection<FailureEnricher> failureEnrichers;
 
     private final OnMainThreadJobManagerRunnerRegistry 
jobManagerRunnerRegistry;
 
@@ -267,6 +269,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
         this.heartbeatServices = dispatcherServices.getHeartbeatServices();
         this.blobServer = dispatcherServices.getBlobServer();
         this.fatalErrorHandler = dispatcherServices.getFatalErrorHandler();
+        this.failureEnrichers = dispatcherServices.getFailureEnrichers();
         this.jobGraphWriter = dispatcherServices.getJobGraphWriter();
         this.jobResultStore = dispatcherServices.getJobResultStore();
         this.jobManagerMetricGroup = 
dispatcherServices.getJobManagerMetricGroup();
@@ -656,6 +659,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId>
                 jobManagerSharedServices,
                 new 
DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                 fatalErrorHandler,
+                failureEnrichers,
                 System.currentTimeMillis());
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
index 12112b15b83..e3fd5f90702 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.dispatcher.cleanup.CleanupRunnerFactory;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -33,6 +34,7 @@ import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
+import java.util.Collection;
 import java.util.concurrent.Executor;
 
 /** {@link Dispatcher} services container. */
@@ -70,6 +72,8 @@ public class DispatcherServices {
 
     private final Executor ioExecutor;
 
+    private final Collection<FailureEnricher> failureEnrichers;
+
     DispatcherServices(
             Configuration configuration,
             HighAvailabilityServices highAvailabilityServices,
@@ -86,7 +90,8 @@ public class DispatcherServices {
             JobResultStore jobResultStore,
             JobManagerRunnerFactory jobManagerRunnerFactory,
             CleanupRunnerFactory cleanupRunnerFactory,
-            Executor ioExecutor) {
+            Executor ioExecutor,
+            Collection<FailureEnricher> failureEnrichers) {
         this.configuration = Preconditions.checkNotNull(configuration, 
"Configuration");
         this.highAvailabilityServices =
                 Preconditions.checkNotNull(highAvailabilityServices, 
"HighAvailabilityServices");
@@ -111,6 +116,7 @@ public class DispatcherServices {
         this.cleanupRunnerFactory =
                 Preconditions.checkNotNull(cleanupRunnerFactory, 
"CleanupRunnerFactory");
         this.ioExecutor = Preconditions.checkNotNull(ioExecutor, "IOExecutor");
+        this.failureEnrichers = Preconditions.checkNotNull(failureEnrichers, 
"FailureEnrichers");
     }
 
     public Configuration getConfiguration() {
@@ -178,6 +184,10 @@ public class DispatcherServices {
         return ioExecutor;
     }
 
+    public Collection<FailureEnricher> getFailureEnrichers() {
+        return failureEnrichers;
+    }
+
     public static DispatcherServices from(
             PartialDispatcherServicesWithJobPersistenceComponents
                     partialDispatcherServicesWithJobPersistenceComponents,
@@ -204,6 +214,7 @@ public class DispatcherServices {
                 
partialDispatcherServicesWithJobPersistenceComponents.getJobResultStore(),
                 jobManagerRunnerFactory,
                 cleanupRunnerFactory,
-                
partialDispatcherServicesWithJobPersistenceComponents.getIoExecutor());
+                
partialDispatcherServicesWithJobPersistenceComponents.getIoExecutor(),
+                
partialDispatcherServicesWithJobPersistenceComponents.getFailureEnrichers());
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerFactory.java
index 08c9cc7a47f..3567ee02883 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -28,6 +29,8 @@ import 
org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFact
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
+import java.util.Collection;
+
 /** Factory for a {@link JobManagerRunner}. */
 @FunctionalInterface
 public interface JobManagerRunnerFactory {
@@ -41,6 +44,7 @@ public interface JobManagerRunnerFactory {
             JobManagerSharedServices jobManagerServices,
             JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
             FatalErrorHandler fatalErrorHandler,
+            Collection<FailureEnricher> failureEnrichers,
             long initializationTimestamp)
             throws Exception;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java
index 3b9e8db13f8..f8c51e45b35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -40,6 +41,8 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Collection;
+
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** Factory which creates a {@link JobMasterServiceLeadershipRunner}. */
@@ -56,6 +59,7 @@ public enum JobMasterServiceLeadershipRunnerFactory 
implements JobManagerRunnerF
             JobManagerSharedServices jobManagerServices,
             JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
             FatalErrorHandler fatalErrorHandler,
+            Collection<FailureEnricher> failureEnrichers,
             long initializationTimestamp)
             throws Exception {
 
@@ -105,6 +109,7 @@ public enum JobMasterServiceLeadershipRunnerFactory 
implements JobManagerRunnerF
                         jobManagerJobMetricGroupFactory,
                         fatalErrorHandler,
                         userCodeClassLoader,
+                        failureEnrichers,
                         initializationTimestamp);
 
         final DefaultJobMasterServiceProcessFactory 
jobMasterServiceProcessFactory =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java
index 477ea57e8f2..9ee850f6ec6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -29,6 +30,7 @@ import 
org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.Collection;
 import java.util.concurrent.Executor;
 
 /**
@@ -61,6 +63,8 @@ public class PartialDispatcherServices {
 
     @Nonnull private final Executor ioExecutor;
 
+    @Nonnull private final Collection<FailureEnricher> failureEnrichers;
+
     public PartialDispatcherServices(
             @Nonnull Configuration configuration,
             @Nonnull HighAvailabilityServices highAvailabilityServices,
@@ -73,7 +77,8 @@ public class PartialDispatcherServices {
             @Nonnull HistoryServerArchivist historyServerArchivist,
             @Nullable String metricQueryServiceAddress,
             @Nonnull Executor ioExecutor,
-            @Nonnull DispatcherOperationCaches operationCaches) {
+            @Nonnull DispatcherOperationCaches operationCaches,
+            @Nonnull Collection<FailureEnricher> failureEnrichers) {
         this.configuration = configuration;
         this.highAvailabilityServices = highAvailabilityServices;
         this.resourceManagerGatewayRetriever = resourceManagerGatewayRetriever;
@@ -86,6 +91,7 @@ public class PartialDispatcherServices {
         this.metricQueryServiceAddress = metricQueryServiceAddress;
         this.ioExecutor = ioExecutor;
         this.operationCaches = operationCaches;
+        this.failureEnrichers = failureEnrichers;
     }
 
     @Nonnull
@@ -147,4 +153,8 @@ public class PartialDispatcherServices {
     public Executor getIoExecutor() {
         return ioExecutor;
     }
+
+    public Collection<FailureEnricher> getFailureEnrichers() {
+        return failureEnrichers;
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobPersistenceComponents.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobPersistenceComponents.java
index c6fe7b7eaa5..7689bfc6e72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobPersistenceComponents.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobPersistenceComponents.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -30,6 +31,7 @@ import 
org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import javax.annotation.Nullable;
 
+import java.util.Collection;
 import java.util.concurrent.Executor;
 
 /** {@link DispatcherFactory} services container. */
@@ -52,6 +54,7 @@ public class 
PartialDispatcherServicesWithJobPersistenceComponents
             @Nullable String metricQueryServiceAddress,
             Executor ioExecutor,
             DispatcherOperationCaches operationCaches,
+            Collection<FailureEnricher> failureEnrichers,
             JobGraphWriter jobGraphWriter,
             JobResultStore jobResultStore) {
         super(
@@ -66,7 +69,8 @@ public class 
PartialDispatcherServicesWithJobPersistenceComponents
                 historyServerArchivist,
                 metricQueryServiceAddress,
                 ioExecutor,
-                operationCaches);
+                operationCaches,
+                failureEnrichers);
         this.jobGraphWriter = jobGraphWriter;
         this.jobResultStore = jobResultStore;
     }
@@ -96,6 +100,7 @@ public class 
PartialDispatcherServicesWithJobPersistenceComponents
                 partialDispatcherServices.getMetricQueryServiceAddress(),
                 partialDispatcherServices.getIoExecutor(),
                 partialDispatcherServices.getOperationCaches(),
+                partialDispatcherServices.getFailureEnrichers(),
                 jobGraphWriter,
                 jobResultStore);
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 9aebd8bec30..8bbe80877e8 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -33,6 +33,7 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.core.plugin.PluginUtils;
@@ -47,6 +48,7 @@ import org.apache.flink.runtime.dispatcher.MiniDispatcher;
 import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
 import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
+import org.apache.flink.runtime.failure.FailureEnricherUtils;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -151,6 +153,9 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
     @GuardedBy("lock")
     private HeartbeatServices heartbeatServices;
 
+    @GuardedBy("lock")
+    private Collection<FailureEnricher> failureEnrichers;
+
     @GuardedBy("lock")
     private DelegationTokenManager delegationTokenManager;
 
@@ -303,6 +308,7 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
                             executionGraphInfoStore,
                             new RpcMetricQueryServiceRetriever(
                                     
metricRegistry.getMetricQueryServiceRpcService()),
+                            failureEnrichers,
                             this);
 
             clusterComponent
@@ -397,6 +403,7 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
             blobServer.start();
             configuration.setString(BlobServerOptions.PORT, 
String.valueOf(blobServer.getPort()));
             heartbeatServices = createHeartbeatServices(configuration);
+            failureEnrichers = 
FailureEnricherUtils.getFailureEnrichers(configuration);
             metricRegistry = createMetricRegistry(configuration, 
pluginManager, rpcSystem);
 
             final RpcService metricQueryServiceRpcService =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
index a24e87dcae0..8f2a3c80997 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.entrypoint.component;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
@@ -113,6 +114,7 @@ public class 
DefaultDispatcherResourceManagerComponentFactory
             MetricRegistry metricRegistry,
             ExecutionGraphInfoStore executionGraphInfoStore,
             MetricQueryServiceRetriever metricQueryServiceRetriever,
+            Collection<FailureEnricher> failureEnrichers,
             FatalErrorHandler fatalErrorHandler)
             throws Exception {
 
@@ -217,7 +219,8 @@ public class 
DefaultDispatcherResourceManagerComponentFactory
                             historyServerArchivist,
                             
metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
                             ioExecutor,
-                            dispatcherOperationCaches);
+                            dispatcherOperationCaches,
+                            failureEnrichers);
 
             log.debug("Starting Dispatcher.");
             dispatcherRunner =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
index bd3799ff797..52ee8d3bc7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.entrypoint.component;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
@@ -30,6 +31,7 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.security.token.DelegationTokenManager;
 import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
+import java.util.Collection;
 import java.util.concurrent.Executor;
 
 /** Factory for the {@link DispatcherResourceManagerComponent}. */
@@ -47,6 +49,7 @@ public interface DispatcherResourceManagerComponentFactory {
             MetricRegistry metricRegistry,
             ExecutionGraphInfoStore executionGraphInfoStore,
             MetricQueryServiceRetriever metricQueryServiceRetriever,
+            Collection<FailureEnricher> failureEnrichers,
             FatalErrorHandler fatalErrorHandler)
             throws Exception;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
index cf05e0a40cb..f704a1ddbd7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
@@ -32,7 +32,6 @@ import org.apache.flink.util.concurrent.FutureUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -85,31 +84,25 @@ public class FailureEnricherUtils {
                 pluginManager.load(FailureEnricherFactory.class);
         final Set<FailureEnricher> failureEnrichers = new HashSet<>();
         while (factoryIterator.hasNext()) {
-            try {
-                final FailureEnricherFactory failureEnricherFactory = 
factoryIterator.next();
-                final FailureEnricher failureEnricher =
-                        
failureEnricherFactory.createFailureEnricher(configuration);
-                if 
(includedEnrichers.contains(failureEnricher.getClass().getName())) {
-                    failureEnrichers.add(failureEnricher);
-                    LOG.info(
-                            "Found failure enricher {} at {}.",
-                            failureEnricherFactory.getClass().getName(),
-                            new File(
-                                            failureEnricher
-                                                    .getClass()
-                                                    .getProtectionDomain()
-                                                    .getCodeSource()
-                                                    .getLocation()
-                                                    .toURI())
-                                    .getCanonicalPath());
-                } else {
-                    LOG.debug(
-                            "Excluding failure enricher {}, not configured in 
enricher list ({}).",
-                            failureEnricherFactory.getClass().getName(),
-                            includedEnrichers);
-                }
-            } catch (Exception e) {
-                LOG.warn("Error while loading failure enricher factory.", e);
+            final FailureEnricherFactory failureEnricherFactory = 
factoryIterator.next();
+            final FailureEnricher failureEnricher =
+                    
failureEnricherFactory.createFailureEnricher(configuration);
+            if 
(includedEnrichers.contains(failureEnricher.getClass().getName())) {
+                failureEnrichers.add(failureEnricher);
+                LOG.info(
+                        "Found failure enricher {} at {}.",
+                        failureEnricherFactory.getClass().getName(),
+                        failureEnricher
+                                .getClass()
+                                .getProtectionDomain()
+                                .getCodeSource()
+                                .getLocation()
+                                .getPath());
+            } else {
+                LOG.debug(
+                        "Excluding failure enricher {}, not configured in 
enricher list ({}).",
+                        failureEnricherFactory.getClass().getName(),
+                        includedEnrichers);
             }
         }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
index c65c11bd2a7..f34fede5197 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmaster.factories;
 
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.blocklist.BlocklistUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -38,7 +39,7 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.util.function.FunctionUtils;
 
-import java.util.Collections;
+import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -57,6 +58,7 @@ public class DefaultJobMasterServiceFactory implements 
JobMasterServiceFactory {
     private final FatalErrorHandler fatalErrorHandler;
     private final ClassLoader userCodeClassloader;
     private final ShuffleMaster<?> shuffleMaster;
+    private final Collection<FailureEnricher> failureEnrichers;
     private final long initializationTimestamp;
 
     public DefaultJobMasterServiceFactory(
@@ -71,6 +73,7 @@ public class DefaultJobMasterServiceFactory implements 
JobMasterServiceFactory {
             JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
             FatalErrorHandler fatalErrorHandler,
             ClassLoader userCodeClassloader,
+            Collection<FailureEnricher> failureEnrichers,
             long initializationTimestamp) {
         this.executor = executor;
         this.rpcService = rpcService;
@@ -84,6 +87,7 @@ public class DefaultJobMasterServiceFactory implements 
JobMasterServiceFactory {
         this.fatalErrorHandler = fatalErrorHandler;
         this.userCodeClassloader = userCodeClassloader;
         this.shuffleMaster = jobManagerSharedServices.getShuffleMaster();
+        this.failureEnrichers = failureEnrichers;
         this.initializationTimestamp = initializationTimestamp;
     }
 
@@ -123,7 +127,7 @@ public class DefaultJobMasterServiceFactory implements 
JobMasterServiceFactory {
                         DefaultExecutionDeploymentReconciler::new,
                         BlocklistUtils.loadBlocklistHandlerFactory(
                                 jobMasterConfiguration.getConfiguration()),
-                        Collections.emptySet(),
+                        failureEnrichers,
                         initializationTimestamp);
 
         jobMaster.start();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 7f39ab1ae0c..6de16027cc4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -561,6 +561,7 @@ public class MiniCluster implements AutoCloseableAsync {
                         metricRegistry,
                         new MemoryExecutionGraphInfoStore(),
                         metricQueryServiceRetriever,
+                        Collections.emptySet(),
                         fatalErrorHandler);
         FutureUtils.assertNoException(
                 dispatcherResourceManagerComponent
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index a7c5cda3812..13fb6afac04 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.BlobUtils;
@@ -72,6 +73,7 @@ import org.junit.rules.TemporaryFolder;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -727,6 +729,7 @@ public class DispatcherResourceCleanupTest extends 
TestLogger {
                 JobManagerSharedServices jobManagerSharedServices,
                 JobManagerJobMetricGroupFactory 
jobManagerJobMetricGroupFactory,
                 FatalErrorHandler fatalErrorHandler,
+                Collection<FailureEnricher> failureEnrichers,
                 long initializationTimestamp)
                 throws Exception {
             jobManagerRunnerCreationLatch.run();
@@ -741,6 +744,7 @@ public class DispatcherResourceCleanupTest extends 
TestLogger {
                             jobManagerSharedServices,
                             jobManagerJobMetricGroupFactory,
                             fatalErrorHandler,
+                            failureEnrichers,
                             initializationTimestamp);
 
             TestingJobMasterGateway testingJobMasterGateway =
@@ -787,6 +791,7 @@ public class DispatcherResourceCleanupTest extends 
TestLogger {
                 JobManagerSharedServices jobManagerServices,
                 JobManagerJobMetricGroupFactory 
jobManagerJobMetricGroupFactory,
                 FatalErrorHandler fatalErrorHandler,
+                Collection<FailureEnricher> failureEnrichers,
                 long initializationTimestamp) {
             return Optional.ofNullable(jobManagerRunners.poll())
                     .orElseThrow(
@@ -813,6 +818,7 @@ public class DispatcherResourceCleanupTest extends 
TestLogger {
                 JobManagerSharedServices jobManagerServices,
                 JobManagerJobMetricGroupFactory 
jobManagerJobMetricGroupFactory,
                 FatalErrorHandler fatalErrorHandler,
+                Collection<FailureEnricher> failureEnrichers,
                 long initializationTimestamp)
                 throws Exception {
             throw testException;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 3bd1821f44b..007550e2101 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobServer;
@@ -1447,6 +1448,7 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
                 JobManagerSharedServices jobManagerServices,
                 JobManagerJobMetricGroupFactory 
jobManagerJobMetricGroupFactory,
                 FatalErrorHandler fatalErrorHandler,
+                Collection<FailureEnricher> failureEnricher,
                 long initializationTimestamp)
                 throws Exception {
             assertEquals(expectedJobId, jobGraph.getJobID());
@@ -1553,6 +1555,7 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
                 JobManagerSharedServices jobManagerServices,
                 JobManagerJobMetricGroupFactory 
jobManagerJobMetricGroupFactory,
                 FatalErrorHandler fatalErrorHandler,
+                Collection<FailureEnricher> failureEnrichers,
                 long initializationTimestamp)
                 throws Exception {
 
@@ -1612,6 +1615,7 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
                 JobManagerSharedServices jobManagerServices,
                 JobManagerJobMetricGroupFactory 
jobManagerJobMetricGroupFactory,
                 FatalErrorHandler fatalErrorHandler,
+                Collection<FailureEnricher> failureEnrichers,
                 long initializationTimestamp)
                 throws Exception {
             return new BlockingTerminationJobManagerService(
@@ -1689,6 +1693,7 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
                 JobManagerSharedServices jobManagerServices,
                 JobManagerJobMetricGroupFactory 
jobManagerJobMetricGroupFactory,
                 FatalErrorHandler fatalErrorHandler,
+                Collection<FailureEnricher> failureEnrichers,
                 long initializationTimestamp) {
             initializationTimestampQueue.offer(initializationTimestamp);
             return 
TestingJobManagerRunner.newBuilder().setJobId(jobGraph.getJobID()).build();
@@ -1714,6 +1719,7 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
                 JobManagerSharedServices jobManagerSharedServices,
                 JobManagerJobMetricGroupFactory 
jobManagerJobMetricGroupFactory,
                 FatalErrorHandler fatalErrorHandler,
+                Collection<FailureEnricher> failureEnrichers,
                 long initializationTimestamp)
                 throws Exception {
             assertEquals(expectedJobId, jobGraph.getJobID());
@@ -1727,6 +1733,7 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
                     jobManagerSharedServices,
                     jobManagerJobMetricGroupFactory,
                     fatalErrorHandler,
+                    Collections.emptySet(),
                     initializationTimestamp);
         }
     }
@@ -1749,6 +1756,7 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
                 JobManagerSharedServices jobManagerServices,
                 JobManagerJobMetricGroupFactory 
jobManagerJobMetricGroupFactory,
                 FatalErrorHandler fatalErrorHandler,
+                Collection<FailureEnricher> failureEnrichers,
                 long initializationTimestamp)
                 throws Exception {
             return resultFutureQueue.remove();
@@ -1776,6 +1784,7 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
                 JobManagerSharedServices jobManagerServices,
                 JobManagerJobMetricGroupFactory 
jobManagerJobMetricGroupFactory,
                 FatalErrorHandler fatalErrorHandler,
+                Collection<FailureEnricher> failureEnrichers,
                 long initializationTimestamp)
                 throws Exception {
             final TestingJobManagerRunner runner =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStoreTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStoreTestUtils.java
index d01469b9af4..7a2cf2c7954 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStoreTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStoreTestUtils.java
@@ -245,6 +245,7 @@ public class ExecutionGraphInfoStoreTestUtils {
                             metricRegistry,
                             executionGraphInfoStore,
                             metricQueryServiceRetriever,
+                            Collections.emptySet(),
                             fatalErrorHandler));
         }
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 4539ce32dfa..2e8c1b5a31f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -60,6 +60,7 @@ import org.junit.rules.TemporaryFolder;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
@@ -358,7 +359,8 @@ public class MiniDispatcherTest extends TestLogger {
                         highAvailabilityServices.getJobResultStore(),
                         testingJobManagerRunnerFactory,
                         testingCleanupRunnerFactory,
-                        ForkJoinPool.commonPool()),
+                        ForkJoinPool.commonPool(),
+                        Collections.emptySet()),
                 recoveredJobGraph,
                 recoveredDirtyJob,
                 (dispatcher, scheduledExecutor, errorHandler) -> new 
NoOpDispatcherBootstrap(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
index c91b4a3f32b..0e6424a5cfe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -113,7 +113,8 @@ class TestingDispatcher extends Dispatcher {
                         jobResultStore,
                         jobManagerRunnerFactory,
                         cleanupRunnerFactory,
-                        ioExecutor),
+                        ioExecutor,
+                        Collections.emptySet()),
                 jobManagerRunnerRegistry,
                 resourceCleanerFactory);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobMasterServiceLeadershipRunnerFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobMasterServiceLeadershipRunnerFactory.java
index 0eb6d4c4270..a177d7ee20a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobMasterServiceLeadershipRunnerFactory.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobMasterServiceLeadershipRunnerFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -28,6 +29,8 @@ import 
org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFact
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
+import java.util.Collection;
+
 /**
  * {@code TestingJobMasterServiceLeadershipRunnerFactory} implements {@code 
JobManagerRunnerFactory}
  * providing a factory method usually used for {@link
@@ -54,6 +57,7 @@ public class TestingJobMasterServiceLeadershipRunnerFactory 
extends TestingJobMa
             JobManagerSharedServices jobManagerServices,
             JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
             FatalErrorHandler fatalErrorHandler,
+            Collection<FailureEnricher> failureEnrichers,
             long initializationTimestamp)
             throws Exception {
         return offerTestingJobManagerRunner(jobGraph.getJobID());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingPartialDispatcherServices.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingPartialDispatcherServices.java
index ba986291aab..c8004311eb1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingPartialDispatcherServices.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingPartialDispatcherServices.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.BlobStore;
 import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
@@ -36,6 +37,8 @@ import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ForkJoinPool;
@@ -57,7 +60,8 @@ public class TestingPartialDispatcherServices extends 
PartialDispatcherServices
             HistoryServerArchivist historyServerArchivist,
             @Nullable String metricQueryServiceAddress,
             Executor ioExecutor,
-            DispatcherOperationCaches operationCaches) {
+            DispatcherOperationCaches operationCaches,
+            Collection<FailureEnricher> failureEnrichers) {
         super(
                 configuration,
                 highAvailabilityServices,
@@ -70,7 +74,8 @@ public class TestingPartialDispatcherServices extends 
PartialDispatcherServices
                 historyServerArchivist,
                 metricQueryServiceAddress,
                 ioExecutor,
-                operationCaches);
+                operationCaches,
+                failureEnrichers);
     }
 
     public static Builder builder() {
@@ -178,7 +183,8 @@ public class TestingPartialDispatcherServices extends 
PartialDispatcherServices
                     historyServerArchivist,
                     metricQueryServiceAddress,
                     ioExecutor,
-                    operationCaches);
+                    operationCaches,
+                    Collections.emptySet());
         }
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
index d16bb2f6065..aec37af090d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
@@ -73,6 +73,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ForkJoinPool;
@@ -178,7 +179,8 @@ class ZooKeeperDefaultDispatcherRunnerTest {
                             VoidHistoryServerArchivist.INSTANCE,
                             null,
                             ForkJoinPool.commonPool(),
-                            new DispatcherOperationCaches());
+                            new DispatcherOperationCaches(),
+                            Collections.emptySet());
 
             final DefaultDispatcherRunnerFactory 
defaultDispatcherRunnerFactory =
                     DefaultDispatcherRunnerFactory.createSessionRunner(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
index 68a8398d447..fe02100f73d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
@@ -37,6 +37,7 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -176,6 +177,7 @@ public class TestingMiniCluster extends MiniCluster {
                             metricRegistry,
                             new MemoryExecutionGraphInfoStore(),
                             metricQueryServiceRetriever,
+                            Collections.emptySet(),
                             fatalErrorHandler);
 
             final CompletableFuture<Void> shutDownFuture =
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index e5d24deb6d2..179d30d4ea7 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -68,6 +68,7 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.nio.file.Path;
 import java.time.Duration;
+import java.util.Collections;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
@@ -160,6 +161,7 @@ class ProcessFailureCancelingITCase {
                             NoOpMetricRegistry.INSTANCE,
                             new MemoryExecutionGraphInfoStore(),
                             VoidMetricQueryServiceRetriever.INSTANCE,
+                            Collections.emptySet(),
                             fatalErrorHandler);
 
             TestProcessBuilder taskManagerProcessBuilder =


Reply via email to