davidradl commented on code in PR #27617:
URL: https://github.com/apache/flink/pull/27617#discussion_r2812612623


##########
flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java:
##########
@@ -338,14 +355,48 @@ public static void initialize(Configuration config, @Nullable PluginManager plug
             final List<FileSystemFactory> fileSystemFactories =
                     loadFileSystemFactories(factorySuppliers);
 
+            // Track registered priorities for factory selection
+            final HashMap<String, Integer> registeredPriorities = new HashMap<>();
+
             // configure all file system factories
             for (FileSystemFactory factory : fileSystemFactories) {
                 factory.configure(config);
-                String scheme = factory.getScheme();
+                final String scheme = factory.getScheme();
 
                 FileSystemFactory fsf =
                         ConnectionLimitingFactory.decorateIfLimited(factory, scheme, config);
-                FS_FACTORIES.put(scheme, fsf);
+
+                final String className = resolveFactoryClassName(factory);
+                final int registeredPriority =
+                        registeredPriorities.getOrDefault(scheme, Integer.MIN_VALUE);
+                final int newPriority =
+                        config.getOptional(CoreOptions.fileSystemFactoryPriority(scheme, className))
+                                .orElse(factory.getPriority());
+
+                LOG.info(
+                        "{} filesystem factory {} for scheme '{}' "
+                                + "with priority {} (highest registered priority: {})",
+                        newPriority >= registeredPriority ? "Registering" : "Skipping",
+                        className,
+                        scheme,
+                        newPriority,
+                        registeredPriority);
+                if (newPriority >= registeredPriority) {

Review Comment:
   If I understand correctly, we can have config like this to associate a Factory implementation with a priority:

   fs.s3.priority.org.apache.flink.runtime.fs.hdfs.HadoopFsFactory: 1
   fs.s3.priority.org.apache.flink.runtime.fs.native.AwsNativeS3FileSystemFactory: -1

   I assume that in these cases org.apache.flink.runtime.fs.hdfs.HadoopFsFactory and org.apache.flink.runtime.fs.native.AwsNativeS3FileSystemFactory are the implementation class names.
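   To check my reading of the selection logic, here is a minimal, self-contained sketch of it. Assumptions: the body of the `if` (not visible in the hunk) records the new priority and registers the factory; the key layout `fs.<scheme>.priority.<className>` is taken from the config example above, not from `CoreOptions.fileSystemFactoryPriority`; the default priority of 0 is made up; `Factory` and `select` are hypothetical stand-ins, not Flink API.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PrioritySelectionSketch {

    /** Hypothetical stand-in for a discovered FileSystemFactory. */
    static final class Factory {
        final String scheme;
        final String className;
        final int defaultPriority; // stands in for factory.getPriority()

        Factory(String scheme, String className, int defaultPriority) {
            this.scheme = scheme;
            this.className = className;
            this.defaultPriority = defaultPriority;
        }
    }

    /** Mirrors the loop in the diff: returns the winning factory class name per scheme. */
    static Map<String, String> select(List<Factory> discovered, Map<String, Integer> config) {
        Map<String, Integer> registeredPriorities = new HashMap<>();
        Map<String, String> winners = new LinkedHashMap<>();
        for (Factory f : discovered) {
            int registered = registeredPriorities.getOrDefault(f.scheme, Integer.MIN_VALUE);
            // Assumed key layout, taken from the config example in the review comment.
            String key = "fs." + f.scheme + ".priority." + f.className;
            int priority = config.getOrDefault(key, f.defaultPriority);
            // ">=" means a later factory with an equal priority replaces an earlier one.
            if (priority >= registered) {
                // Assumed body of the if-block (not visible in the hunk).
                registeredPriorities.put(f.scheme, priority);
                winners.put(f.scheme, f.className);
            }
        }
        return winners;
    }

    public static void main(String[] args) {
        List<Factory> discovered =
                List.of(
                        new Factory("s3", "org.apache.flink.runtime.fs.hdfs.HadoopFsFactory", 0),
                        new Factory(
                                "s3",
                                "org.apache.flink.runtime.fs.native.AwsNativeS3FileSystemFactory",
                                0));
        Map<String, Integer> config =
                Map.of(
                        "fs.s3.priority.org.apache.flink.runtime.fs.hdfs.HadoopFsFactory", 1,
                        "fs.s3.priority.org.apache.flink.runtime.fs.native.AwsNativeS3FileSystemFactory", -1);
        System.out.println(select(discovered, config));
    }
}
```

   With that config this prints `{s3=org.apache.flink.runtime.fs.hdfs.HadoopFsFactory}`. Without the overrides both factories tie at 0 and, because of the `>=`, whichever is iterated last wins, so the outcome then depends on discovery order.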
   
   If you are specifying implementation class names in the config anyway, why not just indicate in config which class you want loaded? Isn't the priority mechanism superfluous?
   
   It seems the usual Flink way to solve this problem would be to have a constant like:
   `public static final String IDENTIFIER = "HadoopFS";`
   That name would be resolved by `FactoryUtil`-style logic, using the service entries in the jar (`META-INF/services`), as formats and connectors do. This would allow you to target the implementation you want. I assume you do not want this because you want to make the choice at runtime through system-wide configuration; can you confirm my understanding?
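   For comparison with the priority approach, a rough sketch of identifier-based selection. Everything here is hypothetical: `SchemeFactory`, the `IDENTIFIER` values, the `fs.s3.factory` option and `select` are illustrative names, not existing Flink API. Discovery would still come from service loading; only the choice among the factories for a scheme is driven by the configured identifier, and an unknown or ambiguous identifier fails fast instead of falling back to an ordering.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class IdentifierSelectionSketch {

    /** Hypothetical factory contract: each implementation declares a stable identifier. */
    interface SchemeFactory {
        String scheme();

        String identifier();
    }

    static final class HadoopFactory implements SchemeFactory {
        public static final String IDENTIFIER = "HadoopFS";

        public String scheme() {
            return "s3";
        }

        public String identifier() {
            return IDENTIFIER;
        }
    }

    static final class NativeS3Factory implements SchemeFactory {
        public static final String IDENTIFIER = "NativeS3";

        public String scheme() {
            return "s3";
        }

        public String identifier() {
            return IDENTIFIER;
        }
    }

    /** Picks the single factory for the scheme whose identifier matches; fails otherwise. */
    static SchemeFactory select(String scheme, String identifier, List<SchemeFactory> discovered) {
        List<SchemeFactory> matches =
                discovered.stream()
                        .filter(f -> f.scheme().equals(scheme) && f.identifier().equals(identifier))
                        .collect(Collectors.toList());
        if (matches.size() != 1) {
            throw new IllegalStateException(
                    String.format(
                            "Expected exactly one '%s' factory with identifier '%s', found %d",
                            scheme, identifier, matches.size()));
        }
        return matches.get(0);
    }

    public static void main(String[] args) {
        List<SchemeFactory> discovered = List.of(new HadoopFactory(), new NativeS3Factory());
        // Hypothetical option naming the implementation for the s3 scheme.
        Map<String, String> config = Map.of("fs.s3.factory", "HadoopFS");
        SchemeFactory chosen = select("s3", config.get("fs.s3.factory"), discovered);
        System.out.println(chosen.getClass().getSimpleName());
    }
}
```

   This prints `HadoopFactory`; changing the option to `NativeS3` selects the other implementation, with no priority values involved.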
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
