Samrat002 commented on code in PR #27617:
URL: https://github.com/apache/flink/pull/27617#discussion_r2813907923


##########
flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java:
##########
@@ -115,6 +124,67 @@ void testKnownFSWithoutPluginsAndException() {
         }
     }
 
+    private static final String PRIORITY_TEST_SCHEME = "priority-test";
+
+    static Stream<Arguments> prioritySelectionCases() {
+        return Stream.of(
+                Arguments.of(0, 0, DummyFsB.class),

Review Comment:
   I see a possible silent bug here, though I'm not sure it holds.
   
   The (0, 0, DummyFsB) and (50, 50, DummyFsB) cases assert that DummyFsB wins on a tie. This relies on FactoryB being last in iteration order (since the pluginManager is called with new FactoryA(), new FactoryB()). If that iteration order ever changes, these tests would break, and the hidden dependency on order is easy to miss.
   
   



##########
flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java:
##########
@@ -567,4 +567,24 @@ public static ConfigOption<Long> fileSystemConnectionLimitStreamInactivityTimeou
                 .longType()
                 .defaultValue(0L);
     }
+
+    /**
+     * Explicitly resolves the conflict between multiple FileSystemFactory implementations when
+     * multiple jars are loaded for the same schema. Primary use is to allow configuration based
+     * migration between file systems without the need to build separate images.
+     *
+     * <p>Config key pattern: {@code fs.<scheme>.priority.<factoryClassName>}
+     */
+    public static ConfigOption<Integer> fileSystemFactoryPriority(String scheme, String className) {
+        return ConfigOptions.key("fs." + scheme + ".priority." + className)

Review Comment:
   The config key `fs.<scheme>.priority.<factoryClassName>` embeds a fully qualified class name into a dot-separated key hierarchy, e.g. `fs.s3.priority.org.apache.flink.fs.s3.hadoop.S3FileSystemFactory`.
   
   Have you been able to verify that this works correctly with Flink's YAML config parser?
   
   Some configuration parsers interpret each dot as a nesting level, which could cause issues. If so, would a different separator for the class-name portion be safer? Maybe something like `fs.s3.priority-class.org_apache_flink_...`?
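   
   Here is a minimal, Flink-free sketch of the two key shapes. `currentKey` builds the key the same way `CoreOptions.fileSystemFactoryPriority` does in the diff. `underscoreKey` is a hypothetical helper for the separator suggested above and is not an existing Flink API. `nestingLevels` only counts the segments a parser that splits on every dot would see. It does not model Flink's actual YAML loader.
   
   ```java
   final class PriorityKeySketch {
   
       /** Same construction as the diff: fs.<scheme>.priority.<factoryClassName>. */
       static String currentKey(String scheme, String className) {
           return "fs." + scheme + ".priority." + className;
       }
   
       /** Hypothetical alternative: distinct segment name, dots in the class name replaced by '_'. */
       static String underscoreKey(String scheme, String className) {
           return "fs." + scheme + ".priority-class." + className.replace('.', '_');
       }
   
       /** Segments a parser would create if it treated every dot as a nesting boundary. */
       static int nestingLevels(String key) {
           return key.split("\\.").length;
       }
   
       public static void main(String[] args) {
           String cls = "org.apache.flink.fs.s3.hadoop.S3FileSystemFactory";
           String current = currentKey("s3", cls);
           String alternative = underscoreKey("s3", cls);
           System.out.println(current + " -> " + nestingLevels(current));
           System.out.println(alternative + " -> " + nestingLevels(alternative));
       }
   }
   ```
   
   For the S3 example, the current key splits into 10 segments and the underscore variant into 4. The underscore encoding is lossy, because class names may themselves contain `_`. That only matters if keys ever need to be decoded back into class names. A lookup that always goes from class name to key, as the diff does, would not be affected.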



##########
flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java:
##########
@@ -567,4 +567,24 @@ public static ConfigOption<Long> fileSystemConnectionLimitStreamInactivityTimeou
                 .longType()
                 .defaultValue(0L);
     }
+
+    /**
+     * Explicitly resolves the conflict between multiple FileSystemFactory implementations when
+     * multiple jars are loaded for the same schema. Primary use is to allow configuration based

Review Comment:
   ```suggestion
        * multiple jars are loaded for the same scheme. Primary use is to allow configuration based
   ```



##########
flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java:
##########
@@ -338,14 +355,48 @@ public static void initialize(Configuration config, @Nullable PluginManager plug
             final List<FileSystemFactory> fileSystemFactories =
                     loadFileSystemFactories(factorySuppliers);
 
+            // Track registered priorities for factory selection
+            final HashMap<String, Integer> registeredPriorities = new HashMap<>();
+
             // configure all file system factories
             for (FileSystemFactory factory : fileSystemFactories) {
                 factory.configure(config);
-                String scheme = factory.getScheme();
+                final String scheme = factory.getScheme();
 
                 FileSystemFactory fsf =
                         ConnectionLimitingFactory.decorateIfLimited(factory, scheme, config);
-                FS_FACTORIES.put(scheme, fsf);
+
+                final String className = resolveFactoryClassName(factory);
+                final int registeredPriority =
+                        registeredPriorities.getOrDefault(scheme, Integer.MIN_VALUE);
+                final int newPriority =
+                        config.getOptional(CoreOptions.fileSystemFactoryPriority(scheme, className))
+                                .orElse(factory.getPriority());
+
+                LOG.info(
+                        "{} filesystem factory {} for scheme '{}' "
+                                + "with priority {} (highest registered priority: {})",
+                        newPriority >= registeredPriority ? "Registering" : "Skipping",
+                        className,
+                        scheme,
+                        newPriority,
+                        registeredPriority);
+                if (newPriority >= registeredPriority) {

Review Comment:
   
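   Have you considered using a strict `>`? Isn't `>=` confusing here? With `>=`, a later factory with the same priority replaces an earlier one, so on a tie the last-loaded factory wins.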
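   
   To show the difference, here is a small standalone sketch of the selection loop from the diff with the comparison made switchable. `TieRuleSketch` and its nested `Candidate` class are hypothetical stand-ins, not Flink types.
   
   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   final class TieRuleSketch {
   
       /** Hypothetical stand-in for a loaded FileSystemFactory. */
       static final class Candidate {
           final String className;
           final String scheme;
           final int priority;
   
           Candidate(String className, String scheme, int priority) {
               this.className = className;
               this.scheme = scheme;
               this.priority = priority;
           }
       }
   
       /** Walks candidates in load order; strict=false uses '>=' (as in the PR), strict=true uses '>'. */
       static Map<String, String> select(List<Candidate> candidates, boolean strict) {
           Map<String, Integer> registeredPriorities = new HashMap<>();
           Map<String, String> winners = new HashMap<>();
           for (Candidate c : candidates) {
               Integer registered = registeredPriorities.get(c.scheme);
               // A null check instead of an Integer.MIN_VALUE sentinel, so that '>' still
               // registers the first factory even if its priority is Integer.MIN_VALUE.
               boolean register =
                       registered == null
                               || (strict ? c.priority > registered : c.priority >= registered);
               if (register) {
                   registeredPriorities.put(c.scheme, c.priority);
                   winners.put(c.scheme, c.className);
               }
           }
           return winners;
       }
   
       public static void main(String[] args) {
           List<Candidate> tie =
                   List.of(
                           new Candidate("FactoryA", "priority-test", 50),
                           new Candidate("FactoryB", "priority-test", 50));
           System.out.println(">= : " + select(tie, false).get("priority-test"));
           System.out.println(">  : " + select(tie, true).get("priority-test"));
       }
   }
   ```
   
   Two candidates are tied at 50 and loaded as FactoryA then FactoryB. In that case, `>=` picks FactoryB and `>` picks FactoryA. Either way, a tie is still decided by load order: `>` only changes the rule from "last loaded wins" to "first loaded wins". If the operator changes, the `"Registering"`/`"Skipping"` condition in the `LOG.info` call would need to change with it.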



##########
flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java:
##########
@@ -297,6 +299,21 @@ private static void initializeWithoutPlugins(Configuration config)
         initialize(config, null);
     }
 
+    /**
+     * Returns a list of initialized {@link FileSystemFactory FS factories}.
+     *
+     * @return a snapshot of the currently registered file system factories
+     */
+    @Internal
+    public static List<FileSystemFactory> getInitializedFileSystems() {

Review Comment:
   ```suggestion
       public static List<FileSystemFactory> getRegisteredFileSystemFactories() {
   ```
   
   Nit: the method name `getInitializedFileSystems` is misleading. It returns `FileSystemFactory` instances, not `FileSystem` instances.
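   
   Here is a minimal sketch of what the rename is getting at. It assumes the registry is a map keyed by scheme, as `FS_FACTORIES.put(scheme, fsf)` in the diff suggests. `FactoryRegistrySketch` and its nested `Factory` interface are hypothetical stand-ins for `FileSystem` and `FileSystemFactory`. The method returns a read-only copy of the factories, matching the Javadoc's "snapshot" wording.
   
   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   final class FactoryRegistrySketch {
   
       /** Hypothetical stand-in for FileSystemFactory; only the scheme matters here. */
       interface Factory {
           String getScheme();
       }
   
       // Keyed by scheme, mirroring FS_FACTORIES.put(scheme, fsf) in the diff.
       private final Map<String, Factory> factories = new HashMap<>();
   
       void register(Factory factory) {
           factories.put(factory.getScheme(), factory);
       }
   
       /** Returns factories (not FileSystem instances) as a read-only copy. */
       List<Factory> getRegisteredFileSystemFactories() {
           return Collections.unmodifiableList(new ArrayList<>(factories.values()));
       }
   
       public static void main(String[] args) {
           FactoryRegistrySketch registry = new FactoryRegistrySketch();
           registry.register(() -> "s3");
           List<Factory> snapshot = registry.getRegisteredFileSystemFactories();
           registry.register(() -> "hdfs");
           // The earlier snapshot does not see later registrations.
           System.out.println(snapshot.size());
       }
   }
   ```
   
   If the real map is accessed under a lock, the copy would presumably need to be taken while holding it.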



##########
flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java:
##########
@@ -115,6 +124,67 @@ void testKnownFSWithoutPluginsAndException() {
         }
     }
 
+    private static final String PRIORITY_TEST_SCHEME = "priority-test";
+
+    static Stream<Arguments> prioritySelectionCases() {

Review Comment:
   Consider adding a reverse-order test to verify that FactoryA wins the tie when it's last, confirming that the behaviour is truly order-dependent.
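   
   The property such a test would pin down can be sketched without Flink. Below is a single-scheme version of the `>=` rule from the diff, run over the same tie in both load orders. `ReverseOrderSketch.winner` is hypothetical. In the real test, the reversed case would presumably be `pluginManager(new FactoryB(), new FactoryA())` with `DummyFsA.class` expected for the tie cases.
   
   ```java
   final class ReverseOrderSketch {
   
       /** A candidate replaces the current winner when its priority is >= the highest seen so far. */
       static String winner(String[] classNamesInLoadOrder, int[] priorities) {
           String winner = null;
           int registered = Integer.MIN_VALUE;
           for (int i = 0; i < classNamesInLoadOrder.length; i++) {
               if (priorities[i] >= registered) {
                   registered = priorities[i];
                   winner = classNamesInLoadOrder[i];
               }
           }
           return winner;
       }
   
       public static void main(String[] args) {
           // Same (50, 50) tie in both load orders: the winner flips with the order.
           System.out.println(winner(new String[] {"FactoryA", "FactoryB"}, new int[] {50, 50}));
           System.out.println(winner(new String[] {"FactoryB", "FactoryA"}, new int[] {50, 50}));
       }
   }
   ```
   
   With a (50, 50) tie, the winner flips with the load order. A strict difference such as (100, 0) picks FactoryA in both orders.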



##########
flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java:
##########
@@ -115,6 +124,67 @@ void testKnownFSWithoutPluginsAndException() {
         }
     }
 
+    private static final String PRIORITY_TEST_SCHEME = "priority-test";
+
+    static Stream<Arguments> prioritySelectionCases() {
+        return Stream.of(
+                Arguments.of(0, 0, DummyFsB.class),
+                Arguments.of(0, 100, DummyFsB.class),
+                Arguments.of(100, 0, DummyFsA.class),
+                Arguments.of(-10, 0, DummyFsB.class),
+                Arguments.of(50, 50, DummyFsB.class));
+    }
+
+    @ParameterizedTest
+    @MethodSource("prioritySelectionCases")
+    void shouldSelectFactoryByPriority(
+            final int firstPriority,
+            final int secondPriority,
+            final Class<? extends FileSystem> expectedFsClass)
+            throws Exception {
+        final Configuration config = new Configuration();
+        config.set(
+                CoreOptions.fileSystemFactoryPriority(
+                        PRIORITY_TEST_SCHEME, FactoryA.class.getName()),
+                firstPriority);
+        config.set(
+                CoreOptions.fileSystemFactoryPriority(
+                        PRIORITY_TEST_SCHEME, FactoryB.class.getName()),
+                secondPriority);
+
+        try {
+            FileSystem.initialize(config, pluginManager(new FactoryA(), new FactoryB()));
+
+            assertThat(createAndUnwrap(PRIORITY_TEST_SCHEME)).isInstanceOf(expectedFsClass);
+        } finally {
+            FileSystem.initialize(new Configuration(), null);
+        }
+    }
+
+    @Test
+    void shouldRegisterSingleFactory() throws Exception {
+        try {
+            FileSystem.initialize(new Configuration(), pluginManager(new FactoryA()));
+
+            assertThat(createAndUnwrap(PRIORITY_TEST_SCHEME)).isInstanceOf(DummyFsA.class);
+        } finally {
+            FileSystem.initialize(new Configuration(), null);
+        }
+    }
+
+    @Test
+    void shouldUseFactoryDeclaredPriorityWhenNoConfigSet() throws Exception {

Review Comment:
   Add a test for the reversed plugin order (`pluginManager(new FactoryB(), new FactoryA())`) that still asserts DummyFsB wins. This would confirm that the declared priority (FactoryA=-1 vs FactoryB=0) takes precedence over iteration order, regardless of which factory comes first.
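   
   Here is a Flink-free sketch of the case this test would cover. The effective priority falls back from the configured `fs.<scheme>.priority.<factoryClassName>` value to the factory's declared priority, mirroring `config.getOptional(...).orElse(factory.getPriority())` in the diff. With the declared values FactoryA=-1 and FactoryB=0 from this comment, both load orders should give the same winner. The class `DeclaredPrioritySketch` and the plain `Map` standing in for `Configuration` are hypothetical.
   
   ```java
   import java.util.Map;
   import java.util.Optional;
   
   final class DeclaredPrioritySketch {
   
       /** Configured override if present, otherwise the factory's declared priority. */
       static int effectivePriority(
               Map<String, Integer> config, String scheme, String className, int declaredPriority) {
           String key = "fs." + scheme + ".priority." + className;
           return Optional.ofNullable(config.get(key)).orElse(declaredPriority);
       }
   
       /** Applies the '>=' rule from the diff over candidates in load order. */
       static String select(
               Map<String, Integer> config, String scheme, String[] classNames, int[] declared) {
           String winner = null;
           int registered = Integer.MIN_VALUE;
           for (int i = 0; i < classNames.length; i++) {
               int priority = effectivePriority(config, scheme, classNames[i], declared[i]);
               if (priority >= registered) {
                   registered = priority;
                   winner = classNames[i];
               }
           }
           return winner;
       }
   
       public static void main(String[] args) {
           Map<String, Integer> noConfig = Map.of();
           // Declared priorities from the comment: FactoryA = -1, FactoryB = 0.
           System.out.println(
                   select(noConfig, "priority-test",
                           new String[] {"FactoryA", "FactoryB"}, new int[] {-1, 0}));
           System.out.println(
                   select(noConfig, "priority-test",
                           new String[] {"FactoryB", "FactoryA"}, new int[] {0, -1}));
       }
   }
   ```
   
   Both load orders print FactoryB. Setting `fs.priority-test.priority.FactoryA` to, say, 10 in the map would instead make FactoryA win in both orders.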



##########
flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java:
##########
@@ -338,14 +355,48 @@ public static void initialize(Configuration config, @Nullable PluginManager plug
             final List<FileSystemFactory> fileSystemFactories =
                     loadFileSystemFactories(factorySuppliers);
 
+            // Track registered priorities for factory selection
+            final HashMap<String, Integer> registeredPriorities = new HashMap<>();

Review Comment:
   
   ```suggestion
               final Map<String, Integer> registeredPriorities = new HashMap<>();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
