rkhachatryan commented on code in PR #27617:
URL: https://github.com/apache/flink/pull/27617#discussion_r2812395664


##########
flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java:
##########
@@ -338,14 +355,48 @@ public static void initialize(Configuration config, @Nullable PluginManager plug
             final List<FileSystemFactory> fileSystemFactories =
                     loadFileSystemFactories(factorySuppliers);
 
+            // Track registered priorities for factory selection
+            final HashMap<String, Integer> registeredPriorities = new HashMap<>();
+
             // configure all file system factories
             for (FileSystemFactory factory : fileSystemFactories) {
                 factory.configure(config);
-                String scheme = factory.getScheme();
+                final String scheme = factory.getScheme();
 
                 FileSystemFactory fsf =
                         ConnectionLimitingFactory.decorateIfLimited(factory, scheme, config);
-                FS_FACTORIES.put(scheme, fsf);
+
+                final String className = resolveFactoryClassName(factory);
+                final int registeredPriority =
+                        registeredPriorities.getOrDefault(scheme, Integer.MIN_VALUE);
+                final int newPriority =
+                        config.getOptional(CoreOptions.fileSystemFactoryPriority(scheme, className))
+                                .orElse(factory.getPriority());
+
+                LOG.info(
+                        "{} filesystem factory {} for scheme '{}' "
+                                + "with priority {} (highest registered priority: {})",
+                        newPriority >= registeredPriority ? "Registering" : "Skipping",
+                        className,
+                        scheme,
+                        newPriority,
+                        registeredPriority);
+                if (newPriority >= registeredPriority) {

Review Comment:
   We could probably add a section to https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/internals/filesystems/ describing the behavior of this feature (and maybe how to add custom factories).
   
   WDYT?
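
For such a docs section, the selection rule in the hunk above could be illustrated in isolation. The following is a minimal sketch, not Flink's implementation: `FactoryPrioritySketch`, `Factory`, `select`, and the `scheme/className` override key are hypothetical stand-ins for the real factory, configuration, and option types. The hunk is cut off at the `if`, so the body used here (store the factory and record its priority) is an assumption.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class FactoryPrioritySketch {

    /** Hypothetical stand-in for a file system factory. */
    static final class Factory {
        final String scheme;
        final String className;
        final int defaultPriority;

        Factory(String scheme, String className, int defaultPriority) {
            this.scheme = scheme;
            this.className = className;
            this.defaultPriority = defaultPriority;
        }
    }

    /**
     * Returns the selected factory class name per scheme.
     *
     * @param overrides hypothetical per-factory priority overrides keyed by "scheme/className"
     */
    static Map<String, String> select(List<Factory> factories, Map<String, Integer> overrides) {
        final Map<String, String> selected = new HashMap<>();
        final Map<String, Integer> registeredPriorities = new HashMap<>();

        for (Factory factory : factories) {
            final int registeredPriority =
                    registeredPriorities.getOrDefault(factory.scheme, Integer.MIN_VALUE);
            // A configured override wins over the factory's own default priority.
            final int newPriority =
                    Optional.ofNullable(overrides.get(factory.scheme + "/" + factory.className))
                            .orElse(factory.defaultPriority);

            // Assumed body of the truncated `if`: register and remember the priority.
            // With `>=`, a later factory with an equal priority replaces the earlier one.
            if (newPriority >= registeredPriority) {
                selected.put(factory.scheme, factory.className);
                registeredPriorities.put(factory.scheme, newPriority);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<Factory> factories =
                List.of(new Factory("s3", "A", 0), new Factory("s3", "B", 10), new Factory("s3", "C", 5));
        System.out.println(select(factories, Map.of()));
        System.out.println(select(factories, Map.of("s3/C", 10)));
    }
}
```

In this sketch the outcome depends on discovery order only when priorities tie, which may be worth stating explicitly in the docs.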



##########
flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java:
##########
@@ -297,6 +299,21 @@ private static void initializeWithoutPlugins(Configuration config)
         initialize(config, null);
     }
 
+    /**
+     * Returns a list of initialized {@link FileSystemFactory FS factories}.
+     *
+     * @return a snapshot of the currently registered file system factories
+     */
+    @Internal
+    public static List<FileSystemFactory> getInitializedFileSystems() {
+        LOCK.lock();

Review Comment:
   I guess the writes to `FS_FACTORIES` in `initialize()` might not be visible here without the lock.
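
The visibility concern can be shown with a minimal sketch, assuming a plain `HashMap` guarded by a `ReentrantLock`. `LockedRegistrySketch`, `FACTORIES`, and `snapshot` are hypothetical names, not the actual `FileSystem` members. Releasing a lock happens-before a later acquisition of the same lock, so a reader that takes the lock is guaranteed to see what the writer stored while holding it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

final class LockedRegistrySketch {

    private static final ReentrantLock LOCK = new ReentrantLock();

    // Not thread-safe on its own; every access below goes through LOCK.
    private static final Map<String, String> FACTORIES = new HashMap<>();

    /** Replaces the registry contents while holding the lock. */
    static void initialize(Map<String, String> factoriesByScheme) {
        LOCK.lock();
        try {
            FACTORIES.clear();
            FACTORIES.putAll(factoriesByScheme);
        } finally {
            LOCK.unlock();
        }
    }

    /** Returns an independent copy taken under the same lock as the writes. */
    static List<String> snapshot() {
        LOCK.lock();
        try {
            return new ArrayList<>(FACTORIES.values());
        } finally {
            LOCK.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread writer = new Thread(() -> initialize(Map.of("file", "LocalFactory")));
        writer.start();
        writer.join();
        System.out.println(snapshot());
    }
}
```

Copying inside the critical section also means callers can modify the returned list without touching the shared map.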


