Samrat002 commented on code in PR #27617:
URL: https://github.com/apache/flink/pull/27617#discussion_r2813907923
##########
flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java:
##########
@@ -115,6 +124,67 @@ void testKnownFSWithoutPluginsAndException() {
}
}
+ private static final String PRIORITY_TEST_SCHEME = "priority-test";
+
+ static Stream<Arguments> prioritySelectionCases() {
+ return Stream.of(
+ Arguments.of(0, 0, DummyFsB.class),
Review Comment:
I may be seeing a silent bug here, though I'm not sure it holds.
The (0, 0, DummyFsB) and (50, 50, DummyFsB) cases assert that DummyFsB wins
on a tie. This relies on FactoryB being last in iteration order (since the
pluginManager is called with new FactoryA(), new FactoryB()). If the iteration
order ever changes, these tests would break for reasons unrelated to the
priority settings they are meant to check.
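To make the order dependence concrete, here is a simplified, hypothetical model of the selection loop (`TieOrderDemo` and `select` are illustrative names, not Flink API). It assumes the elided body of the `if (newPriority >= registeredPriority)` branch stores the factory and records `newPriority` as the scheme's highest registered priority.

```java
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of the loop in FileSystem#initialize: candidates are visited
 * in iteration order, and a candidate replaces the current winner when its priority
 * is >= the highest priority registered so far (assumed to be updated on every win).
 */
public class TieOrderDemo {

    public static String select(List<Map.Entry<String, Integer>> candidatesInIterationOrder) {
        int registeredPriority = Integer.MIN_VALUE; // same starting value as in the PR
        String winner = null;
        for (Map.Entry<String, Integer> candidate : candidatesInIterationOrder) {
            if (candidate.getValue() >= registeredPriority) {
                winner = candidate.getKey();
                registeredPriority = candidate.getValue();
            }
        }
        return winner;
    }

    public static void main(String[] args) {
        // Equal priorities: the winner is whichever factory is visited last.
        System.out.println(select(List.of(Map.entry("FactoryA", 50), Map.entry("FactoryB", 50)))); // FactoryB
        System.out.println(select(List.of(Map.entry("FactoryB", 50), Map.entry("FactoryA", 50)))); // FactoryA
    }
}
```

The same tie case yields a different factory once the input order is swapped, which is exactly what the (0, 0) and (50, 50) cases currently depend on.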
##########
flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java:
##########
@@ -567,4 +567,24 @@ public static ConfigOption<Long> fileSystemConnectionLimitStreamInactivityTimeou
.longType()
.defaultValue(0L);
}
+
+ /**
+ * Explicitly resolves the conflict between multiple FileSystemFactory implementations when
+ * multiple jars are loaded for the same schema. Primary use is to allow configuration based
+ * migration between file systems without the need to build separate images.
+ *
+ * <p>Config key pattern: {@code fs.<scheme>.priority.<factoryClassName>}
+ */
+ public static ConfigOption<Integer> fileSystemFactoryPriority(String scheme, String className) {
+ return ConfigOptions.key("fs." + scheme + ".priority." + className)
Review Comment:
The config key `fs.<scheme>.priority.<factoryClassName>` embeds a fully
qualified class name into a dot-separated key hierarchy, e.g.
`fs.s3.priority.org.apache.flink.fs.s3.hadoop.S3FileSystemFactory`.
Have you been able to verify that this works correctly with Flink's YAML
config parser?
Some configuration parsers interpret each dot as a nesting level, which
could cause issues. If so, would a different separator for the class-name
portion be safer, maybe something like
`fs.s3.priority-class.org_apache_flink_...`?
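A small sketch of the concern: it builds the key the same way `CoreOptions.fileSystemFactoryPriority` concatenates it and counts how many dot-separated levels each key would produce if a parser nested on every dot. `priorityKeyAlternative` is purely hypothetical and assumes the suggested form means a `priority-class` segment with each dot in the class name replaced by `_`.

```java
/**
 * Compares the PR's key pattern (fs.<scheme>.priority.<factoryClassName>) with a
 * hypothetical alternative. Neither helper is an existing Flink API.
 */
public class PriorityKeyDemo {

    // Mirrors the string concatenation in CoreOptions.fileSystemFactoryPriority.
    public static String priorityKey(String scheme, String className) {
        return "fs." + scheme + ".priority." + className;
    }

    // Hypothetical alternative: distinct segment name, and '_' instead of '.'
    // inside the class name so it no longer adds nesting levels.
    public static String priorityKeyAlternative(String scheme, String className) {
        return "fs." + scheme + ".priority-class." + className.replace('.', '_');
    }

    public static void main(String[] args) {
        String factory = "org.apache.flink.fs.s3.hadoop.S3FileSystemFactory";
        String current = priorityKey("s3", factory);
        String alternative = priorityKeyAlternative("s3", factory);
        System.out.println(current);     // fs.s3.priority.org.apache.flink.fs.s3.hadoop.S3FileSystemFactory
        System.out.println(alternative); // fs.s3.priority-class.org_apache_flink_fs_s3_hadoop_S3FileSystemFactory
        // Levels each key would yield if a parser split on every '.':
        System.out.println(current.split("\\.").length);     // 10
        System.out.println(alternative.split("\\.").length); // 4
    }
}
```

One caveat with an underscore mapping: it is lossy for class names that already contain `_` (e.g. `a_b.C` and `a.b_C` both become `a_b_C`), so it would need either a collision check or an escaping rule.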
##########
flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java:
##########
@@ -567,4 +567,24 @@ public static ConfigOption<Long> fileSystemConnectionLimitStreamInactivityTimeou
.longType()
.defaultValue(0L);
}
+
+ /**
+ * Explicitly resolves the conflict between multiple FileSystemFactory implementations when
+ * multiple jars are loaded for the same schema. Primary use is to allow configuration based
Review Comment:
```suggestion
* multiple jars are loaded for the same scheme. Primary use is to allow configuration based
```
##########
flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java:
##########
@@ -338,14 +355,48 @@ public static void initialize(Configuration config, @Nullable PluginManager plug
final List<FileSystemFactory> fileSystemFactories =
loadFileSystemFactories(factorySuppliers);
+ // Track registered priorities for factory selection
+ final HashMap<String, Integer> registeredPriorities = new HashMap<>();
+
// configure all file system factories
for (FileSystemFactory factory : fileSystemFactories) {
factory.configure(config);
- String scheme = factory.getScheme();
+ final String scheme = factory.getScheme();
FileSystemFactory fsf =
ConnectionLimitingFactory.decorateIfLimited(factory, scheme, config);
- FS_FACTORIES.put(scheme, fsf);
+
+ final String className = resolveFactoryClassName(factory);
+ final int registeredPriority =
+ registeredPriorities.getOrDefault(scheme, Integer.MIN_VALUE);
+ final int newPriority =
+ config.getOptional(CoreOptions.fileSystemFactoryPriority(scheme, className))
+ .orElse(factory.getPriority());
+
+ LOG.info(
+ "{} filesystem factory {} for scheme '{}' "
+ + "with priority {} (highest registered priority: {})",
+ newPriority >= registeredPriority ? "Registering" : "Skipping",
+ className,
+ scheme,
+ newPriority,
+ registeredPriority);
+ if (newPriority >= registeredPriority) {
Review Comment:
Have you considered using a strict `>`? Isn't `>=` confusing here?
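For reference, a hypothetical sketch of how the two comparisons behave, using the same `Integer.MIN_VALUE` starting value as `registeredPriorities.getOrDefault(scheme, Integer.MIN_VALUE)` in the PR (`ComparisonDemo` and `select` are illustrative names). With `>=` the last factory visited wins a tie; with `>` the first one keeps it, so ties stay order-dependent either way. One side effect worth checking with `>`: a factory whose effective priority is exactly `Integer.MIN_VALUE` would never be registered, because `Integer.MIN_VALUE > Integer.MIN_VALUE` is false.

```java
import java.util.List;
import java.util.Map;

public class ComparisonDemo {

    // strict == true models `newPriority > registeredPriority`;
    // strict == false models the PR's `newPriority >= registeredPriority`.
    public static String select(List<Map.Entry<String, Integer>> candidates, boolean strict) {
        int registeredPriority = Integer.MIN_VALUE; // sentinel, as in the PR
        String winner = null;
        for (Map.Entry<String, Integer> candidate : candidates) {
            int priority = candidate.getValue();
            boolean wins = strict ? priority > registeredPriority : priority >= registeredPriority;
            if (wins) {
                winner = candidate.getKey();
                registeredPriority = priority;
            }
        }
        return winner;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> tie =
                List.of(Map.entry("FactoryA", 0), Map.entry("FactoryB", 0));
        System.out.println(select(tie, false)); // FactoryB: last visited wins the tie
        System.out.println(select(tie, true));  // FactoryA: first visited keeps it

        // Edge case: with the sentinel, strict comparison never registers MIN_VALUE.
        List<Map.Entry<String, Integer>> minOnly = List.of(Map.entry("FactoryA", Integer.MIN_VALUE));
        System.out.println(select(minOnly, false)); // FactoryA
        System.out.println(select(minOnly, true));  // null
    }
}
```

If strict `>` is adopted, checking `registeredPriorities.containsKey(scheme)` rather than relying on the sentinel value would avoid that edge case.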
##########
flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java:
##########
@@ -297,6 +299,21 @@ private static void initializeWithoutPlugins(Configuration config)
initialize(config, null);
}
+ /**
+ * Returns a list of initialized {@link FileSystemFactory FS factories}.
+ *
+ * @return a snapshot of the currently registered file system factories
+ */
+ @Internal
+ public static List<FileSystemFactory> getInitializedFileSystems() {
Review Comment:
```suggestion
public static List<FileSystemFactory> getRegisteredFileSystemFactories() {
```
Nit: The method name getInitializedFileSystems is misleading. It returns
FileSystemFactory instances, not FileSystem instances.
##########
flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java:
##########
@@ -115,6 +124,67 @@ void testKnownFSWithoutPluginsAndException() {
}
}
+ private static final String PRIORITY_TEST_SCHEME = "priority-test";
+
+ static Stream<Arguments> prioritySelectionCases() {
Review Comment:
Consider adding a reverse-order test to verify that FactoryA wins the tie
when it's last, confirming the behaviour is truly order-dependent.
##########
flink-core/src/test/java/org/apache/flink/core/fs/FileSystemTest.java:
##########
@@ -115,6 +124,67 @@ void testKnownFSWithoutPluginsAndException() {
}
}
+ private static final String PRIORITY_TEST_SCHEME = "priority-test";
+
+ static Stream<Arguments> prioritySelectionCases() {
+ return Stream.of(
+ Arguments.of(0, 0, DummyFsB.class),
+ Arguments.of(0, 100, DummyFsB.class),
+ Arguments.of(100, 0, DummyFsA.class),
+ Arguments.of(-10, 0, DummyFsB.class),
+ Arguments.of(50, 50, DummyFsB.class));
+ }
+
+ @ParameterizedTest
+ @MethodSource("prioritySelectionCases")
+ void shouldSelectFactoryByPriority(
+ final int firstPriority,
+ final int secondPriority,
+ final Class<? extends FileSystem> expectedFsClass)
+ throws Exception {
+ final Configuration config = new Configuration();
+ config.set(
+ CoreOptions.fileSystemFactoryPriority(
+ PRIORITY_TEST_SCHEME, FactoryA.class.getName()),
+ firstPriority);
+ config.set(
+ CoreOptions.fileSystemFactoryPriority(
+ PRIORITY_TEST_SCHEME, FactoryB.class.getName()),
+ secondPriority);
+
+ try {
+ FileSystem.initialize(config, pluginManager(new FactoryA(), new FactoryB()));
+
+ assertThat(createAndUnwrap(PRIORITY_TEST_SCHEME)).isInstanceOf(expectedFsClass);
+ } finally {
+ FileSystem.initialize(new Configuration(), null);
+ }
+ }
+
+ @Test
+ void shouldRegisterSingleFactory() throws Exception {
+ try {
+ FileSystem.initialize(new Configuration(), pluginManager(new FactoryA()));
+
+ assertThat(createAndUnwrap(PRIORITY_TEST_SCHEME)).isInstanceOf(DummyFsA.class);
+ } finally {
+ FileSystem.initialize(new Configuration(), null);
+ }
+ }
+
+ @Test
+ void shouldUseFactoryDeclaredPriorityWhenNoConfigSet() throws Exception {
Review Comment:
Add a test for the reversed plugin order (`pluginManager(new FactoryB(), new
FactoryA())`) that still asserts DummyFsB wins. This would confirm that the
declared priority (FactoryA=-1 vs FactoryB=0) takes precedence over iteration
order, regardless of which factory comes first.
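A simplified, hypothetical model of what such a test would pin down (`DeclaredPriorityDemo` and `select` are illustrative names, not Flink API). The effective priority is the configured value when present, otherwise the factory's declared priority, mirroring `config.getOptional(...).orElse(factory.getPriority())`; the declared values FactoryA=-1 and FactoryB=0 are the ones mentioned above.

```java
import java.util.List;
import java.util.Map;

public class DeclaredPriorityDemo {

    // declared: factory name -> declared priority, in iteration order.
    // configured: optional per-factory overrides (models the fs.<scheme>.priority.* keys).
    public static String select(List<Map.Entry<String, Integer>> declared, Map<String, Integer> configured) {
        int registeredPriority = Integer.MIN_VALUE;
        String winner = null;
        for (Map.Entry<String, Integer> factory : declared) {
            // Configured value wins over the declared one, as in the PR.
            int effective = configured.getOrDefault(factory.getKey(), factory.getValue());
            if (effective >= registeredPriority) {
                winner = factory.getKey();
                registeredPriority = effective;
            }
        }
        return winner;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> forward =
                List.of(Map.entry("FactoryA", -1), Map.entry("FactoryB", 0));
        List<Map.Entry<String, Integer>> reversed =
                List.of(Map.entry("FactoryB", 0), Map.entry("FactoryA", -1));
        Map<String, Integer> noConfig = Map.of();

        System.out.println(select(forward, noConfig));  // FactoryB
        System.out.println(select(reversed, noConfig)); // FactoryB: order does not matter
        // A configured value overrides the declared one.
        System.out.println(select(reversed, Map.of("FactoryA", 100))); // FactoryA
    }
}
```

Because the declared priorities differ, both orders resolve to FactoryB, which is the property the reversed-order test would lock in.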
##########
flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java:
##########
@@ -338,14 +355,48 @@ public static void initialize(Configuration config, @Nullable PluginManager plug
final List<FileSystemFactory> fileSystemFactories =
loadFileSystemFactories(factorySuppliers);
+ // Track registered priorities for factory selection
+ final HashMap<String, Integer> registeredPriorities = new HashMap<>();
Review Comment:
```suggestion
final Map<String, Integer> registeredPriorities = new HashMap<>();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.