dmvk commented on code in PR #22516:
URL: https://github.com/apache/flink/pull/22516#discussion_r1205406251
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java:
##########
@@ -57,6 +59,7 @@ public class DefaultJobMasterServiceFactory implements
JobMasterServiceFactory {
private final FatalErrorHandler fatalErrorHandler;
private final ClassLoader userCodeClassloader;
private final ShuffleMaster<?> shuffleMaster;
+ private final Collection<FailureEnricher> failureEnrichers;
Review Comment:
unused?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java:
##########
@@ -197,6 +198,9 @@ public class MiniCluster implements AutoCloseableAsync {
@GuardedBy("lock")
private HeartbeatServices heartbeatServices;
+ @GuardedBy("lock")
+ private Collection<FailureEnricher> failureEnrichers;
Review Comment:
Why do we have an extra property here if we don't allow users to set it?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java:
##########
@@ -451,6 +464,10 @@ protected HeartbeatServices
createHeartbeatServices(Configuration configuration)
return HeartbeatServices.fromConfiguration(configuration);
}
+ protected Collection<FailureEnricher> createFailureEnrichers(Configuration
configuration) {
Review Comment:
Why do we need an extra method instead of calling `getFailureEnrichers`
directly?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java:
##########
@@ -109,7 +109,7 @@ static Collection<FailureEnricher> getFailureEnrichers(
includedEnrichers);
}
} catch (Exception e) {
- LOG.warn("Error while loading failure enricher factory.", e);
+ throw new RuntimeException("Error while loading failure
enricher factory.", e);
Review Comment:
If you replace
```
new File( ...
.getCodeSource()
.getLocation()
.toURI())
```
above with
```
LOG.info(
"Found failure enricher {} at {}.",
failureEnricherFactory.getClass().getName(),
failureEnricher
.getClass()
.getProtectionDomain()
.getCodeSource()
.getLocation()
.getPath());
```
there will be nothing to handle...
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java:
##########
@@ -397,6 +403,13 @@ protected void initializeServices(Configuration
configuration, PluginManager plu
blobServer.start();
configuration.setString(BlobServerOptions.PORT,
String.valueOf(blobServer.getPort()));
heartbeatServices = createHeartbeatServices(configuration);
+ failureEnrichers = createFailureEnrichers(configuration);
+ delegationTokenManager =
+ DefaultDelegationTokenManagerFactory.create(
+ configuration,
+ pluginManager,
+ commonRpcService.getScheduledExecutor(),
+ ioExecutor);
Review Comment:
how is this related?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java:
##########
@@ -178,6 +184,10 @@ public Executor getIoExecutor() {
return ioExecutor;
}
+ public Collection<FailureEnricher> getFailureListeners() {
Review Comment:
listeners?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]