dmvk commented on code in PR #22516:
URL: https://github.com/apache/flink/pull/22516#discussion_r1205406251


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java:
##########
@@ -57,6 +59,7 @@ public class DefaultJobMasterServiceFactory implements JobMasterServiceFactory {
     private final FatalErrorHandler fatalErrorHandler;
     private final ClassLoader userCodeClassloader;
     private final ShuffleMaster<?> shuffleMaster;
+    private final Collection<FailureEnricher> failureEnrichers;

Review Comment:
   Is this field unused?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java:
##########
@@ -197,6 +198,9 @@ public class MiniCluster implements AutoCloseableAsync {
     @GuardedBy("lock")
     private HeartbeatServices heartbeatServices;
 
+    @GuardedBy("lock")
+    private Collection<FailureEnricher> failureEnrichers;

Review Comment:
   Why do we have an extra property here if we don't allow users to set it?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java:
##########
@@ -451,6 +464,10 @@ protected HeartbeatServices createHeartbeatServices(Configuration configuration)
         return HeartbeatServices.fromConfiguration(configuration);
     }
 
+    protected Collection<FailureEnricher> createFailureEnrichers(Configuration configuration) {

Review Comment:
   Why do we need an extra method instead of calling `getFailureEnrichers` directly?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/failure/FailureEnricherUtils.java:
##########
@@ -109,7 +109,7 @@ static Collection<FailureEnricher> getFailureEnrichers(
                             includedEnrichers);
                 }
             } catch (Exception e) {
-                LOG.warn("Error while loading failure enricher factory.", e);
+                throw new RuntimeException("Error while loading failure enricher factory.", e);

Review Comment:
   If you replace
   
   ```
   new File( ...
     .getCodeSource()
     .getLocation()
     .toURI())
   ```
   above with
   ```
   LOG.info(
           "Found failure enricher {} at {}.",
           failureEnricherFactory.getClass().getName(),
           failureEnricherFactory
               .getClass()
               .getProtectionDomain()
               .getCodeSource()
               .getLocation()
               .getPath());
   ```
   there will be nothing to handle...
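   
   A minimal sketch of why this works, assuming the only reason for the `catch` is the checked `URISyntaxException` declared by `URL#toURI()`. The class `CodeSourceLocationSketch` and its helpers are hypothetical names used for illustration and are not part of the PR:
   
   ```java
   import java.io.File;
   import java.net.URISyntaxException;
   import java.net.URL;
   import java.security.CodeSource;
   
   public class CodeSourceLocationSketch {
   
       // Hypothetical helper: where a class was loaded from, or null if unknown
       // (bootstrap classes such as java.lang.String have no CodeSource).
       static URL locationOf(Class<?> clazz) {
           CodeSource source = clazz.getProtectionDomain().getCodeSource();
           return source == null ? null : source.getLocation();
       }
   
       // Variant 1: URL#toURI declares the checked URISyntaxException,
       // so every caller has to catch or propagate it.
       static String viaFile(Class<?> clazz) throws URISyntaxException {
           URL location = locationOf(clazz);
           return location == null ? "<unknown>" : new File(location.toURI()).getPath();
       }
   
       // Variant 2: URL#getPath declares no checked exception.
       static String viaPath(Class<?> clazz) {
           URL location = locationOf(clazz);
           return location == null ? "<unknown>" : location.getPath();
       }
   
       public static void main(String[] args) {
           try {
               System.out.println(viaFile(CodeSourceLocationSketch.class));
           } catch (URISyntaxException | IllegalArgumentException e) {
               // new File(URI) also rejects non-"file" URIs with IllegalArgumentException.
               System.out.println("could not resolve location: " + e);
           }
           // No try/catch needed here.
           System.out.println(viaPath(CodeSourceLocationSketch.class));
       }
   }
   ```
   
   One trade-off: `getPath()` returns the path as it appears in the URL, so characters such as spaces stay percent-encoded. That is probably acceptable for a log message.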
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java:
##########
@@ -397,6 +403,13 @@ protected void initializeServices(Configuration configuration, PluginManager plu
             blobServer.start();
             configuration.setString(BlobServerOptions.PORT, String.valueOf(blobServer.getPort()));
             heartbeatServices = createHeartbeatServices(configuration);
+            failureEnrichers = createFailureEnrichers(configuration);
+            delegationTokenManager =
+                    DefaultDelegationTokenManagerFactory.create(
+                            configuration,
+                            pluginManager,
+                            commonRpcService.getScheduledExecutor(),
+                            ioExecutor);

Review Comment:
   How is the `delegationTokenManager` change related to this PR?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherServices.java:
##########
@@ -178,6 +184,10 @@ public Executor getIoExecutor() {
         return ioExecutor;
     }
 
+    public Collection<FailureEnricher> getFailureListeners() {

Review Comment:
   Listeners? This returns `FailureEnricher`s, so should it be `getFailureEnrichers`?


