davidradl commented on code in PR #27617:
URL: https://github.com/apache/flink/pull/27617#discussion_r2812612623
##########
flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java:
##########
@@ -338,14 +355,48 @@ public static void initialize(Configuration config, @Nullable PluginManager plug
final List<FileSystemFactory> fileSystemFactories =
loadFileSystemFactories(factorySuppliers);
+ // Track registered priorities for factory selection
+ final HashMap<String, Integer> registeredPriorities = new HashMap<>();
+
// configure all file system factories
for (FileSystemFactory factory : fileSystemFactories) {
factory.configure(config);
- String scheme = factory.getScheme();
+ final String scheme = factory.getScheme();
FileSystemFactory fsf =
ConnectionLimitingFactory.decorateIfLimited(factory, scheme, config);
- FS_FACTORIES.put(scheme, fsf);
+
+ final String className = resolveFactoryClassName(factory);
+ final int registeredPriority =
+ registeredPriorities.getOrDefault(scheme, Integer.MIN_VALUE);
+ final int newPriority =
+ config.getOptional(CoreOptions.fileSystemFactoryPriority(scheme, className))
+ .orElse(factory.getPriority());
+
+ LOG.info(
+ "{} filesystem factory {} for scheme '{}' "
+ + "with priority {} (highest registered priority: {})",
+ newPriority >= registeredPriority ? "Registering" : "Skipping",
+ className,
+ scheme,
+ newPriority,
+ registeredPriority);
+ if (newPriority >= registeredPriority) {
Review Comment:
If I understand correctly, we can have config like this to associate a
factory implementation with a priority:
fs.s3.priority.org.apache.flink.runtime.fs.hdfs.HadoopFsFactory: 1
fs.s3.priority.org.apache.flink.runtime.fs.native.AwsNativeS3FileSystemFactory: -1
I assume that in these cases org.apache.flink.runtime.fs.hdfs.HadoopFsFactory and
org.apache.flink.runtime.fs.native.AwsNativeS3FileSystemFactory are the
implementation class names.
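A minimal sketch of how the hunk appears to pick one factory per scheme, to make the comparison concrete. It assumes that the truncated `if` body records the new priority and factory (the hunk stops before it) and that the key layout is `fs.<scheme>.priority.<className>` as inferred above. `FileSystemFactory`, `Configuration` and `CoreOptions` are replaced by hypothetical stand-ins (`Candidate`, a plain `Map`, `priorityKey`), and class names such as `org.example.HadoopLikeFactory` are placeholders.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical, simplified model of the selection loop in the diff: for each scheme,
 * keep the factory whose effective priority is highest. A per-class config override
 * wins over the factory's built-in default priority.
 */
public class PrioritySelectionSketch {

    /** Stand-in for a FileSystemFactory: only the fields the selection needs. */
    static final class Candidate {
        final String scheme;
        final String className;
        final int defaultPriority;

        Candidate(String scheme, String className, int defaultPriority) {
            this.scheme = scheme;
            this.className = className;
            this.defaultPriority = defaultPriority;
        }
    }

    /** Assumed key layout, inferred from the review comment, not confirmed by the PR. */
    static String priorityKey(String scheme, String className) {
        return "fs." + scheme + ".priority." + className;
    }

    /** Returns scheme -> class name of the factory that would end up registered. */
    static Map<String, String> select(List<Candidate> candidates, Map<String, String> config) {
        Map<String, Integer> registeredPriorities = new HashMap<>();
        Map<String, String> selected = new HashMap<>();
        for (Candidate c : candidates) {
            int registered = registeredPriorities.getOrDefault(c.scheme, Integer.MIN_VALUE);
            // Config override if present, otherwise the factory's own default.
            int effective =
                    Optional.ofNullable(config.get(priorityKey(c.scheme, c.className)))
                            .map(Integer::parseInt)
                            .orElse(c.defaultPriority);
            // ">=" mirrors the diff: on a tie, the later-loaded factory replaces the earlier one.
            if (effective >= registered) {
                registeredPriorities.put(c.scheme, effective);
                selected.put(c.scheme, c.className);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<Candidate> candidates =
                List.of(
                        new Candidate("s3", "org.example.HadoopLikeFactory", 0),
                        new Candidate("s3", "org.example.NativeLikeFactory", 0));
        Map<String, String> config =
                Map.of(
                        priorityKey("s3", "org.example.HadoopLikeFactory"), "1",
                        priorityKey("s3", "org.example.NativeLikeFactory"), "-1");
        System.out.println(select(candidates, config).get("s3"));
    }
}
```

With overrides of 1 and -1, the first factory wins. With no overrides and equal defaults, `>=` lets the factory loaded last win, so the outcome then depends on discovery order.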
If you are specifying implementation class names in the config anyway, why
not just indicate in the config which class you want loaded? Isn't the priority
mechanism superfluous?
The usual Flink way to solve this problem would seem to be a
constant like:
`public static final String IDENTIFIER = "HadoopFS";`. That name would be
resolved by `FactoryUtil`-style logic, using the service entries in the jar
(`META-INF/services`), as formats and connectors do. This would allow you to
target the implementation you want. I assume you don't want this because you
want to make the choice at runtime through system-wide configuration; can you
confirm my understanding?
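For comparison, a minimal sketch of identifier-based selection in the spirit of the `IDENTIFIER` approach. Everything here is hypothetical: `IdentifiedFactory`, the `fs.<scheme>.factory` option and the `NativeS3` identifier do not exist in Flink, and the candidate list stands in for what `ServiceLoader` discovery would return.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Hypothetical sketch of identifier-based selection: each factory advertises a short
 * identifier, and a per-scheme config option names the one to use.
 */
public class IdentifierSelectionSketch {

    /** Hypothetical stand-in for a file system factory that exposes an identifier. */
    interface IdentifiedFactory {
        String scheme();

        String identifier();
    }

    static IdentifiedFactory factory(String scheme, String identifier) {
        return new IdentifiedFactory() {
            public String scheme() {
                return scheme;
            }

            public String identifier() {
                return identifier;
            }
        };
    }

    /** Assumed option layout, e.g. "fs.s3.factory: HadoopFS". Not an existing Flink option. */
    static String identifierKey(String scheme) {
        return "fs." + scheme + ".factory";
    }

    static IdentifiedFactory select(
            String scheme, List<IdentifiedFactory> discovered, Map<String, String> config) {
        List<IdentifiedFactory> forScheme =
                discovered.stream()
                        .filter(f -> f.scheme().equals(scheme))
                        .collect(Collectors.toList());
        String wanted = config.get(identifierKey(scheme));
        if (wanted == null) {
            // Without an explicit choice, only an unambiguous match is accepted.
            if (forScheme.size() != 1) {
                throw new IllegalStateException(
                        "Found " + forScheme.size() + " factories for scheme '" + scheme
                                + "'; set '" + identifierKey(scheme) + "' to choose one");
            }
            return forScheme.get(0);
        }
        List<IdentifiedFactory> matches =
                forScheme.stream()
                        .filter(f -> f.identifier().equals(wanted))
                        .collect(Collectors.toList());
        if (matches.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one factory '" + wanted + "' for scheme '" + scheme
                            + "', found " + matches.size());
        }
        return matches.get(0);
    }

    public static void main(String[] args) {
        List<IdentifiedFactory> discovered =
                List.of(factory("s3", "HadoopFS"), factory("s3", "NativeS3"));
        Map<String, String> config = Map.of(identifierKey("s3"), "HadoopFS");
        System.out.println(select("s3", discovered, config).identifier());
    }
}
```

Here the config names exactly one implementation per scheme, and an ambiguous setup fails fast instead of depending on load order or on the relative priorities of the candidates.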
--
This is an automated message from the Apache Git Service.