Izeren commented on code in PR #27617:
URL: https://github.com/apache/flink/pull/27617#discussion_r2812938325


##########
flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java:
##########
@@ -338,14 +355,48 @@ public static void initialize(Configuration config, @Nullable PluginManager plug
             final List<FileSystemFactory> fileSystemFactories =
                     loadFileSystemFactories(factorySuppliers);
 
+            // Track registered priorities for factory selection
+            final HashMap<String, Integer> registeredPriorities = new HashMap<>();
+
             // configure all file system factories
             for (FileSystemFactory factory : fileSystemFactories) {
                 factory.configure(config);
-                String scheme = factory.getScheme();
+                final String scheme = factory.getScheme();
 
                 FileSystemFactory fsf =
                         ConnectionLimitingFactory.decorateIfLimited(factory, scheme, config);
-                FS_FACTORIES.put(scheme, fsf);
+
+                final String className = resolveFactoryClassName(factory);
+                final int registeredPriority =
+                        registeredPriorities.getOrDefault(scheme, Integer.MIN_VALUE);
+                final int newPriority =
+                        config.getOptional(CoreOptions.fileSystemFactoryPriority(scheme, className))
+                                .orElse(factory.getPriority());
+
+                LOG.info(
+                        "{} filesystem factory {} for scheme '{}' "
+                                + "with priority {} (highest registered priority: {})",
+                        newPriority >= registeredPriority ? "Registering" : "Skipping",
+                        className,
+                        scheme,
+                        newPriority,
+                        registeredPriority);
+                if (newPriority >= registeredPriority) {

Review Comment:
   > fs.s3.priority.org.apache.flink.runtime.fs.hdfs.HadoopFsFactory: 1
   > fs.s3.priority.org.apache.flink.runtime.fs.native.AwsNativeS3FileSystemFactory: -1
   
   This is correct, but I want to go a bit further. In this particular example, I want the built-in priority to act as a safe default, so that you don't have to configure anything when you first add the jar to the image.
   
   Yes, the config could target the exact factory instead, but you would still need safe priority logic to choose between jars when no config is provided, and I chose to keep that logic transparent. Arguably you could ensure that the correct config is always specified, but I trust that less for actual use in a production runtime.
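
   To make the "safe default" argument concrete, here is a minimal standalone sketch of the selection rule in the hunk above. It is not the PR's code: `FactoryPrioritySketch`, `Candidate`, and `select` are hypothetical names, the built-in priorities (0 and -1) are assumed values chosen for illustration, and the sketch assumes the registered priority is updated whenever a factory wins. The config key shape follows the quoted example (`fs.<scheme>.priority.<factory class name>`).

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Standalone sketch of priority-based factory selection; hypothetical, not Flink code. */
class FactoryPrioritySketch {

    static final String HADOOP = "org.apache.flink.runtime.fs.hdfs.HadoopFsFactory";
    static final String NATIVE_S3 =
            "org.apache.flink.runtime.fs.native.AwsNativeS3FileSystemFactory";

    /** Hypothetical stand-in for a factory: scheme, class name, built-in priority. */
    static final class Candidate {
        final String scheme;
        final String className;
        final int defaultPriority;

        Candidate(String scheme, String className, int defaultPriority) {
            this.scheme = scheme;
            this.className = className;
            this.defaultPriority = defaultPriority;
        }
    }

    /** Returns scheme -> chosen factory class name, given optional per-factory overrides. */
    static Map<String, String> select(List<Candidate> candidates, Map<String, Integer> config) {
        Map<String, Integer> registeredPriorities = new HashMap<>();
        Map<String, String> chosen = new LinkedHashMap<>();
        for (Candidate c : candidates) {
            // An explicit config entry overrides the factory's built-in priority.
            String key = "fs." + c.scheme + ".priority." + c.className;
            int newPriority = config.getOrDefault(key, c.defaultPriority);
            int registeredPriority =
                    registeredPriorities.getOrDefault(c.scheme, Integer.MIN_VALUE);
            // Same comparison as in the hunk: on a tie, the later-loaded factory wins.
            if (newPriority >= registeredPriority) {
                chosen.put(c.scheme, c.className);
                registeredPriorities.put(c.scheme, newPriority);
            }
        }
        return chosen;
    }

    public static void main(String[] args) {
        // Assumed built-in priorities: the new factory ships with a lower default.
        List<Candidate> jars =
                Arrays.asList(new Candidate("s3", NATIVE_S3, -1), new Candidate("s3", HADOOP, 0));

        // No config at all: the built-in priorities decide.
        System.out.println(select(jars, Collections.<String, Integer>emptyMap()));

        // Explicit opt-in to the new factory through config.
        String key = "fs.s3.priority." + NATIVE_S3;
        System.out.println(select(jars, Collections.singletonMap(key, 5)));
    }
}
```

   With differing built-in priorities, the outcome does not depend on the order in which the jars are loaded, which is the property I want when no config is present. Equal priorities are the one case where load order still decides.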


