Izeren commented on code in PR #27617:
URL: https://github.com/apache/flink/pull/27617#discussion_r2812382375


##########
flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java:
##########
@@ -338,14 +355,48 @@ public static void initialize(Configuration config, @Nullable PluginManager plug
             final List<FileSystemFactory> fileSystemFactories =
                     loadFileSystemFactories(factorySuppliers);
 
+            // Track registered priorities for factory selection
+            final HashMap<String, Integer> registeredPriorities = new HashMap<>();
+
             // configure all file system factories
             for (FileSystemFactory factory : fileSystemFactories) {
                 factory.configure(config);
-                String scheme = factory.getScheme();
+                final String scheme = factory.getScheme();
 
                 FileSystemFactory fsf =
                         ConnectionLimitingFactory.decorateIfLimited(factory, scheme, config);
-                FS_FACTORIES.put(scheme, fsf);
+
+                final String className = resolveFactoryClassName(factory);
+                final int registeredPriority =
+                        registeredPriorities.getOrDefault(scheme, Integer.MIN_VALUE);
+                final int newPriority =
+                        config.getOptional(CoreOptions.fileSystemFactoryPriority(scheme, className))
+                                .orElse(factory.getPriority());
+
+                LOG.info(
+                        "{} filesystem factory {} for scheme '{}' "
+                                + "with priority {} (highest registered priority: {})",
+                        newPriority >= registeredPriority ? "Registering" : "Skipping",
+                        className,
+                        scheme,
+                        newPriority,
+                        registeredPriority);
+                if (newPriority >= registeredPriority) {

Review Comment:
   Thank you for your feedback! I have answered your questions below. 
   
   > There is no write up for this. The Flip and Jira do not mention priority. 
   
   It is not written up yet; I am discussing it on Slack with @Samrat002. This 
PR is my proposal to extend the "migration strategy" so that two jars for the 
same scheme can be packaged in one image, which simplifies migrating between 
two file systems in production. 
   
   > So I assume that this will not be picked up unless -1 is specified in 
config
   
   The idea is that we would add the new SDK-native FS (described in a FLIP) 
and override its `getPriority` method to return `-1`. This way, even with both 
jars packaged, the stable FS stays in use in production unless you explicitly 
add a config override to switch to the new FS. I did it this way to preserve 
safe defaults.
   
   > how to associate a file system with a priority
   
   As per the PR description, example usage: 
`fs.s3.priority.org.apache.flink.runtime.fs.hdfs.HadoopFsFactory: 1`. 
A config entry like this overrides the factory's default priority (0) with a 
higher value. Say we introduce a new SDK-native factory with default priority 
-1. We can then switch to it in production with the following config override: 
`fs.s3.priority.org.apache.flink.runtime.fs.native.AwsNativeS3FileSystemFactory: 1`. 
Without this entry, it would still load Hadoop (or whichever file system was 
loaded from the plugin).
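To make the override rule concrete, here is a minimal, self-contained sketch of how an effective priority could be resolved. It is not the PR's code: a plain `Map<String, String>` stands in for Flink's `Configuration`, and the names `PriorityOverrideSketch`, `priorityKey`, and `effectivePriority` are hypothetical. Only the key layout `fs.<scheme>.priority.<factory class name>` and the defaults 0 and -1 come from the description above.

```java
import java.util.Map;
import java.util.Optional;

/** Illustrative only: resolves a factory's effective priority from a flat config map. */
public class PriorityOverrideSketch {

    /** Builds a key of the form fs.<scheme>.priority.<factoryClassName>. */
    public static String priorityKey(String scheme, String factoryClassName) {
        return "fs." + scheme + ".priority." + factoryClassName;
    }

    /** Returns the configured priority if the key is present, otherwise the factory default. */
    public static int effectivePriority(
            Map<String, String> config,
            String scheme,
            String factoryClassName,
            int defaultPriority) {
        return Optional.ofNullable(config.get(priorityKey(scheme, factoryClassName)))
                .map(String::trim)
                .map(Integer::parseInt)
                .orElse(defaultPriority);
    }

    public static void main(String[] args) {
        String hadoop = "org.apache.flink.runtime.fs.hdfs.HadoopFsFactory";
        String nativeS3 = "org.apache.flink.runtime.fs.native.AwsNativeS3FileSystemFactory";

        // Assumed config: only the experimental factory is overridden.
        Map<String, String> config = Map.of(priorityKey("s3", nativeS3), "1");

        // The stable factory keeps its default; the experimental one is raised above it.
        System.out.println(effectivePriority(config, "s3", hadoop, 0));
        System.out.println(effectivePriority(config, "s3", nativeS3, -1));
    }
}
```

With an empty map, both factories fall back to their defaults, so the stable one (0) still outranks the experimental one (-1).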
   
   > are we likely to have more than 2 matching schemes in your use case.
   
   It is possible in principle if you want to experiment and test multiple 
file systems at the same time. However, the primary purpose of this PR is to 
support migration between stable and experimental file systems in production. 
   
   > why would it be useful to have multiple priorities to justify the 
`if (newPriority >= registeredPriority)` check, rather than just targeting the 
required priority
   
   I found this more flexible: with a single target priority, it is not 
obvious what that value should be or why existing factories should not default 
to it. 
   
   >is the concept of priority different from a version? Would it not be better 
to just target a scheme version.
   
   It is slightly different: priority lets me load several factories for the 
same scheme and the same version of that scheme without reconfiguring 
everything else that may depend on the scheme version. 
   
   >without the config; how do we determine which file system is the highest 
priority - I assume this could be different for different users; this implies 
that one file system scheme is inherently higher priority than another.
   
   The primary use case is to distinguish between stable and experimental 
file systems. The proposal is to set the default priority to 0 for all stable 
file systems and to -1 for experimental or deprecated ones. 
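The selection rule discussed in this thread can be sketched as follows. This is a simplified model, not the PR's implementation: `Candidate` is a hypothetical stand-in for a discovered factory with its effective priority already resolved. The body of the `if` is also an assumption (that the factory is stored and its priority recorded), since the quoted hunk ends at the condition.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: picks one factory per scheme using the ">=" rule from the diff. */
public class FactorySelectionSketch {

    /** Hypothetical stand-in for a factory; not a Flink type. */
    public static final class Candidate {
        final String scheme;
        final String className;
        final int priority;

        public Candidate(String scheme, String className, int priority) {
            this.scheme = scheme;
            this.className = className;
            this.priority = priority;
        }
    }

    /** Returns scheme -> selected factory class name. */
    public static Map<String, String> select(List<Candidate> candidates) {
        Map<String, Integer> registeredPriorities = new HashMap<>();
        Map<String, String> selected = new HashMap<>();
        for (Candidate c : candidates) {
            int registered = registeredPriorities.getOrDefault(c.scheme, Integer.MIN_VALUE);
            // ">=" means a later factory with an equal priority replaces the earlier one.
            if (c.priority >= registered) {
                selected.put(c.scheme, c.className);
                registeredPriorities.put(c.scheme, c.priority);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        // Defaults as proposed: stable = 0, experimental = -1.
        List<Candidate> defaults =
                List.of(
                        new Candidate("s3", "ExperimentalFactory", -1),
                        new Candidate("s3", "StableFactory", 0));
        System.out.println(select(defaults).get("s3")); // prints "StableFactory"
    }
}
```

Under this model, the stable factory wins with default priorities regardless of discovery order, while two factories with equal priority are resolved by order (the last one discovered wins).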
   



##########
flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java:
##########
@@ -297,6 +299,21 @@ private static void initializeWithoutPlugins(Configuration config)
         initialize(config, null);
     }
 
+    /**
+     * Returns a list of initialized {@link FileSystemFactory FS factories}.
+     *
+     * @return a snapshot of the currently registered file system factories
+     */
+    @Internal
+    public static List<FileSystemFactory> getInitializedFileSystems() {
+        LOCK.lock();

Review Comment:
   Could you please elaborate? This method provides generic access to the 
FS_FACTORIES values and may be called from external threads. Without the lock, 
what would guarantee that the JVM does not "optimize" this call into returning 
the values as they were at initialization (e.g. an empty list)?
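For reference, the visibility argument can be sketched with a small stand-alone example. It assumes that every write to the map happens while holding the same lock as the read; the class and method names here are hypothetical and only mirror the shape of the method under review. Under the Java memory model, an unlock happens-before every subsequent lock of the same `Lock`, so a reader that takes the lock sees all writes made under it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

/** Illustrative only: a lock-guarded registry that hands out snapshots. */
public class LockedSnapshotSketch {

    private static final ReentrantLock LOCK = new ReentrantLock();

    // Plain HashMap: safe only because every access below holds LOCK.
    private static final Map<String, String> FACTORIES = new HashMap<>();

    /** Writer path: mutates the map under the lock. */
    public static void register(String scheme, String factoryName) {
        LOCK.lock();
        try {
            FACTORIES.put(scheme, factoryName);
        } finally {
            LOCK.unlock();
        }
    }

    /** Reader path: copies the values under the same lock and returns the copy. */
    public static List<String> snapshot() {
        LOCK.lock();
        try {
            return new ArrayList<>(FACTORIES.values());
        } finally {
            LOCK.unlock();
        }
    }

    public static void main(String[] args) {
        register("s3", "StableFactory");
        System.out.println(snapshot());
    }
}
```

Returning a copy rather than the live `values()` view also keeps callers from iterating the map while another thread modifies it.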


