Izeren commented on code in PR #27617:
URL: https://github.com/apache/flink/pull/27617#discussion_r2812382375
##########
flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java:
##########
@@ -338,14 +355,48 @@ public static void initialize(Configuration config, @Nullable PluginManager plug
final List<FileSystemFactory> fileSystemFactories =
loadFileSystemFactories(factorySuppliers);
+ // Track registered priorities for factory selection
+ final HashMap<String, Integer> registeredPriorities = new HashMap<>();
+
// configure all file system factories
for (FileSystemFactory factory : fileSystemFactories) {
factory.configure(config);
- String scheme = factory.getScheme();
+ final String scheme = factory.getScheme();
FileSystemFactory fsf =
ConnectionLimitingFactory.decorateIfLimited(factory, scheme, config);
- FS_FACTORIES.put(scheme, fsf);
+
+ final String className = resolveFactoryClassName(factory);
+ final int registeredPriority =
+ registeredPriorities.getOrDefault(scheme, Integer.MIN_VALUE);
+ final int newPriority =
+ config.getOptional(CoreOptions.fileSystemFactoryPriority(scheme, className))
+ .orElse(factory.getPriority());
+
+ LOG.info(
+ "{} filesystem factory {} for scheme '{}' "
+ + "with priority {} (highest registered priority: {})",
+ newPriority >= registeredPriority ? "Registering" : "Skipping",
+ className,
+ scheme,
+ newPriority,
+ registeredPriority);
+ if (newPriority >= registeredPriority) {
Review Comment:
Thank you for your feedback! I have answered your questions below.
> There is no write-up for this. The FLIP and Jira do not mention priority.
It is not written up yet; I am discussing it on Slack with @Samrat002. This PR
is my proposal to extend the "migration strategy" so that two jars for the same
scheme can be packaged in the image, which simplifies migration between two file
systems in production.
> So I assume that this will not be picked up unless -1 is specified in config
The idea is that we would add a new SDK-native FS (described in a FLIP) whose
factory overrides the `getPriority` method to return `-1`. This way, even with
both jars packaged, the stable FS is still used in production unless you
explicitly add a config override to switch to the new FS. I made it this way to
preserve safe defaults.
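To make the proposed defaults concrete, here is a minimal sketch, assuming a simplified stand-in for `FileSystemFactory` in which `getPriority()` is a default method returning 0 (the value proposed for stable factories) and a hypothetical experimental factory overrides it to return -1. `PrioritizedFactory`, `StableS3Factory`, `ExperimentalS3Factory`, `DefaultSelection` and `selectFor` are illustrative names, not Flink API, and the sketch assumes that a factory passing the `>=` check replaces the previously registered one.

```java
import java.util.List;

/** Hypothetical, simplified stand-in for FileSystemFactory with the proposed priority hook. */
interface PrioritizedFactory {
    String getScheme();

    /** Proposed default for stable factories. */
    default int getPriority() {
        return 0;
    }
}

/** Hypothetical stable factory: keeps the default priority of 0. */
final class StableS3Factory implements PrioritizedFactory {
    @Override
    public String getScheme() {
        return "s3";
    }
}

/** Hypothetical experimental factory: opts out by default with priority -1. */
final class ExperimentalS3Factory implements PrioritizedFactory {
    @Override
    public String getScheme() {
        return "s3";
    }

    @Override
    public int getPriority() {
        return -1;
    }
}

final class DefaultSelection {
    /** Returns the factory that wins for a scheme when no config override is present. */
    static PrioritizedFactory selectFor(String scheme, List<PrioritizedFactory> loadOrder) {
        PrioritizedFactory selected = null;
        int highest = Integer.MIN_VALUE;
        for (PrioritizedFactory factory : loadOrder) {
            if (!factory.getScheme().equals(scheme)) {
                continue; // only factories for the requested scheme compete
            }
            // Same comparison as the diff: replace when the new priority is >= the highest so far.
            if (factory.getPriority() >= highest) {
                highest = factory.getPriority();
                selected = factory;
            }
        }
        return selected;
    }
}
```

Whichever jar is loaded first, the stable factory ends up selected, which is the safe-default property described above.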
> how to associate a file system with a priority
As per the PR description, example usage:
`fs.s3.priority.org.apache.flink.runtime.fs.hdfs.HadoopFsFactory: 1`
Specifying a config like this overrides the default priority (0) with a higher
value. Let's say we have introduced a new SDK-native factory with default
priority -1. Then we can use the following config override to switch to it in
production:
`fs.s3.priority.org.apache.flink.runtime.fs.native.AwsNativeS3FileSystemFactory: 1`
Without this config, Flink would still load Hadoop (or whichever file system
the plugin provides).
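A minimal sketch of how such a key could be resolved, assuming the key layout shown in the two examples (`fs.<scheme>.priority.<factory class name>`) and using a plain `Map<String, String>` as a hypothetical stand-in for Flink's `Configuration`. The diff itself goes through `CoreOptions.fileSystemFactoryPriority(scheme, className)`, whose definition is not shown here; `PriorityOverrides`, `priorityKey` and `effectivePriority` are illustrative names.

```java
import java.util.Map;

/** Hypothetical helper illustrating per-factory priority overrides. */
final class PriorityOverrides {
    /** Builds a key in the layout used by the examples: fs.<scheme>.priority.<factory class name>. */
    static String priorityKey(String scheme, String factoryClassName) {
        return "fs." + scheme + ".priority." + factoryClassName;
    }

    /** Returns the configured priority if an override exists, otherwise the factory's default. */
    static int effectivePriority(
            Map<String, String> config, String scheme, String factoryClassName, int defaultPriority) {
        String override = config.get(priorityKey(scheme, factoryClassName));
        return override == null ? defaultPriority : Integer.parseInt(override.trim());
    }
}
```

With the override present, the native factory's effective priority (1) outranks Hadoop's default (0). Without it, -1 loses to 0 and Hadoop stays in place.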
> are we likely to have more than 2 matching schemes in your use case?
It is possible in principle, if you want to experiment with and test multiple
file systems at the same time. However, the primary purpose of this PR is to
support migration between stable and experimental file systems in production.
> why it is useful to have multiple priorities to justify the `if newPriority >= registeredPriority`, rather than just targeting the required priority
I found it more flexible: otherwise it is not obvious what the target priority
should be, or why existing factories should not default to it.
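The effect of the `>=` rule can be sketched in isolation. This is a simplified model, assuming each candidate's priority has already been resolved (default or config override) and that a candidate passing the check replaces the previously registered factory; `TieBreakSketch` and `Candidate` are hypothetical names. Any number of factories can be ranked relative to each other without agreeing on a single target value, and because the comparison is `>=` rather than `>`, a tie goes to the factory loaded later.

```java
import java.util.List;

final class TieBreakSketch {
    /** A factory class name with its already-resolved priority (hypothetical). */
    static final class Candidate {
        final String className;
        final int priority;

        Candidate(String className, int priority) {
            this.className = className;
            this.priority = priority;
        }
    }

    /** Walks candidates in load order and keeps the last one whose priority is >= the highest seen. */
    static String select(List<Candidate> loadOrder) {
        String selected = null;
        int highest = Integer.MIN_VALUE;
        for (Candidate candidate : loadOrder) {
            if (candidate.priority >= highest) {
                highest = candidate.priority;
                selected = candidate.className;
            }
        }
        return selected;
    }
}
```

The tie case means equal priorities fall back to plugin load order, which is one reason to keep the stable and experimental defaults at distinct values (0 and -1).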
> is the concept of priority different from a version? Would it not be better to just target a scheme version.
It is slightly different: priority lets me load factories for the same scheme,
and for the same version of that scheme, without reconfiguring everything else
that may depend on the scheme version.
> without the config; how do we determine which file system is the highest priority - I assume this could be different for different users; this implies that one file system scheme is inherently higher priority than another.
The primary use case is to distinguish between stable and experimental file
systems. The proposal is to set the default priority to 0 for all stable file
systems and to -1 for experimental or deprecated ones.
##########
flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java:
##########
@@ -297,6 +299,21 @@ private static void initializeWithoutPlugins(Configuration config)
initialize(config, null);
}
+ /**
+ * Returns a list of initialized {@link FileSystemFactory FS factories}.
+ *
+ * @return a snapshot of the currently registered file system factories
+ */
+ @Internal
+ public static List<FileSystemFactory> getInitializedFileSystems() {
+ LOCK.lock();
Review Comment:
Could you please elaborate? This method provides generic access to the
`FS_FACTORIES` values and is meant to be usable from external threads. Without
the lock, what would guarantee that the JVM does not "optimize" this call into
returning stale values from before initialization (e.g. an empty list)?
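For context on the visibility question: `java.util.concurrent.locks.Lock` implementations are specified to provide the same memory-synchronization semantics as the built-in monitor lock, so a write made while holding the lock is visible to a thread that later acquires it. Copying the values under the lock also keeps a reader from iterating a plain `HashMap` while another thread modifies it. A minimal sketch of that pattern, assuming a registry shaped like `FS_FACTORIES` (a `HashMap` guarded by a `ReentrantLock`); `FactoryRegistry` and the string values are hypothetical placeholders for the real factory map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical registry: a plain HashMap whose every access is guarded by one lock. */
final class FactoryRegistry {
    private static final ReentrantLock LOCK = new ReentrantLock(true);
    private static final HashMap<String, String> FACTORIES = new HashMap<>();

    /** Writer side, e.g. called during initialization. */
    static void register(String scheme, String factoryName) {
        LOCK.lock();
        try {
            FACTORIES.put(scheme, factoryName);
        } finally {
            LOCK.unlock();
        }
    }

    /**
     * Reader side: acquiring the same lock makes earlier writes visible, and copying
     * under the lock yields a snapshot that later registrations do not change.
     */
    static List<String> snapshot() {
        LOCK.lock();
        try {
            return new ArrayList<>(FACTORIES.values());
        } finally {
            LOCK.unlock();
        }
    }
}
```

Publishing an immutable copy through a `volatile` field, or using a `ConcurrentHashMap`, would also give readers visibility without taking the lock; the lock-based version simply reuses the existing `LOCK`.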