izzyacademy commented on a change in pull request #16559:
URL: https://github.com/apache/flink/pull/16559#discussion_r682769816
##########
File path:
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java
##########
@@ -75,18 +76,27 @@ public void configure(Configuration config) {
@Override
public FileSystem create(URI fsUri) throws IOException {
checkNotNull(fsUri, "passed file system URI object should not be null");
- LOG.info("Trying to load and instantiate Azure File System");
+ LOG.info("Trying to load and instantiate Azure File System for {}", fsUri);
return new HadoopFileSystem(createInitializedAzureFS(fsUri, flinkConfig));
}
- // uri is of the form: wasb(s)://[email protected]/testDir
+ // uri is of the form: wasb(s)://[email protected]/testDir (or)
+ // abfs(s):////[email protected]/testDir
private org.apache.hadoop.fs.FileSystem createInitializedAzureFS(
URI fsUri, Configuration flinkConfig) throws IOException {
org.apache.hadoop.conf.Configuration hadoopConfig = configLoader.getOrLoadHadoopConfig();
-
- org.apache.hadoop.fs.FileSystem azureFS = new NativeAzureFileSystem();
- azureFS.initialize(fsUri, hadoopConfig);
-
- return azureFS;
+ String scheme = fsUri.getScheme();
+
+ if (scheme.startsWith("wasb")) {
+ LOG.info("Trying to initialize hadoop filesystem for {}.", scheme);
+ org.apache.hadoop.fs.FileSystem azureFS = new NativeAzureFileSystem();
+ azureFS.initialize(fsUri, hadoopConfig);
+ return azureFS;
+ } else {
Review comment:
Do we need to check whether the scheme starts with "abfs" here, so that
this branch covers both the secure and non-secure schemes without also
accepting schemes that are not Azure file systems?
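
To illustrate the concern, here is a rough sketch of an explicit scheme check, not a proposed patch. The class `AzureSchemeCheck`, the enum `AzureFsKind`, and the example host names are hypothetical and do not exist in the PR; the sketch also matches the four scheme names exactly rather than by prefix, which is an assumption about what we want to accept.

```java
import java.net.URI;
import java.util.Locale;

// Hypothetical helper, for illustration only; not part of AbstractAzureFSFactory.
class AzureSchemeCheck {

    // Hypothetical classification of the two Azure file system families.
    enum AzureFsKind { WASB, ABFS }

    // Maps a URI scheme to an Azure file system family and rejects everything else,
    // so a non-Azure scheme can never fall through to the ABFS branch.
    static AzureFsKind classify(URI fsUri) {
        String scheme = fsUri.getScheme();
        if (scheme == null) {
            throw new IllegalArgumentException("URI has no scheme: " + fsUri);
        }
        switch (scheme.toLowerCase(Locale.ROOT)) {
            case "wasb":
            case "wasbs":
                return AzureFsKind.WASB;
            case "abfs":
            case "abfss":
                return AzureFsKind.ABFS;
            default:
                throw new IllegalArgumentException(
                        "Unsupported scheme for Azure file system: " + scheme);
        }
    }

    public static void main(String[] args) {
        // Placeholder URIs; the container and account names are made up.
        System.out.println(classify(URI.create("wasbs://container@account.example/testDir")));
        System.out.println(classify(URI.create("abfs://container@account.example/testDir")));
    }
}
```

With something along these lines, the `else` branch would only be reached for `abfs`/`abfss`, and any other scheme would fail fast with a clear message instead of being handed to the ABFS implementation.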