This is an automated email from the ASF dual-hosted git repository.
dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 603181da811 [FLINK-31993][runtime] Initialize and pass down FailureEnrichers
603181da811 is described below
commit 603181da811edb47c0d573492639a381fbbedc28
Author: Panagiotis Garefalakis <[email protected]>
AuthorDate: Wed May 3 19:58:35 2023 -0700
[FLINK-31993][runtime] Initialize and pass down FailureEnrichers
* Init FailureEnrichers as part of ClusterEntrypoint
---
.../flink/runtime/dispatcher/Dispatcher.java | 4 ++
.../runtime/dispatcher/DispatcherServices.java | 15 +++++++-
.../dispatcher/JobManagerRunnerFactory.java | 4 ++
.../JobMasterServiceLeadershipRunnerFactory.java | 5 +++
.../dispatcher/PartialDispatcherServices.java | 12 +++++-
...atcherServicesWithJobPersistenceComponents.java | 7 +++-
.../runtime/entrypoint/ClusterEntrypoint.java | 7 ++++
...tDispatcherResourceManagerComponentFactory.java | 5 ++-
.../DispatcherResourceManagerComponentFactory.java | 3 ++
.../runtime/failure/FailureEnricherUtils.java | 45 +++++++++-------------
.../factories/DefaultJobMasterServiceFactory.java | 8 +++-
.../flink/runtime/minicluster/MiniCluster.java | 1 +
.../dispatcher/DispatcherResourceCleanupTest.java | 6 +++
.../flink/runtime/dispatcher/DispatcherTest.java | 9 +++++
.../ExecutionGraphInfoStoreTestUtils.java | 1 +
.../runtime/dispatcher/MiniDispatcherTest.java | 4 +-
.../runtime/dispatcher/TestingDispatcher.java | 3 +-
...ingJobMasterServiceLeadershipRunnerFactory.java | 4 ++
.../TestingPartialDispatcherServices.java | 12 ++++--
.../ZooKeeperDefaultDispatcherRunnerTest.java | 4 +-
.../runtime/minicluster/TestingMiniCluster.java | 2 +
.../recovery/ProcessFailureCancelingITCase.java | 2 +
22 files changed, 124 insertions(+), 39 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 09cb126ee0b..e06a6d5bfc4 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -35,6 +35,7 @@ import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.checkpoint.Checkpoints;
@@ -156,6 +157,7 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
private final BlobServer blobServer;
private final FatalErrorHandler fatalErrorHandler;
+ private final Collection<FailureEnricher> failureEnrichers;
private final OnMainThreadJobManagerRunnerRegistry
jobManagerRunnerRegistry;
@@ -267,6 +269,7 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
this.heartbeatServices = dispatcherServices.getHeartbeatServices();
this.blobServer = dispatcherServices.getBlobServer();
this.fatalErrorHandler = dispatcherServices.getFatalErrorHandler();
+ this.failureEnrichers = dispatcherServices.getFailureEnrichers();
this.jobGraphWriter = dispatcherServices.getJobGraphWriter();
this.jobResultStore = dispatcherServices.getJobResultStore();
this.jobManagerMetricGroup =
dispatcherServices.getJobManagerMetricGroup();
@@ -656,6 +659,7 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
jobManagerSharedServices,
new
DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
fatalErrorHandler,
+ failureEnrichers,
System.currentTimeMillis());
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
index 12112b15b83..e3fd5f90702 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.dispatcher;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.dispatcher.cleanup.CleanupRunnerFactory;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -33,6 +34,7 @@ import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
+import java.util.Collection;
import java.util.concurrent.Executor;
/** {@link Dispatcher} services container. */
@@ -70,6 +72,8 @@ public class DispatcherServices {
private final Executor ioExecutor;
+ private final Collection<FailureEnricher> failureEnrichers;
+
DispatcherServices(
Configuration configuration,
HighAvailabilityServices highAvailabilityServices,
@@ -86,7 +90,8 @@ public class DispatcherServices {
JobResultStore jobResultStore,
JobManagerRunnerFactory jobManagerRunnerFactory,
CleanupRunnerFactory cleanupRunnerFactory,
- Executor ioExecutor) {
+ Executor ioExecutor,
+ Collection<FailureEnricher> failureEnrichers) {
this.configuration = Preconditions.checkNotNull(configuration,
"Configuration");
this.highAvailabilityServices =
Preconditions.checkNotNull(highAvailabilityServices,
"HighAvailabilityServices");
@@ -111,6 +116,7 @@ public class DispatcherServices {
this.cleanupRunnerFactory =
Preconditions.checkNotNull(cleanupRunnerFactory,
"CleanupRunnerFactory");
this.ioExecutor = Preconditions.checkNotNull(ioExecutor, "IOExecutor");
+ this.failureEnrichers = Preconditions.checkNotNull(failureEnrichers,
"FailureEnrichers");
}
public Configuration getConfiguration() {
@@ -178,6 +184,10 @@ public class DispatcherServices {
return ioExecutor;
}
+ public Collection<FailureEnricher> getFailureEnrichers() {
+ return failureEnrichers;
+ }
+
public static DispatcherServices from(
PartialDispatcherServicesWithJobPersistenceComponents
partialDispatcherServicesWithJobPersistenceComponents,
@@ -204,6 +214,7 @@ public class DispatcherServices {
partialDispatcherServicesWithJobPersistenceComponents.getJobResultStore(),
jobManagerRunnerFactory,
cleanupRunnerFactory,
-
partialDispatcherServicesWithJobPersistenceComponents.getIoExecutor());
+
partialDispatcherServicesWithJobPersistenceComponents.getIoExecutor(),
+
partialDispatcherServicesWithJobPersistenceComponents.getFailureEnrichers());
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerFactory.java
index 08c9cc7a47f..3567ee02883 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerFactory.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.dispatcher;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -28,6 +29,8 @@ import
org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFact
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
+import java.util.Collection;
+
/** Factory for a {@link JobManagerRunner}. */
@FunctionalInterface
public interface JobManagerRunnerFactory {
@@ -41,6 +44,7 @@ public interface JobManagerRunnerFactory {
JobManagerSharedServices jobManagerServices,
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
+ Collection<FailureEnricher> failureEnrichers,
long initializationTimestamp)
throws Exception;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java
index 3b9e8db13f8..f8c51e45b35 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.dispatcher;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -40,6 +41,8 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.Preconditions;
+import java.util.Collection;
+
import static org.apache.flink.util.Preconditions.checkArgument;
/** Factory which creates a {@link JobMasterServiceLeadershipRunner}. */
@@ -56,6 +59,7 @@ public enum JobMasterServiceLeadershipRunnerFactory
implements JobManagerRunnerF
JobManagerSharedServices jobManagerServices,
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
+ Collection<FailureEnricher> failureEnrichers,
long initializationTimestamp)
throws Exception {
@@ -105,6 +109,7 @@ public enum JobMasterServiceLeadershipRunnerFactory
implements JobManagerRunnerF
jobManagerJobMetricGroupFactory,
fatalErrorHandler,
userCodeClassLoader,
+ failureEnrichers,
initializationTimestamp);
final DefaultJobMasterServiceProcessFactory
jobMasterServiceProcessFactory =
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java
index 477ea57e8f2..9ee850f6ec6 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServices.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.dispatcher;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -29,6 +30,7 @@ import
org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.util.Collection;
import java.util.concurrent.Executor;
/**
@@ -61,6 +63,8 @@ public class PartialDispatcherServices {
@Nonnull private final Executor ioExecutor;
+ @Nonnull private final Collection<FailureEnricher> failureEnrichers;
+
public PartialDispatcherServices(
@Nonnull Configuration configuration,
@Nonnull HighAvailabilityServices highAvailabilityServices,
@@ -73,7 +77,8 @@ public class PartialDispatcherServices {
@Nonnull HistoryServerArchivist historyServerArchivist,
@Nullable String metricQueryServiceAddress,
@Nonnull Executor ioExecutor,
- @Nonnull DispatcherOperationCaches operationCaches) {
+ @Nonnull DispatcherOperationCaches operationCaches,
+ @Nonnull Collection<FailureEnricher> failureEnrichers) {
this.configuration = configuration;
this.highAvailabilityServices = highAvailabilityServices;
this.resourceManagerGatewayRetriever = resourceManagerGatewayRetriever;
@@ -86,6 +91,7 @@ public class PartialDispatcherServices {
this.metricQueryServiceAddress = metricQueryServiceAddress;
this.ioExecutor = ioExecutor;
this.operationCaches = operationCaches;
+ this.failureEnrichers = failureEnrichers;
}
@Nonnull
@@ -147,4 +153,8 @@ public class PartialDispatcherServices {
public Executor getIoExecutor() {
return ioExecutor;
}
+
+ public Collection<FailureEnricher> getFailureEnrichers() {
+ return failureEnrichers;
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobPersistenceComponents.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobPersistenceComponents.java
index c6fe7b7eaa5..7689bfc6e72 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobPersistenceComponents.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/PartialDispatcherServicesWithJobPersistenceComponents.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.dispatcher;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -30,6 +31,7 @@ import
org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import javax.annotation.Nullable;
+import java.util.Collection;
import java.util.concurrent.Executor;
/** {@link DispatcherFactory} services container. */
@@ -52,6 +54,7 @@ public class
PartialDispatcherServicesWithJobPersistenceComponents
@Nullable String metricQueryServiceAddress,
Executor ioExecutor,
DispatcherOperationCaches operationCaches,
+ Collection<FailureEnricher> failureEnrichers,
JobGraphWriter jobGraphWriter,
JobResultStore jobResultStore) {
super(
@@ -66,7 +69,8 @@ public class
PartialDispatcherServicesWithJobPersistenceComponents
historyServerArchivist,
metricQueryServiceAddress,
ioExecutor,
- operationCaches);
+ operationCaches,
+ failureEnrichers);
this.jobGraphWriter = jobGraphWriter;
this.jobResultStore = jobResultStore;
}
@@ -96,6 +100,7 @@ public class
PartialDispatcherServicesWithJobPersistenceComponents
partialDispatcherServices.getMetricQueryServiceAddress(),
partialDispatcherServices.getIoExecutor(),
partialDispatcherServices.getOperationCaches(),
+ partialDispatcherServices.getFailureEnrichers(),
jobGraphWriter,
jobResultStore);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 9aebd8bec30..8bbe80877e8 100755
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -33,6 +33,7 @@ import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.plugin.PluginUtils;
@@ -47,6 +48,7 @@ import org.apache.flink.runtime.dispatcher.MiniDispatcher;
import
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
import
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
+import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -151,6 +153,9 @@ public abstract class ClusterEntrypoint implements
AutoCloseableAsync, FatalErro
@GuardedBy("lock")
private HeartbeatServices heartbeatServices;
+ @GuardedBy("lock")
+ private Collection<FailureEnricher> failureEnrichers;
+
@GuardedBy("lock")
private DelegationTokenManager delegationTokenManager;
@@ -303,6 +308,7 @@ public abstract class ClusterEntrypoint implements
AutoCloseableAsync, FatalErro
executionGraphInfoStore,
new RpcMetricQueryServiceRetriever(
metricRegistry.getMetricQueryServiceRpcService()),
+ failureEnrichers,
this);
clusterComponent
@@ -397,6 +403,7 @@ public abstract class ClusterEntrypoint implements
AutoCloseableAsync, FatalErro
blobServer.start();
configuration.setString(BlobServerOptions.PORT,
String.valueOf(blobServer.getPort()));
heartbeatServices = createHeartbeatServices(configuration);
+ failureEnrichers =
FailureEnricherUtils.getFailureEnrichers(configuration);
metricRegistry = createMetricRegistry(configuration,
pluginManager, rpcSystem);
final RpcService metricQueryServiceRpcService =
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
index a24e87dcae0..8f2a3c80997 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.entrypoint.component;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
@@ -113,6 +114,7 @@ public class
DefaultDispatcherResourceManagerComponentFactory
MetricRegistry metricRegistry,
ExecutionGraphInfoStore executionGraphInfoStore,
MetricQueryServiceRetriever metricQueryServiceRetriever,
+ Collection<FailureEnricher> failureEnrichers,
FatalErrorHandler fatalErrorHandler)
throws Exception {
@@ -217,7 +219,8 @@ public class
DefaultDispatcherResourceManagerComponentFactory
historyServerArchivist,
metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
ioExecutor,
- dispatcherOperationCaches);
+ dispatcherOperationCaches,
+ failureEnrichers);
log.debug("Starting Dispatcher.");
dispatcherRunner =
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
index bd3799ff797..52ee8d3bc7e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.entrypoint.component;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
@@ -30,6 +31,7 @@ import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import java.util.Collection;
import java.util.concurrent.Executor;
/** Factory for the {@link DispatcherResourceManagerComponent}. */
@@ -47,6 +49,7 @@ public interface DispatcherResourceManagerComponentFactory {
MetricRegistry metricRegistry,
ExecutionGraphInfoStore executionGraphInfoStore,
MetricQueryServiceRetriever metricQueryServiceRetriever,
+ Collection<FailureEnricher> failureEnrichers,
FatalErrorHandler fatalErrorHandler)
throws Exception;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
index cf05e0a40cb..f704a1ddbd7 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java
@@ -32,7 +32,6 @@ import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -85,31 +84,25 @@ public class FailureEnricherUtils {
pluginManager.load(FailureEnricherFactory.class);
final Set<FailureEnricher> failureEnrichers = new HashSet<>();
while (factoryIterator.hasNext()) {
- try {
- final FailureEnricherFactory failureEnricherFactory =
factoryIterator.next();
- final FailureEnricher failureEnricher =
-
failureEnricherFactory.createFailureEnricher(configuration);
- if
(includedEnrichers.contains(failureEnricher.getClass().getName())) {
- failureEnrichers.add(failureEnricher);
- LOG.info(
- "Found failure enricher {} at {}.",
- failureEnricherFactory.getClass().getName(),
- new File(
- failureEnricher
- .getClass()
- .getProtectionDomain()
- .getCodeSource()
- .getLocation()
- .toURI())
- .getCanonicalPath());
- } else {
- LOG.debug(
- "Excluding failure enricher {}, not configured in
enricher list ({}).",
- failureEnricherFactory.getClass().getName(),
- includedEnrichers);
- }
- } catch (Exception e) {
- LOG.warn("Error while loading failure enricher factory.", e);
+ final FailureEnricherFactory failureEnricherFactory =
factoryIterator.next();
+ final FailureEnricher failureEnricher =
+
failureEnricherFactory.createFailureEnricher(configuration);
+ if
(includedEnrichers.contains(failureEnricher.getClass().getName())) {
+ failureEnrichers.add(failureEnricher);
+ LOG.info(
+ "Found failure enricher {} at {}.",
+ failureEnricherFactory.getClass().getName(),
+ failureEnricher
+ .getClass()
+ .getProtectionDomain()
+ .getCodeSource()
+ .getLocation()
+ .getPath());
+ } else {
+ LOG.debug(
+ "Excluding failure enricher {}, not configured in
enricher list ({}).",
+ failureEnricherFactory.getClass().getName(),
+ includedEnrichers);
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
index c65c11bd2a7..f34fede5197 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.jobmaster.factories;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blocklist.BlocklistUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -38,7 +39,7 @@ import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.util.function.FunctionUtils;
-import java.util.Collections;
+import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -57,6 +58,7 @@ public class DefaultJobMasterServiceFactory implements
JobMasterServiceFactory {
private final FatalErrorHandler fatalErrorHandler;
private final ClassLoader userCodeClassloader;
private final ShuffleMaster<?> shuffleMaster;
+ private final Collection<FailureEnricher> failureEnrichers;
private final long initializationTimestamp;
public DefaultJobMasterServiceFactory(
@@ -71,6 +73,7 @@ public class DefaultJobMasterServiceFactory implements
JobMasterServiceFactory {
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
ClassLoader userCodeClassloader,
+ Collection<FailureEnricher> failureEnrichers,
long initializationTimestamp) {
this.executor = executor;
this.rpcService = rpcService;
@@ -84,6 +87,7 @@ public class DefaultJobMasterServiceFactory implements
JobMasterServiceFactory {
this.fatalErrorHandler = fatalErrorHandler;
this.userCodeClassloader = userCodeClassloader;
this.shuffleMaster = jobManagerSharedServices.getShuffleMaster();
+ this.failureEnrichers = failureEnrichers;
this.initializationTimestamp = initializationTimestamp;
}
@@ -123,7 +127,7 @@ public class DefaultJobMasterServiceFactory implements
JobMasterServiceFactory {
DefaultExecutionDeploymentReconciler::new,
BlocklistUtils.loadBlocklistHandlerFactory(
jobMasterConfiguration.getConfiguration()),
- Collections.emptySet(),
+ failureEnrichers,
initializationTimestamp);
jobMaster.start();
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 7f39ab1ae0c..6de16027cc4 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -561,6 +561,7 @@ public class MiniCluster implements AutoCloseableAsync {
metricRegistry,
new MemoryExecutionGraphInfoStore(),
metricQueryServiceRetriever,
+ Collections.emptySet(),
fatalErrorHandler);
FutureUtils.assertNoException(
dispatcherResourceManagerComponent
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index a7c5cda3812..13fb6afac04 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobUtils;
@@ -72,6 +73,7 @@ import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
@@ -727,6 +729,7 @@ public class DispatcherResourceCleanupTest extends
TestLogger {
JobManagerSharedServices jobManagerSharedServices,
JobManagerJobMetricGroupFactory
jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
+ Collection<FailureEnricher> failureEnrichers,
long initializationTimestamp)
throws Exception {
jobManagerRunnerCreationLatch.run();
@@ -741,6 +744,7 @@ public class DispatcherResourceCleanupTest extends
TestLogger {
jobManagerSharedServices,
jobManagerJobMetricGroupFactory,
fatalErrorHandler,
+ failureEnrichers,
initializationTimestamp);
TestingJobMasterGateway testingJobMasterGateway =
@@ -787,6 +791,7 @@ public class DispatcherResourceCleanupTest extends
TestLogger {
JobManagerSharedServices jobManagerServices,
JobManagerJobMetricGroupFactory
jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
+ Collection<FailureEnricher> failureEnrichers,
long initializationTimestamp) {
return Optional.ofNullable(jobManagerRunners.poll())
.orElseThrow(
@@ -813,6 +818,7 @@ public class DispatcherResourceCleanupTest extends
TestLogger {
JobManagerSharedServices jobManagerServices,
JobManagerJobMetricGroupFactory
jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
+ Collection<FailureEnricher> failureEnrichers,
long initializationTimestamp)
throws Exception {
throw testException;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 3bd1821f44b..007550e2101 100755
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobServer;
@@ -1447,6 +1448,7 @@ public class DispatcherTest extends
AbstractDispatcherTest {
JobManagerSharedServices jobManagerServices,
JobManagerJobMetricGroupFactory
jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
+ Collection<FailureEnricher> failureEnricher,
long initializationTimestamp)
throws Exception {
assertEquals(expectedJobId, jobGraph.getJobID());
@@ -1553,6 +1555,7 @@ public class DispatcherTest extends
AbstractDispatcherTest {
JobManagerSharedServices jobManagerServices,
JobManagerJobMetricGroupFactory
jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
+ Collection<FailureEnricher> failureEnrichers,
long initializationTimestamp)
throws Exception {
@@ -1612,6 +1615,7 @@ public class DispatcherTest extends
AbstractDispatcherTest {
JobManagerSharedServices jobManagerServices,
JobManagerJobMetricGroupFactory
jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
+ Collection<FailureEnricher> failureEnrichers,
long initializationTimestamp)
throws Exception {
return new BlockingTerminationJobManagerService(
@@ -1689,6 +1693,7 @@ public class DispatcherTest extends
AbstractDispatcherTest {
JobManagerSharedServices jobManagerServices,
JobManagerJobMetricGroupFactory
jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
+ Collection<FailureEnricher> failureEnrichers,
long initializationTimestamp) {
initializationTimestampQueue.offer(initializationTimestamp);
return
TestingJobManagerRunner.newBuilder().setJobId(jobGraph.getJobID()).build();
@@ -1714,6 +1719,7 @@ public class DispatcherTest extends
AbstractDispatcherTest {
JobManagerSharedServices jobManagerSharedServices,
JobManagerJobMetricGroupFactory
jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
+ Collection<FailureEnricher> failureEnrichers,
long initializationTimestamp)
throws Exception {
assertEquals(expectedJobId, jobGraph.getJobID());
@@ -1727,6 +1733,7 @@ public class DispatcherTest extends
AbstractDispatcherTest {
jobManagerSharedServices,
jobManagerJobMetricGroupFactory,
fatalErrorHandler,
+ Collections.emptySet(),
initializationTimestamp);
}
}
@@ -1749,6 +1756,7 @@ public class DispatcherTest extends
AbstractDispatcherTest {
JobManagerSharedServices jobManagerServices,
JobManagerJobMetricGroupFactory
jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
+ Collection<FailureEnricher> failureEnrichers,
long initializationTimestamp)
throws Exception {
return resultFutureQueue.remove();
@@ -1776,6 +1784,7 @@ public class DispatcherTest extends
AbstractDispatcherTest {
JobManagerSharedServices jobManagerServices,
JobManagerJobMetricGroupFactory
jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
+ Collection<FailureEnricher> failureEnrichers,
long initializationTimestamp)
throws Exception {
final TestingJobManagerRunner runner =
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStoreTestUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStoreTestUtils.java
index d01469b9af4..7a2cf2c7954 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStoreTestUtils.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ExecutionGraphInfoStoreTestUtils.java
@@ -245,6 +245,7 @@ public class ExecutionGraphInfoStoreTestUtils {
metricRegistry,
executionGraphInfoStore,
metricQueryServiceRetriever,
+ Collections.emptySet(),
fatalErrorHandler));
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 4539ce32dfa..2e8c1b5a31f 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -60,6 +60,7 @@ import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
@@ -358,7 +359,8 @@ public class MiniDispatcherTest extends TestLogger {
highAvailabilityServices.getJobResultStore(),
testingJobManagerRunnerFactory,
testingCleanupRunnerFactory,
- ForkJoinPool.commonPool()),
+ ForkJoinPool.commonPool(),
+ Collections.emptySet()),
recoveredJobGraph,
recoveredDirtyJob,
(dispatcher, scheduledExecutor, errorHandler) -> new
NoOpDispatcherBootstrap(),
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
index c91b4a3f32b..0e6424a5cfe 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -113,7 +113,8 @@ class TestingDispatcher extends Dispatcher {
jobResultStore,
jobManagerRunnerFactory,
cleanupRunnerFactory,
- ioExecutor),
+ ioExecutor,
+ Collections.emptySet()),
jobManagerRunnerRegistry,
resourceCleanerFactory);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobMasterServiceLeadershipRunnerFactory.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobMasterServiceLeadershipRunnerFactory.java
index 0eb6d4c4270..a177d7ee20a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobMasterServiceLeadershipRunnerFactory.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobMasterServiceLeadershipRunnerFactory.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.dispatcher;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -28,6 +29,8 @@ import
org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFact
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
+import java.util.Collection;
+
/**
* {@code TestingJobMasterServiceLeadershipRunnerFactory} implements {@code
JobManagerRunnerFactory}
* providing a factory method usually used for {@link
@@ -54,6 +57,7 @@ public class TestingJobMasterServiceLeadershipRunnerFactory
extends TestingJobMa
JobManagerSharedServices jobManagerServices,
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
+ Collection<FailureEnricher> failureEnrichers,
long initializationTimestamp)
throws Exception {
return offerTestingJobManagerRunner(jobGraph.getJobID());
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingPartialDispatcherServices.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingPartialDispatcherServices.java
index ba986291aab..c8004311eb1 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingPartialDispatcherServices.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingPartialDispatcherServices.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.dispatcher;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
@@ -36,6 +37,8 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
@@ -57,7 +60,8 @@ public class TestingPartialDispatcherServices extends
PartialDispatcherServices
HistoryServerArchivist historyServerArchivist,
@Nullable String metricQueryServiceAddress,
Executor ioExecutor,
- DispatcherOperationCaches operationCaches) {
+ DispatcherOperationCaches operationCaches,
+ Collection<FailureEnricher> failureEnrichers) {
super(
configuration,
highAvailabilityServices,
@@ -70,7 +74,8 @@ public class TestingPartialDispatcherServices extends
PartialDispatcherServices
historyServerArchivist,
metricQueryServiceAddress,
ioExecutor,
- operationCaches);
+ operationCaches,
+ failureEnrichers);
}
public static Builder builder() {
@@ -178,7 +183,8 @@ public class TestingPartialDispatcherServices extends
PartialDispatcherServices
historyServerArchivist,
metricQueryServiceAddress,
ioExecutor,
- operationCaches);
+ operationCaches,
+ Collections.emptySet());
}
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
index d16bb2f6065..aec37af090d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
@@ -73,6 +73,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
+import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
@@ -178,7 +179,8 @@ class ZooKeeperDefaultDispatcherRunnerTest {
VoidHistoryServerArchivist.INSTANCE,
null,
ForkJoinPool.commonPool(),
- new DispatcherOperationCaches());
+ new DispatcherOperationCaches(),
+ Collections.emptySet());
final DefaultDispatcherRunnerFactory
defaultDispatcherRunnerFactory =
DefaultDispatcherRunnerFactory.createSessionRunner(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
index 68a8398d447..fe02100f73d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
@@ -37,6 +37,7 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -176,6 +177,7 @@ public class TestingMiniCluster extends MiniCluster {
metricRegistry,
new MemoryExecutionGraphInfoStore(),
metricQueryServiceRetriever,
+ Collections.emptySet(),
fatalErrorHandler);
final CompletableFuture<Void> shutDownFuture =
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index e5d24deb6d2..179d30d4ea7 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -68,6 +68,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.nio.file.Path;
import java.time.Duration;
+import java.util.Collections;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
@@ -160,6 +161,7 @@ class ProcessFailureCancelingITCase {
NoOpMetricRegistry.INSTANCE,
new MemoryExecutionGraphInfoStore(),
VoidMetricQueryServiceRetriever.INSTANCE,
+ Collections.emptySet(),
fatalErrorHandler);
TestProcessBuilder taskManagerProcessBuilder =