This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 23d159485 [fs/azure] Support Azure Blob Storage (#1941)
23d159485 is described below
commit 23d15948500b81370842f8c8dc039776418c99ff
Author: gkatzioura <[email protected]>
AuthorDate: Tue Jan 27 15:15:35 2026 +0000
[fs/azure] Support Azure Blob Storage (#1941)
---
fluss-filesystems/fluss-fs-azure/pom.xml | 234 ++++++++++++++++++++
.../fluss/fs/azure/AbfsFileSystemPlugin.java | 35 +++
.../fluss/fs/azure/AbfssFileSystemPlugin.java | 35 +++
.../org/apache/fluss/fs/azure/AzureFileSystem.java | 65 ++++++
.../fluss/fs/azure/AzureFileSystemOptions.java | 58 +++++
.../fluss/fs/azure/AzureFileSystemPlugin.java | 123 +++++++++++
.../fluss/fs/azure/WasbFileSystemPlugin.java | 35 +++
.../fluss/fs/azure/WasbsFileSystemPlugin.java | 35 +++
.../azure/token/AbfsDelegationTokenReceiver.java | 35 +++
.../azure/token/AbfssDelegationTokenReceiver.java | 35 +++
.../azure/token/AzureDelegationTokenProvider.java | 91 ++++++++
.../azure/token/AzureDelegationTokenReceiver.java | 89 ++++++++
.../DynamicTemporaryAzureCredentialsProvider.java | 82 +++++++
.../azure/token/WasbDelegationTokenReceiver.java | 34 +++
.../azure/token/WasbsDelegationTokenReceiver.java | 35 +++
.../src/main/resources/META-INF/NOTICE | 80 +++++++
.../META-INF/licenses/LICENSE.animal-sniffer | 21 ++
.../resources/META-INF/licenses/LICENSE.bcprov | 18 ++
.../META-INF/licenses/LICENSE.checker-qual | 22 ++
.../resources/META-INF/licenses/LICENSE.dnsjava | 30 +++
.../META-INF/licenses/LICENSE.jakarta.activation | 28 +++
.../main/resources/META-INF/licenses/LICENSE.jaxb | 135 ++++++++++++
.../main/resources/META-INF/licenses/LICENSE.re2j | 32 +++
.../resources/META-INF/licenses/LICENSE.stax2api | 22 ++
.../services/org.apache.fluss.fs.FileSystemPlugin | 20 ++
...org.apache.fluss.fs.token.SecurityTokenReceiver | 20 ++
.../fs/azure/AbfsFileSystemBehaviorITCase.java | 82 +++++++
.../fluss/fs/azure/AzureFileSystemPluginTest.java | 124 +++++++++++
.../apache/fluss/fs/azure/AzureFileSystemTest.java | 49 +++++
.../apache/fluss/fs/azure/MemoryFileSystem.java | 241 +++++++++++++++++++++
.../token/AbfsDelegationTokenReceiverTest.java | 32 +++
.../token/AbfssDelegationTokenReceiverTest.java | 32 +++
.../fluss/fs/azure/token/AuthServerHandler.java | 134 ++++++++++++
.../token/AzureDelegationTokenProviderTest.java | 70 ++++++
.../token/AzureDelegationTokenReceiverTest.java | 88 ++++++++
...namicTemporaryAzureCredentialsProviderTest.java | 77 +++++++
.../fluss/fs/azure/token/MockAuthServer.java | 89 ++++++++
.../token/WasbDelegationTokenReceiverTest.java | 32 +++
.../token/WasbsDelegationTokenReceiverTest.java | 32 +++
.../src/test/resources/create-token.json | 5 +
fluss-filesystems/pom.xml | 1 +
41 files changed, 2537 insertions(+)
diff --git a/fluss-filesystems/fluss-fs-azure/pom.xml
b/fluss-filesystems/fluss-fs-azure/pom.xml
new file mode 100644
index 000000000..13084da2e
--- /dev/null
+++ b/fluss-filesystems/fluss-fs-azure/pom.xml
@@ -0,0 +1,234 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-filesystems</artifactId>
+ <version>0.9-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>fluss-fs-azure</artifactId>
+ <name>Fluss : FileSystems : Azure FS</name>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <!-- NOTE(review): fs.azure.sdk.version is not referenced anywhere in this pom; the
+ hadoop-azure dependency below is versioned by ${fs.hadoopshaded.version} instead.
+ Confirm whether this property is still needed. -->
+ <fs.azure.sdk.version>3.3.4</fs.azure.sdk.version>
+ <!-- Version of the test-scoped com.microsoft.azure:azure dependency declared below. -->
+ <fs.azure.api.version>1.16.0</fs.azure.api.version>
+ </properties>
+
+ <dependencies>
+ <!-- Provided scope: compiled against, but expected to be supplied by the environment. -->
+ <dependency>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-common</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- The Hadoop file system adapter classes (bundled) -->
+ <dependency>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-fs-hadoop</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- Supplies hadoop-common, which is therefore excluded from hadoop-azure below. -->
+ <dependency>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-fs-hadoop-shaded</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- Hadoop's Azure connector; the sources of this module import
+ org.apache.hadoop.fs.azurebfs classes from it. -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-azure</artifactId>
+ <version>${fs.hadoopshaded.version}</version>
+ <exclusions>
+ <!-- Excluded here and re-declared below with test scope only. -->
+ <exclusion>
+ <groupId>com.microsoft.azure</groupId>
+ <artifactId>azure</artifactId>
+ </exclusion>
+ <exclusion>
+ <!-- provided by fluss-fs-hadoop-shaded -->
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </exclusion>
+ <!-- The two exclusions below keep the reload4j logging artifacts out of the
+ transitive dependency set. -->
+ <exclusion>
+ <groupId>ch.qos.reload4j</groupId>
+ <artifactId>reload4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-reload4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- Test-only re-declaration of the artifact excluded from hadoop-azure above. -->
+ <dependency>
+ <groupId>com.microsoft.azure</groupId>
+ <artifactId>azure</artifactId>
+ <version>${fs.azure.api.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <!-- for the behavior test suite -->
+ <dependency>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-common</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <archive>
+ <manifestEntries>
+ <!-- jaxb-api is packaged as an optional dependency that is only accessible on
+ Java 11: the antrun execution in this pom unpacks it under
+ META-INF/versions/11, and the Multi-Release manifest entry below is what
+ makes a JVM consult that versioned directory. -->
+ <Multi-Release>true</Multi-Release>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <!-- Copies the jaxb-api jar into ${project.build.directory}/temporary during
+ process-resources; the antrun execution in this pom unpacks it from there. -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-javax-jars</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ <version>${jaxb.api.version}</version>
+ <type>jar</type>
+ <overWrite>true</overWrite>
+ </artifactItem>
+ </artifactItems>
+
<outputDirectory>${project.build.directory}/temporary</outputDirectory>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <!-- Unzips every file found in ${project.build.directory}/temporary into
+ classes/META-INF/versions/11. This execution is bound to process-resources, the same
+ phase as the copy-javax-jars execution that fills that directory, so it has to run
+ after it; Maven runs same-phase executions in POM declaration order, so this plugin
+ must stay declared after maven-dependency-plugin. -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>unpack-javax-libraries</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <target>
+ <echo message="unpacking javax jars"/>
+ <unzip
dest="${project.build.directory}/classes/META-INF/versions/11">
+ <fileset
dir="${project.build.directory}/temporary">
+ <include name="*"/>
+ </fileset>
+ </unzip>
+ </target>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <!-- Builds the bundled plugin jar in the package phase. -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-fluss</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <!-- The *:* pattern places no restriction on which artifacts are included. -->
+ <artifactSet>
+ <includes>
+ <include>*:*</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <!-- Files stripped from every bundled artifact. -->
+ <filter>
+ <artifact>*</artifact>
+ <excludes>
+ <exclude>.gitkeep</exclude>
+ <exclude>mime.types</exclude>
+ <exclude>mozilla/**</exclude>
+ <exclude>META-INF/maven/**</exclude>
+ <exclude>META-INF/LICENSE.txt</exclude>
+ </excludes>
+ </filter>
+ <!-- Drops the whole META-INF directory contributed by fluss-fs-hadoop. -->
+ <filter>
+
<artifact>org.apache.fluss:fluss-fs-hadoop</artifact>
+ <excludes>
+ <exclude>META-INF/**</exclude>
+ </excludes>
+ </filter>
+ <!-- Second filter over every artifact.
+ NOTE(review): META-INF/LICENSE.txt is already excluded by the first
+ wildcard filter above, so the entry below looks redundant; confirm. -->
+ <filter>
+ <artifact>*</artifact>
+ <excludes>
+ <exclude>properties.dtd</exclude>
+ <exclude>PropertyList-1.0.dtd</exclude>
+
<exclude>META-INF/services/javax.xml.stream.*</exclude>
+ <exclude>META-INF/LICENSE.txt</exclude>
+ <!-- Remove files tripping the CI forbidden GPL regex in transitive deps -->
+
<exclude>com/sun/xml/bind/**/Messages.properties</exclude>
+
<exclude>com/sun/jersey/json/impl/impl.properties</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/AbfsFileSystemPlugin.java
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/AbfsFileSystemPlugin.java
new file mode 100644
index 000000000..dce6b0b1d
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/AbfsFileSystemPlugin.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure;
+
+/**
+ * FileSystem plugin for Azure Blob Storage using the ABFS (Azure Blob File System) driver.
+ * Registered for the {@code abfs://} scheme.
+ *
+ * <p>ABFS is the recommended driver for accessing Azure Data Lake Storage Gen2. Use this scheme for
+ * non-SSL connections to ADLS Gen2 storage accounts.
+ *
+ * <p>URI format: {@code abfs://<container>@<storage-account>.dfs.core.windows.net/<path>}
+ *
+ * <p>This subclass only supplies the scheme name; file system creation is implemented in
+ * {@link AzureFileSystemPlugin}.
+ */
+public class AbfsFileSystemPlugin extends AzureFileSystemPlugin {
+
+ /**
+ * Returns the URI scheme served by this plugin.
+ *
+ * @return the literal string {@code "abfs"}
+ */
+ @Override
+ public String getScheme() {
+ return "abfs";
+ }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/AbfssFileSystemPlugin.java
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/AbfssFileSystemPlugin.java
new file mode 100644
index 000000000..e441b4f3e
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/AbfssFileSystemPlugin.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure;
+
+/**
+ * FileSystem plugin for Azure Blob Storage using the ABFS driver with SSL/TLS encryption.
+ * Registered for the {@code abfss://} scheme.
+ *
+ * <p>This is the secure (SSL-enabled) variant of ABFS, recommended for production use with Azure
+ * Data Lake Storage Gen2.
+ *
+ * <p>URI format: {@code abfss://<container>@<storage-account>.dfs.core.windows.net/<path>}
+ *
+ * <p>This subclass only supplies the scheme name; file system creation is implemented in
+ * {@link AzureFileSystemPlugin}.
+ */
+public class AbfssFileSystemPlugin extends AzureFileSystemPlugin {
+
+ /**
+ * Returns the URI scheme served by this plugin.
+ *
+ * @return the literal string {@code "abfss"}
+ */
+ @Override
+ public String getScheme() {
+ return "abfss";
+ }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/AzureFileSystem.java
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/AzureFileSystem.java
new file mode 100644
index 000000000..e87f0bd3c
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/AzureFileSystem.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.fs.azure.token.AzureDelegationTokenProvider;
+import org.apache.fluss.fs.hdfs.HadoopFileSystem;
+import org.apache.fluss.fs.token.ObtainedSecurityToken;
+
+import java.io.IOException;
+
+/**
+ * Implementation of the Fluss {@link FileSystem} interface for Azure Blob Storage. File system
+ * calls are inherited from {@link HadoopFileSystem}, which is handed the Hadoop file system passed
+ * to the constructor; this class itself only adds {@link #obtainSecurityToken()}.
+ */
+public class AzureFileSystem extends HadoopFileSystem {
+
+ // Scheme and Fluss configuration are kept only to build the token provider on demand.
+ private final String scheme;
+ private final Configuration conf;
+
+ // Created on the first obtainSecurityToken() call; volatile because it is read outside the
+ // synchronized block there.
+ private volatile AzureDelegationTokenProvider delegationTokenProvider;
+
+ /**
+ * Wraps the given Hadoop File System object as a Fluss File System object. The given Hadoop
+ * file system object is expected to be initialized already.
+ *
+ * @param scheme The URI scheme, later handed to the {@link AzureDelegationTokenProvider}.
+ * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
+ * @param conf The Fluss configuration, later handed to the {@link
+ * AzureDelegationTokenProvider}.
+ */
+ public AzureFileSystem(
+ String scheme, org.apache.hadoop.fs.FileSystem hadoopFileSystem,
Configuration conf) {
+ super(hadoopFileSystem);
+ this.scheme = scheme;
+ this.conf = conf;
+ }
+
+ /**
+ * Obtains a security token from the {@link AzureDelegationTokenProvider}, creating the provider
+ * on first use.
+ *
+ * @return the token returned by {@link AzureDelegationTokenProvider#obtainSecurityToken()}
+ * @throws IOException declared by the overridden method
+ */
+ @Override
+ public ObtainedSecurityToken obtainSecurityToken() throws IOException {
+ // Lazy creation: the null check is repeated under the lock so that concurrent first
+ // callers end up sharing a single provider instance.
+ if (delegationTokenProvider == null) {
+ synchronized (this) {
+ if (delegationTokenProvider == null) {
+ delegationTokenProvider = new
AzureDelegationTokenProvider(scheme, conf);
+ }
+ }
+ }
+
+ return delegationTokenProvider.obtainSecurityToken();
+ }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/AzureFileSystemOptions.java
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/AzureFileSystemOptions.java
new file mode 100644
index 000000000..ace3976e5
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/AzureFileSystemOptions.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure;
+
+import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.config.ConfigOption;
+
+import static org.apache.fluss.config.ConfigBuilder.key;
+
+/**
+ * Config options for Azure FileSystem. All options are strings without a default value, and every
+ * key starts with {@code fs.azure.}, the prefix that {@code AzureFileSystemPlugin} forwards to the
+ * Hadoop configuration.
+ */
+@PublicEvolving
+public class AzureFileSystemOptions {
+
+ // AzureFileSystemPlugin looks this key up in the Hadoop configuration: when it is absent, a
+ // delegation token receiver updates the configuration instead.
+ public static final ConfigOption<String> ACCOUNT_KEY =
+ key("fs.azure.account.key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The Azure storage account key.");
+
+ // CLIENT_ID, CLIENT_SECRET and ENDPOINT_KEY are statically imported by
+ // AzureDelegationTokenProvider (presumably to request OAuth2 tokens; confirm there).
+ public static final ConfigOption<String> CLIENT_ID =
+ key("fs.azure.account.oauth2.client.id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The Azure OAuth2 client ID.");
+
+ public static final ConfigOption<String> CLIENT_SECRET =
+ key("fs.azure.account.oauth2.client.secret")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The Azure OAuth2 client secret.");
+
+ public static final ConfigOption<String> ENDPOINT_KEY =
+ key("fs.azure.account.oauth2.client.endpoint")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The Azure OAuth2 client endpoint.");
+
+ // AzureFileSystemPlugin logs the value stored under this key after a delegation token
+ // receiver has updated the Hadoop configuration.
+ public static final ConfigOption<String> PROVIDER_CONFIG_NAME =
+ key("fs.azure.account.oauth.provider.type")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The Azure OAuth provider type.");
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/AzureFileSystemPlugin.java
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/AzureFileSystemPlugin.java
new file mode 100644
index 000000000..318198fe3
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/AzureFileSystemPlugin.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure;
+
+import org.apache.fluss.config.ConfigBuilder;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.fs.FileSystemPlugin;
+import org.apache.fluss.fs.azure.token.AbfsDelegationTokenReceiver;
+import org.apache.fluss.fs.azure.token.AbfssDelegationTokenReceiver;
+import org.apache.fluss.fs.azure.token.WasbDelegationTokenReceiver;
+import org.apache.fluss.fs.azure.token.WasbsDelegationTokenReceiver;
+
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Objects;
+
+import static org.apache.fluss.fs.azure.AzureFileSystemOptions.ACCOUNT_KEY;
+import static org.apache.fluss.fs.azure.AzureFileSystemOptions.CLIENT_ID;
+import static
org.apache.fluss.fs.azure.AzureFileSystemOptions.PROVIDER_CONFIG_NAME;
+
+/**
+ * Abstract factory for creating Azure Blob Storage file systems. Supports multiple URI schemes
+ * (abfs, abfss, wasb, wasbs) based on Azure HDFS support in the hadoop-azure module.
+ *
+ * <p>Concrete subclasses only supply {@code getScheme()}. This class translates the Fluss
+ * configuration into a Hadoop configuration, selects the credential source and initializes the
+ * Hadoop file system.
+ */
+abstract class AzureFileSystemPlugin implements FileSystemPlugin {
+    /** Prefixes of Fluss configuration keys that are forwarded to the Hadoop configuration. */
+    private static final String[] FLUSS_CONFIG_PREFIXES = {"fs.azure."};
+
+    /** Prefix that replaces the matched Fluss prefix in the forwarded Hadoop key. */
+    private static final String HADOOP_CONFIG_PREFIX = "fs.azure.";
+
+    // Log under this base class so that messages produced for the abfss/wasb/wasbs plugins are
+    // not attributed to AbfsFileSystemPlugin.
+    private static final Logger LOG = LoggerFactory.getLogger(AzureFileSystemPlugin.class);
+
+    /**
+     * Creates an {@link AzureFileSystem} for the given URI.
+     *
+     * @param fsUri URI of the file system to open; when it lacks an authority, the Hadoop default
+     *     URI may be used instead (see {@code getInitURI})
+     * @param flussConfig Fluss configuration whose {@code fs.azure.*} entries are forwarded to
+     *     Hadoop; {@code null} forwards nothing
+     * @return the Fluss file system wrapping the initialized Hadoop file system
+     * @throws IOException if initializing the Hadoop file system fails
+     */
+    @Override
+    public FileSystem create(URI fsUri, Configuration flussConfig) throws IOException {
+        org.apache.hadoop.conf.Configuration hadoopConfig = getHadoopConfiguration(flussConfig);
+
+        setCredentialProvider(hadoopConfig);
+
+        // create the Azure Hadoop FileSystem
+        // NOTE(review): AzureBlobFileSystem is instantiated for every scheme, including wasb and
+        // wasbs; confirm that the ABFS driver is the intended implementation for those schemes.
+        org.apache.hadoop.fs.FileSystem fs = new AzureBlobFileSystem();
+        fs.initialize(getInitURI(fsUri, hadoopConfig), hadoopConfig);
+        return new AzureFileSystem(getScheme(), fs, flussConfig);
+    }
+
+    /**
+     * Chooses how the Hadoop file system authenticates. When no account key is configured, the
+     * delegation token receiver matching the plugin's scheme updates {@code hadoopConfig};
+     * otherwise the configuration is left untouched and the account key is used.
+     *
+     * @param hadoopConfig the Hadoop configuration to inspect and, if needed, update
+     * @throws IllegalArgumentException if the scheme is not one of abfs, abfss, wasb, wasbs
+     */
+    private void setCredentialProvider(org.apache.hadoop.conf.Configuration hadoopConfig) {
+        // NOTE(review): only the exact key "fs.azure.account.key" is tested; account-qualified
+        // keys (fs.azure.account.key.<account>...) are not detected here. Confirm this is intended.
+        if (hadoopConfig.get(ACCOUNT_KEY.key()) == null) {
+            final String scheme = getScheme();
+            if (Objects.equals(scheme, "abfs")) {
+                AbfsDelegationTokenReceiver.updateHadoopConfig(hadoopConfig);
+            } else if (Objects.equals(scheme, "abfss")) {
+                AbfssDelegationTokenReceiver.updateHadoopConfig(hadoopConfig);
+            } else if (Objects.equals(scheme, "wasb")) {
+                WasbDelegationTokenReceiver.updateHadoopConfig(hadoopConfig);
+            } else if (Objects.equals(scheme, "wasbs")) {
+                WasbsDelegationTokenReceiver.updateHadoopConfig(hadoopConfig);
+            } else {
+                throw new IllegalArgumentException("Unsupported scheme: " + scheme);
+            }
+            // Report the key that was actually tested above (the account key), not the OAuth2
+            // client id, so the message matches the decision that was taken.
+            LOG.info(
+                    "{} is not set, using credential provider {}.",
+                    ACCOUNT_KEY.key(),
+                    hadoopConfig.get(PROVIDER_CONFIG_NAME.key()));
+        } else {
+            LOG.info("{} is set, using provided account key.", ACCOUNT_KEY.key());
+        }
+    }
+
+    /**
+     * Builds a Hadoop configuration containing every Fluss entry whose key starts with one of
+     * {@code FLUSS_CONFIG_PREFIXES}, with that prefix replaced by {@code HADOOP_CONFIG_PREFIX}.
+     *
+     * @param flussConfig the Fluss configuration; {@code null} yields a default Hadoop
+     *     configuration
+     * @return a new Hadoop configuration
+     */
+    org.apache.hadoop.conf.Configuration getHadoopConfiguration(Configuration flussConfig) {
+        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+        if (flussConfig == null) {
+            return conf;
+        }
+
+        for (String key : flussConfig.keySet()) {
+            for (String prefix : FLUSS_CONFIG_PREFIXES) {
+                if (key.startsWith(prefix)) {
+                    String newKey = HADOOP_CONFIG_PREFIX + key.substring(prefix.length());
+                    String newValue =
+                            flussConfig.getString(
+                                    ConfigBuilder.key(key).stringType().noDefaultValue(), null);
+                    conf.set(newKey, newValue);
+
+                    LOG.debug(
+                            "Adding Fluss config entry for {} as {} to Hadoop config", key, newKey);
+                }
+            }
+        }
+        return conf;
+    }
+
+    /**
+     * Resolves the URI used to initialize the Hadoop file system.
+     *
+     * <ul>
+     *   <li>no scheme and no authority: the Hadoop default URI is returned;
+     *   <li>scheme but no authority: the Hadoop default URI is returned if it has the same scheme
+     *       and carries an authority;
+     *   <li>otherwise {@code fsUri} is returned unchanged.
+     * </ul>
+     *
+     * @param fsUri the URI requested by the caller
+     * @param hadoopConfig the configuration the Hadoop default URI is read from
+     * @return the URI to pass to {@code FileSystem#initialize}
+     */
+    private URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) {
+        final String scheme = fsUri.getScheme();
+        final String authority = fsUri.getAuthority();
+
+        if (scheme == null && authority == null) {
+            fsUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConfig);
+        } else if (scheme != null && authority == null) {
+            URI defaultUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConfig);
+            if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) {
+                fsUri = defaultUri;
+            }
+        }
+        return fsUri;
+    }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/WasbFileSystemPlugin.java
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/WasbFileSystemPlugin.java
new file mode 100644
index 000000000..0a81ed45b
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/WasbFileSystemPlugin.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure;
+
+/**
+ * FileSystem plugin for Azure Blob Storage using the WASB (Windows Azure Storage Blob) driver.
+ * Registered for the {@code wasb://} scheme.
+ *
+ * <p>WASB is the legacy driver for accessing Azure Blob Storage. Consider using ABFS for new
+ * deployments as it provides better performance and security features.
+ *
+ * <p>URI format: {@code wasb://<container>@<storage-account>.blob.core.windows.net/<path>}
+ *
+ * <p>NOTE(review): {@link AzureFileSystemPlugin#create} instantiates {@code AzureBlobFileSystem}
+ * regardless of the scheme, so the statement that this plugin uses the WASB driver should be
+ * confirmed against the Hadoop class that actually serves {@code wasb://} URIs.
+ */
+public class WasbFileSystemPlugin extends AzureFileSystemPlugin {
+
+ /**
+ * Returns the URI scheme served by this plugin.
+ *
+ * @return the literal string {@code "wasb"}
+ */
+ @Override
+ public String getScheme() {
+ return "wasb";
+ }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/WasbsFileSystemPlugin.java
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/WasbsFileSystemPlugin.java
new file mode 100644
index 000000000..1d38bf756
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/WasbsFileSystemPlugin.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure;
+
+/**
+ * FileSystem plugin for Azure Blob Storage using the WASB driver with SSL/TLS encryption.
+ * Registered for the {@code wasbs://} scheme.
+ *
+ * <p>This is the secure (SSL-enabled) variant of WASB for legacy Azure Blob Storage access. For new
+ * deployments, consider using {@code abfss://} with Azure Data Lake Storage Gen2.
+ *
+ * <p>URI format: {@code wasbs://<container>@<storage-account>.blob.core.windows.net/<path>}
+ *
+ * <p>NOTE(review): {@link AzureFileSystemPlugin#create} instantiates {@code AzureBlobFileSystem}
+ * regardless of the scheme, so the statement that this plugin uses the WASB driver should be
+ * confirmed against the Hadoop class that actually serves {@code wasbs://} URIs.
+ */
+public class WasbsFileSystemPlugin extends AzureFileSystemPlugin {
+
+ /**
+ * Returns the URI scheme served by this plugin.
+ *
+ * @return the literal string {@code "wasbs"}
+ */
+ @Override
+ public String getScheme() {
+ return "wasbs";
+ }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/AbfsDelegationTokenReceiver.java
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/AbfsDelegationTokenReceiver.java
new file mode 100644
index 000000000..cae2b70ac
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/AbfsDelegationTokenReceiver.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure.token;
+
+/**
+ * SecurityTokenReceiver for Azure Blob Storage using the ABFS (Azure Blob File System) driver.
+ * Registered for the {@code abfs://} scheme.
+ *
+ * <p>ABFS is the recommended driver for accessing Azure Data Lake Storage Gen2. Use this scheme for
+ * non-SSL connections to ADLS Gen2 storage accounts.
+ *
+ * <p>URI format: {@code abfs://<container>@<storage-account>.dfs.core.windows.net/<path>}
+ *
+ * <p>This subclass only supplies the scheme name; everything else is inherited from {@link
+ * AzureDelegationTokenReceiver}.
+ */
+public class AbfsDelegationTokenReceiver extends AzureDelegationTokenReceiver {
+
+ /**
+ * Returns the URI scheme of this receiver.
+ *
+ * @return the literal string {@code "abfs"}
+ */
+ @Override
+ public String scheme() {
+ return "abfs";
+ }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/AbfssDelegationTokenReceiver.java
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/AbfssDelegationTokenReceiver.java
new file mode 100644
index 000000000..83e7f7297
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/AbfssDelegationTokenReceiver.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure.token;
+
+/**
+ * SecurityTokenReceiver for Azure Blob Storage using the ABFS driver with SSL/TLS encryption.
+ * Registered for the {@code abfss://} scheme.
+ *
+ * <p>This is the secure (SSL-enabled) variant of ABFS, recommended for production use with Azure
+ * Data Lake Storage Gen2.
+ *
+ * <p>URI format: {@code abfss://<container>@<storage-account>.dfs.core.windows.net/<path>}
+ *
+ * <p>This subclass only supplies the scheme name; everything else is inherited from {@link
+ * AzureDelegationTokenReceiver}.
+ */
+public class AbfssDelegationTokenReceiver extends AzureDelegationTokenReceiver
{
+
+ /**
+ * Returns the URI scheme of this receiver.
+ *
+ * @return the literal string {@code "abfss"}
+ */
+ @Override
+ public String scheme() {
+ return "abfss";
+ }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenProvider.java
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenProvider.java
new file mode 100644
index 000000000..fd47595e1
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenProvider.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure.token;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.fs.token.CredentialsJsonSerde;
+import org.apache.fluss.fs.token.ObtainedSecurityToken;
+
+import org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator;
+import org.apache.hadoop.fs.azurebfs.oauth2.AzureADToken;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.fluss.fs.azure.AzureFileSystemOptions.CLIENT_ID;
+import static org.apache.fluss.fs.azure.AzureFileSystemOptions.CLIENT_SECRET;
+import static org.apache.fluss.fs.azure.AzureFileSystemOptions.ENDPOINT_KEY;
+
+/**
+ * Token provider for the Azure Hadoop filesystems.
+ *
+ * <p>Requests an Azure AD access token with the configured client id / client secret against the
+ * configured auth endpoint and wraps it into an {@link ObtainedSecurityToken} tagged with the
+ * filesystem scheme this provider was created for.
+ */
+public class AzureDelegationTokenProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AzureDelegationTokenProvider.class);
+
+    private final String scheme;
+    private final String clientId;
+    private final String clientSecret;
+
+    private final String authEndpoint;
+    private final Map<String, String> additionInfos;
+
+    /**
+     * Creates a provider for the given scheme.
+     *
+     * @param scheme filesystem scheme attached to every token this provider issues
+     * @param conf configuration from which the client id, client secret and auth endpoint are read
+     */
+    public AzureDelegationTokenProvider(String scheme, Configuration conf) {
+        this.scheme = scheme;
+
+        this.clientId = conf.get(CLIENT_ID);
+        this.clientSecret = conf.get(CLIENT_SECRET);
+        this.authEndpoint = conf.get(ENDPOINT_KEY);
+        this.additionInfos = new HashMap<>();
+
+        LOG.info("Setting the endpoint key {}", ENDPOINT_KEY.key());
+
+        // The endpoint travels with the token as additional info, but only when it is configured.
+        if (authEndpoint != null) {
+            additionInfos.put(ENDPOINT_KEY.key(), authEndpoint);
+        }
+    }
+
+    /**
+     * Requests a fresh access token via the client-credentials call of {@link
+     * AzureADAuthenticator} and packages it for distribution.
+     *
+     * @return the obtained token, carrying the serialized credentials, the expiry reported by
+     *     Azure AD (epoch millis) and the additional infos collected at construction time
+     * @throws FlussRuntimeException if the token request or its serialization fails for any reason
+     */
+    public ObtainedSecurityToken obtainSecurityToken() {
+        // Only the client id is logged; the client secret must never reach the logs.
+        LOG.info("Obtaining session credentials token with client id: {}", clientId);
+
+        try {
+            AzureADToken azureADToken =
+                    AzureADAuthenticator.getTokenUsingClientCreds(
+                            authEndpoint, clientId, clientSecret);
+
+            LOG.info(
+                    "Session credentials obtained successfully with expiration: {}",
+                    azureADToken.getExpiry());
+
+            return new ObtainedSecurityToken(
+                    scheme,
+                    toJson(azureADToken),
+                    azureADToken.getExpiry().getTime(),
+                    additionInfos);
+        } catch (Exception e) {
+            throw new FlussRuntimeException("Failed to obtain session credentials token", e);
+        }
+    }
+
+    /**
+     * Serializes the access token of the given Azure AD token as Fluss credentials JSON.
+     *
+     * <p>Only the third constructor argument is populated; the first two are passed as {@code
+     * null} (presumably the key-id / secret slots, which have no Azure counterpart here).
+     */
+    private byte[] toJson(AzureADToken accessToken) {
+        org.apache.fluss.fs.token.Credentials flussCredentials =
+                new org.apache.fluss.fs.token.Credentials(
+                        null, null, accessToken.getAccessToken());
+        return CredentialsJsonSerde.toJson(flussCredentials);
+    }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiver.java
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiver.java
new file mode 100644
index 000000000..f59444e83
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiver.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure.token;
+
+import org.apache.fluss.fs.token.Credentials;
+import org.apache.fluss.fs.token.CredentialsJsonSerde;
+import org.apache.fluss.fs.token.ObtainedSecurityToken;
+import org.apache.fluss.fs.token.SecurityTokenReceiver;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+import static
org.apache.fluss.fs.azure.AzureFileSystemOptions.PROVIDER_CONFIG_NAME;
+
+/**
+ * Security token receiver for the Azure filesystems.
+ *
+ * <p>The most recently received token is kept in static fields, so it is shared by every
+ * scheme-specific subclass in the JVM and can be read back through {@link #getCredentials()}.
+ */
+public abstract class AzureDelegationTokenReceiver implements SecurityTokenReceiver {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AzureDelegationTokenReceiver.class);
+
+    // Latest token state. Each field is volatile, but the three are NOT updated atomically as a
+    // group, so readers may briefly observe a mix of old and new values.
+    static volatile Credentials credentials;
+    static volatile Long validUntil;
+    static volatile Map<String, String> additionInfos;
+
+    /**
+     * Registers {@link DynamicTemporaryAzureCredentialsProvider} in the given Hadoop configuration
+     * (prepending it to any providers already listed) and copies the additional infos of the last
+     * received token into it.
+     *
+     * <p>Note that the provider entry is written before the token check, so it is already in
+     * place when this method throws.
+     *
+     * @param hadoopConfig the Hadoop configuration to update in place
+     * @throws IllegalStateException if no token (and hence no additional info) was received yet
+     */
+    public static void updateHadoopConfig(org.apache.hadoop.conf.Configuration hadoopConfig) {
+        LOG.info("Updating Hadoop configuration");
+
+        String providers = hadoopConfig.get(PROVIDER_CONFIG_NAME.key(), "");
+
+        if (!providers.contains(DynamicTemporaryAzureCredentialsProvider.NAME)) {
+            if (providers.isEmpty()) {
+                LOG.debug("Setting provider");
+                providers = DynamicTemporaryAzureCredentialsProvider.NAME;
+            } else {
+                providers = DynamicTemporaryAzureCredentialsProvider.NAME + "," + providers;
+                LOG.debug("Prepending provider, new providers value: {}", providers);
+            }
+            hadoopConfig.set(PROVIDER_CONFIG_NAME.key(), providers);
+        } else {
+            LOG.debug("Provider already exists");
+        }
+
+        // then, set addition info; read the volatile field once so the null check and the
+        // iteration below are guaranteed to see the same map
+        Map<String, String> currentAdditionInfos = additionInfos;
+        if (currentAdditionInfos == null) {
+            // if addition info is null, it also means we have not received any token,
+            // we throw IllegalStateException
+            throw new IllegalStateException(DynamicTemporaryAzureCredentialsProvider.COMPONENT);
+        }
+        for (Map.Entry<String, String> entry : currentAdditionInfos.entrySet()) {
+            hadoopConfig.set(entry.getKey(), entry.getValue());
+        }
+
+        LOG.info("Updated Hadoop configuration successfully");
+    }
+
+    /**
+     * Stores the credentials, additional infos and expiry of the given token in the shared static
+     * state, replacing whatever was received before.
+     *
+     * @param token the newly obtained token; its payload is parsed as credentials JSON
+     */
+    @Override
+    public void onNewTokensObtained(ObtainedSecurityToken token) {
+        LOG.info("Updating session credentials");
+
+        byte[] tokenBytes = token.getToken();
+
+        credentials = CredentialsJsonSerde.fromJson(tokenBytes);
+        additionInfos = token.getAdditionInfos();
+        // stays null when the token carries no expiry
+        validUntil = token.getValidUntil().orElse(null);
+
+        LOG.debug("Session credentials updated successfully using with securityToken");
+    }
+
+    /** Returns the most recently received credentials, or {@code null} if none arrived yet. */
+    public static Credentials getCredentials() {
+        return credentials;
+    }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/DynamicTemporaryAzureCredentialsProvider.java
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/DynamicTemporaryAzureCredentialsProvider.java
new file mode 100644
index 000000000..39dbd9cad
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/DynamicTemporaryAzureCredentialsProvider.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure.token;
+
+import org.apache.fluss.fs.token.Credentials;
+
+import org.apache.hadoop.conf.Configuration;
+import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.TokenAccessProviderException;
+import org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee;
+import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
+import org.apache.hadoop.fs.azurebfs.oauth2.AzureADToken;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Date;
+
+/**
+ * Support dynamic token for authenticating with Azure. Please note that users may reference this
+ * class name from configuration property fs.azure.account.oauth.provider.type. Therefore, changing
+ * the class name would be a backward-incompatible change. This credential provider must not fail in
+ * creation because that will break a chain of credential providers.
+ *
+ * <p>The token handed out is whatever {@link AzureDelegationTokenReceiver} received last; this
+ * class never contacts Azure AD itself.
+ */
+public class DynamicTemporaryAzureCredentialsProvider extends AccessTokenProvider
+        implements CustomTokenProviderAdaptee {
+
+    public static final String NAME = DynamicTemporaryAzureCredentialsProvider.class.getName();
+
+    public static final String COMPONENT = "Dynamic session credentials for Fluss";
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicTemporaryAzureCredentialsProvider.class);
+
+    /** Intentionally a no-op: all state comes from {@link AzureDelegationTokenReceiver}. */
+    @Override
+    public void initialize(Configuration configuration, String accountName) throws IOException {}
+
+    /**
+     * Returns the access token string of the token supplied by the inherited {@code getToken()}.
+     *
+     * @throws IOException if no usable token is available
+     */
+    @Override
+    public String getAccessToken() throws IOException {
+        return getToken().getAccessToken();
+    }
+
+    /**
+     * Returns the expiry of the token supplied by the inherited {@code getToken()}.
+     *
+     * @throws RuntimeException wrapping the {@link IOException} raised when no usable token is
+     *     available (the overridden signature does not allow a checked exception)
+     */
+    @Override
+    public Date getExpiryTime() {
+        try {
+            return getToken().getExpiry();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Builds an {@link AzureADToken} from the credentials and expiry last stored by {@link
+     * AzureDelegationTokenReceiver}.
+     *
+     * @throws TokenAccessProviderException if no credentials were received yet, or if the
+     *     received token has no expiration time
+     */
+    @Override
+    protected AzureADToken refreshToken() throws IOException {
+        Credentials credentials = AzureDelegationTokenReceiver.getCredentials();
+        Long validUntil = AzureDelegationTokenReceiver.validUntil;
+
+        if (credentials == null) {
+            throw new TokenAccessProviderException(COMPONENT);
+        }
+        // The receiver stores null when the token has no expiry, and it publishes the expiry
+        // after the credentials, so null is possible here. Fail with the provider's regular
+        // exception type instead of an unboxing NullPointerException.
+        if (validUntil == null) {
+            throw new TokenAccessProviderException(
+                    COMPONENT + ": received token has no expiration time");
+        }
+
+        LOG.debug("Providing session credentials");
+
+        AzureADToken azureADToken = new AzureADToken();
+        azureADToken.setAccessToken(credentials.getSecurityToken());
+        azureADToken.setExpiry(new Date(validUntil));
+        return azureADToken;
+    }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/WasbDelegationTokenReceiver.java
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/WasbDelegationTokenReceiver.java
new file mode 100644
index 000000000..aed48ce0b
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/WasbDelegationTokenReceiver.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure.token;
+
+/**
+ * SecurityTokenReceiver for Azure Blob Storage using the WASB (Windows Azure Storage Blob) driver.
+ * Registered for the {@code wasb://} scheme.
+ *
+ * <p>WASB is the legacy driver for accessing Azure Blob Storage. Consider using ABFS for new
+ * deployments as it provides better performance and security features.
+ *
+ * <p>URI format: {@code wasb://<container>@<storage-account>.blob.core.windows.net/<path>}
+ */
+public class WasbDelegationTokenReceiver extends AzureDelegationTokenReceiver {
+
+    /** Returns the URI scheme ({@code wasb}) this receiver handles. */
+    @Override
+    public String scheme() {
+        return "wasb";
+    }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/WasbsDelegationTokenReceiver.java
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/WasbsDelegationTokenReceiver.java
new file mode 100644
index 000000000..f791d7960
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/WasbsDelegationTokenReceiver.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure.token;
+
+/**
+ * Token receiver bound to the {@code wasbs://} scheme, i.e. Azure Blob Storage accessed through
+ * the WASB driver over SSL/TLS.
+ *
+ * <p>This is the encrypted variant of the legacy WASB driver. For new deployments, consider using
+ * {@code abfss://} with Azure Data Lake Storage Gen2.
+ *
+ * <p>URI format: {@code wasbs://<container>@<storage-account>.blob.core.windows.net/<path>}
+ */
+public class WasbsDelegationTokenReceiver extends AzureDelegationTokenReceiver {
+
+    /** URI scheme served by this receiver. */
+    private static final String SCHEME = "wasbs";
+
+    /** Returns the URI scheme ({@code wasbs}) this receiver handles. */
+    @Override
+    public String scheme() {
+        return SCHEME;
+    }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/NOTICE
b/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/NOTICE
new file mode 100644
index 000000000..13d9703dd
--- /dev/null
+++ b/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,80 @@
+fluss-fs-azure
+Copyright 2025-2026 The Apache Software Foundation
+
+This project bundles the following dependencies under the Apache Software
License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt).
+
+- com.fasterxml.jackson.core:jackson-annotations:2.15.3
+- com.fasterxml.jackson.core:jackson-core:2.15.3
+- com.fasterxml.jackson.core:jackson-databind:2.15.3
+- com.fasterxml.woodstox:woodstox-core:5.4.0
+- com.google.guava:failureaccess:1.0
+- com.google.guava:guava:27.0-jre
+- com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava
+- com.google.j2objc:j2objc-annotations:1.1
+- com.microsoft.azure:azure-keyvault-core:1.0.0
+- com.microsoft.azure:azure-storage:7.0.1
+- commons-beanutils:commons-beanutils:1.9.4
+- commons-codec:commons-codec:1.11
+- commons-collections:commons-collections:3.2.2
+- commons-io:commons-io:2.14.0
+- commons-logging:commons-logging:1.2
+- io.dropwizard.metrics:metrics-core:3.2.4
+- io.netty:netty-buffer:4.1.100.Final
+- io.netty:netty-codec:4.1.100.Final
+- io.netty:netty-common:4.1.100.Final
+- io.netty:netty-handler:4.1.100.Final
+- io.netty:netty-resolver:4.1.100.Final
+- io.netty:netty-transport:4.1.100.Final
+- io.netty:netty-transport-classes-epoll:4.1.100.Final
+- io.netty:netty-transport-native-epoll:4.1.100.Final
+- io.netty:netty-transport-native-unix-common:4.1.100.Final
+- org.apache.commons:commons-compress:1.24.0
+- org.apache.commons:commons-configuration2:2.8.0
+- org.apache.commons:commons-lang3:3.18.0
+- org.apache.commons:commons-text:1.10.0
+- org.apache.hadoop.thirdparty:hadoop-shaded-guava:1.2.0
+- org.apache.hadoop.thirdparty:hadoop-shaded-protobuf_3_21:1.2.0
+- org.apache.hadoop:hadoop-annotations:3.4.0
+- org.apache.hadoop:hadoop-auth:3.4.0
+- org.apache.hadoop:hadoop-azure:3.4.0
+- org.apache.hadoop:hadoop-common:3.4.0
+- org.apache.kerby:kerb-core:2.0.3
+- org.apache.kerby:kerby-asn1:2.0.3
+- org.apache.kerby:kerby-pkix:2.0.3
+- org.apache.kerby:kerby-util:2.0.3
+- org.apache.httpcomponents:httpclient:4.5.13
+- org.apache.httpcomponents:httpcore:4.4.13
+- org.codehaus.jettison:jettison:1.5.4
+- org.wildfly.openssl:wildfly-openssl:1.1.3.Final
+- org.xerial.snappy:snappy-java:1.1.10.4
+
+This project bundles the following dependencies under the BSD license. See
bundled license files for details.
+
+- dnsjava:dnsjava:3.4.0
+- org.codehaus.woodstox:stax2-api:4.2.1
+
+This project bundles the following dependencies under the Go License
(https://golang.org/LICENSE). See bundled license files for details.
+
+- com.google.re2j:re2j:1.1
+
+This project bundles the following dependencies under the CDDL 1.1 license.
+See bundled license files for details.
+
+- javax.xml.bind:jaxb-api:2.3.1
+
+This project bundles the following dependencies under the Eclipse Distribution
License (EDL) 1.0 (https://www.eclipse.org/org/documents/edl-v10.php). See
bundled license files for details.
+
+- jakarta.activation:jakarta.activation-api:1.2.1
+
+This project bundles the following dependencies under the MIT License
(https://opensource.org/license/mit). See bundled license files for details.
+
+- org.checkerframework:checker-qual:2.5.2
+- org.codehaus.mojo:animal-sniffer-annotations:1.17
+- org.bouncycastle:bcprov-jdk15on:1.70
+
+This project bundles the following dependencies under the Eclipse Public
License 2.0 and Apache License 2.0 (dual license). See bundled license files
for details.
+- EPL-2.0: https://www.eclipse.org/legal/epl-2.0/
+- Apache-2.0: http://www.apache.org/licenses/LICENSE-2.0.txt
+
+- org.eclipse.jetty:jetty-util-ajax:9.4.53.v20231009
+- org.eclipse.jetty:jetty-util:9.4.53.v20231009
\ No newline at end of file
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/licenses/LICENSE.animal-sniffer
b/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/licenses/LICENSE.animal-sniffer
new file mode 100644
index 000000000..2062eb88b
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/licenses/LICENSE.animal-sniffer
@@ -0,0 +1,21 @@
+The MIT License
+
+Copyright (c) 2009 codehaus.org.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
\ No newline at end of file
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/licenses/LICENSE.bcprov
b/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/licenses/LICENSE.bcprov
new file mode 100644
index 000000000..66357f78d
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/licenses/LICENSE.bcprov
@@ -0,0 +1,18 @@
+Please note the Bouncy Castle License should be read in the same way as the MIT license.
+
+Please also note this licensing model is made possible through funding from
donations and the sale of support contracts.
+
+Bouncy Castle License
+Copyright (c) 2000 - 2024 The Legion of the Bouncy Castle Inc.
(https://www.bouncycastle.org)
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
+
+Third party licenses
+The OpenPGP library and the MLS library both make use of additional open
source code:
+
+openpgp - includes modified BZIP2 library which is licensed under the Apache
Software License, Version 2.0.
+MLS - The MLS Client makes use of io.grpc licensed under Apache Software
License, Version 2.0, and com.google.protobuf which is licensed under the
3-Clause BSD License.
\ No newline at end of file
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/licenses/LICENSE.checker-qual
b/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/licenses/LICENSE.checker-qual
new file mode 100644
index 000000000..7b59b5c98
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/licenses/LICENSE.checker-qual
@@ -0,0 +1,22 @@
+Checker Framework qualifiers
+Copyright 2004-present by the Checker Framework developers
+
+MIT License:
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
\ No newline at end of file
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/licenses/LICENSE.dnsjava
b/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/licenses/LICENSE.dnsjava
new file mode 100644
index 000000000..8daf3fc25
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/licenses/LICENSE.dnsjava
@@ -0,0 +1,30 @@
+Copyright (c) 1998-2019, Brian Wellington
+Copyright (c) 2005 VeriSign. All rights reserved.
+Copyright (c) 2019-2021, dnsjava authors
+
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+3. Neither the name of the copyright holder nor the names of its
+ contributors may be used to endorse or promote products derived from
+ this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/licenses/LICENSE.jakarta.activation
b/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/licenses/LICENSE.jakarta.activation
new file mode 100644
index 000000000..0dea72127
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/licenses/LICENSE.jakarta.activation
@@ -0,0 +1,28 @@
+Copyright (c) 2018 Oracle and/or its affiliates. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+
+ - Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+
+ - Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+
+ - Neither the name of the Eclipse Foundation, Inc. nor the names of its
+ contributors may be used to endorse or promote products derived
+ from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
+IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/licenses/LICENSE.jaxb
b/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/licenses/LICENSE.jaxb
new file mode 100644
index 000000000..fd16ea954
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/licenses/LICENSE.jaxb
@@ -0,0 +1,135 @@
+COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.1
+
+1. Definitions.
+
+ 1.1. "Contributor" means each individual or entity that creates or
contributes to the creation of Modifications.
+
+ 1.2. "Contributor Version" means the combination of the Original
Software, prior Modifications used by a Contributor (if any), and the
Modifications made by that particular Contributor.
+
+ 1.3. "Covered Software" means (a) the Original Software, or (b)
Modifications, or (c) the combination of files containing Original Software
with files containing Modifications, in each case including portions thereof.
+
+ 1.4. "Executable" means the Covered Software in any form other than
Source Code.
+
+ 1.5. "Initial Developer" means the individual or entity that first makes
Original Software available under this License.
+
+ 1.6. "Larger Work" means a work which combines Covered Software or
portions thereof with code not governed by the terms of this License.
+
+ 1.7. "License" means this document.
+
+ 1.8. "Licensable" means having the right to grant, to the maximum extent
possible, whether at the time of the initial grant or subsequently acquired,
any and all of the rights conveyed herein.
+
+ 1.9. "Modifications" means the Source Code and Executable form of any of
the following:
+
+ A. Any file that results from an addition to, deletion from or
modification of the contents of a file containing Original Software or previous
Modifications;
+
+ B. Any new file that contains any part of the Original Software or
previous Modification; or
+
+ C. Any new file that is contributed or otherwise made available under the
terms of this License.
+
+ 1.10. "Original Software" means the Source Code and Executable form of
computer software code that is originally released under this License.
+
+ 1.11. "Patent Claims" means any patent claim(s), now owned or hereafter
acquired, including without limitation, method, process, and apparatus claims,
in any patent Licensable by grantor.
+
+ 1.12. "Source Code" means (a) the common form of computer software code
in which modifications are made and (b) associated documentation included in or
with such code.
+
+ 1.13. "You" (or "Your") means an individual or a legal entity exercising
rights under, and complying with all of the terms of, this License. For legal
entities, "You" includes any entity which controls, is controlled by, or is
under common control with You. For purposes of this definition, "control" means
(a) the power, direct or indirect, to cause the direction or management of such
entity, whether by contract or otherwise, or (b) ownership of more than fifty
percent (50%) of the outstanding shares or beneficial ownership of such entity.
+
+2. License Grants.
+
+ 2.1. The Initial Developer Grant.
+
+ Conditioned upon Your compliance with Section 3.1 below and subject to
third party intellectual property claims, the Initial Developer hereby grants
You a world-wide, royalty-free, non-exclusive license:
+
+ (a) under intellectual property rights (other than patent or trademark)
Licensable by Initial Developer, to use, reproduce, modify, display, perform,
sublicense and distribute the Original Software (or portions thereof), with or
without Modifications, and/or as part of a Larger Work; and
+
+ (b) under Patent Claims infringed by the making, using or selling of
Original Software, to make, have made, use, practice, sell, and offer for sale,
and/or otherwise dispose of the Original Software (or portions thereof).
+
+ (c) The licenses granted in Sections 2.1(a) and (b) are effective on the
date Initial Developer first distributes or otherwise makes the Original
Software available to a third party under the terms of this License.
+
+ (d) Notwithstanding Section 2.1(b) above, no patent license is granted:
(1) for code that You delete from the Original Software, or (2) for
infringements caused by: (i) the modification of the Original Software, or (ii)
the combination of the Original Software with other software or devices.
+
+ 2.2. Contributor Grant.
+
+ Conditioned upon Your compliance with Section 3.1 below and subject to
third party intellectual property claims, each Contributor hereby grants You a
world-wide, royalty-free, non-exclusive license:
+
+ (a) under intellectual property rights (other than patent or trademark)
Licensable by Contributor to use, reproduce, modify, display, perform,
sublicense and distribute the Modifications created by such Contributor (or
portions thereof), either on an unmodified basis, with other Modifications, as
Covered Software and/or as part of a Larger Work; and
+
+ (b) under Patent Claims infringed by the making, using, or selling of
Modifications made by that Contributor either alone and/or in combination with
its Contributor Version (or portions of such combination), to make, use, sell,
offer for sale, have made, and/or otherwise dispose of: (1) Modifications made
by that Contributor (or portions thereof); and (2) the combination of
Modifications made by that Contributor with its Contributor Version (or
portions of such combination).
+
+ (c) The licenses granted in Sections 2.2(a) and 2.2(b) are effective on
the date Contributor first distributes or otherwise makes the Modifications
available to a third party.
+
+ (d) Notwithstanding Section 2.2(b) above, no patent license is granted:
(1) for any code that Contributor has deleted from the Contributor Version; (2)
for infringements caused by: (i) third party modifications of Contributor
Version, or (ii) the combination of Modifications made by that Contributor with
other software (except as part of the Contributor Version) or other devices; or
(3) under Patent Claims infringed by Covered Software in the absence of
Modifications made by that Co [...]
+
+3. Distribution Obligations.
+
+ 3.1. Availability of Source Code.
+
+ Any Covered Software that You distribute or otherwise make available in
Executable form must also be made available in Source Code form and that Source
Code form must be distributed only under the terms of this License. You must
include a copy of this License with every copy of the Source Code form of the
Covered Software You distribute or otherwise make available. You must inform
recipients of any such Covered Software in Executable form as to how they can
obtain such Covered Softw [...]
+
+ 3.2. Modifications.
+
+ The Modifications that You create or to which You contribute are governed
by the terms of this License. You represent that You believe Your Modifications
are Your original creation(s) and/or You have sufficient rights to grant the
rights conveyed by this License.
+
+ 3.3. Required Notices.
+
+ You must include a notice in each of Your Modifications that identifies
You as the Contributor of the Modification. You may not remove or alter any
copyright, patent or trademark notices contained within the Covered Software,
or any notices of licensing or any descriptive text giving attribution to any
Contributor or the Initial Developer.
+
+ 3.4. Application of Additional Terms.
+
+ You may not offer or impose any terms on any Covered Software in Source
Code form that alters or restricts the applicable version of this License or
the recipients' rights hereunder. You may choose to offer, and to charge a fee
for, warranty, support, indemnity or liability obligations to one or more
recipients of Covered Software. However, you may do so only on Your own behalf,
and not on behalf of the Initial Developer or any Contributor. You must make it
absolutely clear that any [...]
+
+ 3.5. Distribution of Executable Versions.
+
+ You may distribute the Executable form of the Covered Software under the
terms of this License or under the terms of a license of Your choice, which may
contain terms different from this License, provided that You are in compliance
with the terms of this License and that the license for the Executable form
does not attempt to limit or alter the recipient's rights in the Source Code
form from the rights set forth in this License. If You distribute the Covered
Software in Executable f [...]
+
+ 3.6. Larger Works.
+
+ You may create a Larger Work by combining Covered Software with other
code not governed by the terms of this License and distribute the Larger Work
as a single product. In such a case, You must make sure the requirements of
this License are fulfilled for the Covered Software.
+
+4. Versions of the License.
+
+ 4.1. New Versions.
+
+ Oracle is the initial license steward and may publish revised and/or new
versions of this License from time to time. Each version will be given a
distinguishing version number. Except as provided in Section 4.3, no one other
than the license steward has the right to modify this License.
+
+ 4.2. Effect of New Versions.
+
+ You may always continue to use, distribute or otherwise make the Covered
Software available under the terms of the version of the License under which
You originally received the Covered Software. If the Initial Developer includes
a notice in the Original Software prohibiting it from being distributed or
otherwise made available under any subsequent version of the License, You must
distribute and make the Covered Software available under the terms of the
version of the License under [...]
+
+ 4.3. Modified Versions.
+
+ When You are an Initial Developer and You want to create a new license
for Your Original Software, You may create and use a modified version of this
License if You: (a) rename the license and remove any references to the name of
the license steward (except to note that the license differs from this
License); and (b) otherwise make it clear that the license contains terms which
differ from this License.
+
+5. DISCLAIMER OF WARRANTY.
+
+ COVERED SOFTWARE IS PROVIDED UNDER THIS LICENSE ON AN "AS IS" BASIS,
WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, WITHOUT
LIMITATION, WARRANTIES THAT THE COVERED SOFTWARE IS FREE OF DEFECTS,
MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE OR NON-INFRINGING. THE ENTIRE RISK
AS TO THE QUALITY AND PERFORMANCE OF THE COVERED SOFTWARE IS WITH YOU. SHOULD
ANY COVERED SOFTWARE PROVE DEFECTIVE IN ANY RESPECT, YOU (NOT THE INITIAL
DEVELOPER OR ANY OTHER CONTRIBUTOR) ASSUM [...]
+
+6. TERMINATION.
+
+ 6.1. This License and the rights granted hereunder will terminate
automatically if You fail to comply with terms herein and fail to cure such
breach within 30 days of becoming aware of the breach. Provisions which, by
their nature, must remain in effect beyond the termination of this License
shall survive.
+
+ 6.2. If You assert a patent infringement claim (excluding declaratory
judgment actions) against Initial Developer or a Contributor (the Initial
Developer or Contributor against whom You assert such claim is referred to as
"Participant") alleging that the Participant Software (meaning the Contributor
Version where the Participant is a Contributor or the Original Software where
the Participant is the Initial Developer) directly or indirectly infringes any
patent, then any and all righ [...]
+
+ 6.3. If You assert a patent infringement claim against Participant
alleging that the Participant Software directly or indirectly infringes any
patent where such claim is resolved (such as by license or settlement) prior to
the initiation of patent infringement litigation, then the reasonable value of
the licenses granted by such Participant under Sections 2.1 or 2.2 shall be
taken into account in determining the amount or value of any payment or license.
+
+ 6.4. In the event of termination under Sections 6.1 or 6.2 above, all end
user licenses that have been validly granted by You or any distributor
hereunder prior to termination (excluding licenses granted to You by any
distributor) shall survive termination.
+
+7. LIMITATION OF LIABILITY.
+
+ UNDER NO CIRCUMSTANCES AND UNDER NO LEGAL THEORY, WHETHER TORT (INCLUDING
NEGLIGENCE), CONTRACT, OR OTHERWISE, SHALL YOU, THE INITIAL DEVELOPER, ANY
OTHER CONTRIBUTOR, OR ANY DISTRIBUTOR OF COVERED SOFTWARE, OR ANY SUPPLIER OF
ANY OF SUCH PARTIES, BE LIABLE TO ANY PERSON FOR ANY INDIRECT, SPECIAL,
INCIDENTAL, OR CONSEQUENTIAL DAMAGES OF ANY CHARACTER INCLUDING, WITHOUT
LIMITATION, DAMAGES FOR LOSS OF GOODWILL, WORK STOPPAGE, COMPUTER FAILURE OR
MALFUNCTION, OR ANY AND ALL OTHER COMM [...]
+
+8. U.S. GOVERNMENT END USERS.
+
+ The Covered Software is a "commercial item," as that term is defined in
48 C.F.R. 2.101 (Oct. 1995), consisting of "commercial computer software" (as
that term is defined at 48 C.F.R. § 252.227-7014(a)(1)) and "commercial
computer software documentation" as such terms are used in 48 C.F.R. 12.212
(Sept. 1995). Consistent with 48 C.F.R. 12.212 and 48 C.F.R. 227.7202-1 through
227.7202-4 (June 1995), all U.S. Government End Users acquire Covered Software
with only those rights set for [...]
+
+9. MISCELLANEOUS.
+
+ This License represents the complete agreement concerning subject matter
hereof. If any provision of this License is held to be unenforceable, such
provision shall be reformed only to the extent necessary to make it
enforceable. This License shall be governed by the law of the jurisdiction
specified in a notice contained within the Original Software (except to the
extent applicable law, if any, provides otherwise), excluding such
jurisdiction's conflict-of-law provisions. Any litiga [...]
+
+10. RESPONSIBILITY FOR CLAIMS.
+
+ As between Initial Developer and the Contributors, each party is
responsible for claims and damages arising, directly or indirectly, out of its
utilization of rights under this License and You agree to work with Initial
Developer and Contributors to distribute such responsibility on an equitable
basis. Nothing herein is intended or shall be deemed to constitute any
admission of liability.
+
+----------
+NOTICE PURSUANT TO SECTION 9 OF THE COMMON DEVELOPMENT AND DISTRIBUTION
LICENSE (CDDL)
+The code released under the CDDL shall be governed by the laws of the State of
California (excluding conflict-of-law provisions). Any litigation relating to
this License shall be subject to the jurisdiction of the Federal Courts of the
Northern District of California and the state courts of the State of
California, with venue lying in Santa Clara County, California.
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/licenses/LICENSE.re2j
b/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/licenses/LICENSE.re2j
new file mode 100644
index 000000000..b620ae68f
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/licenses/LICENSE.re2j
@@ -0,0 +1,32 @@
+This is a work derived from Russ Cox's RE2 in Go, whose license
+http://golang.org/LICENSE is as follows:
+
+Copyright (c) 2009 The Go Authors. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in
+ the documentation and/or other materials provided with the
+ distribution.
+
+ * Neither the name of Google Inc. nor the names of its contributors
+ may be used to endorse or promote products derived from this
+ software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/licenses/LICENSE.stax2api
b/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/licenses/LICENSE.stax2api
new file mode 100644
index 000000000..0ed636169
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/licenses/LICENSE.stax2api
@@ -0,0 +1,22 @@
+Copyright woodstox stax2api contributors.
+
+Redistribution and use in source and binary forms, with or without
modification,
+are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED.
+IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY
DIRECT,
+INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA,
+OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY,
+WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/services/org.apache.fluss.fs.FileSystemPlugin
b/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/services/org.apache.fluss.fs.FileSystemPlugin
new file mode 100644
index 000000000..36090b54e
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/services/org.apache.fluss.fs.FileSystemPlugin
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.fluss.fs.azure.AbfsFileSystemPlugin
+org.apache.fluss.fs.azure.AbfssFileSystemPlugin
+org.apache.fluss.fs.azure.WasbFileSystemPlugin
+org.apache.fluss.fs.azure.WasbsFileSystemPlugin
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/services/org.apache.fluss.fs.token.SecurityTokenReceiver
b/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/services/org.apache.fluss.fs.token.SecurityTokenReceiver
new file mode 100644
index 000000000..62ce94a4b
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/main/resources/META-INF/services/org.apache.fluss.fs.token.SecurityTokenReceiver
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.fluss.fs.azure.token.AbfsDelegationTokenReceiver
+org.apache.fluss.fs.azure.token.AbfssDelegationTokenReceiver
+org.apache.fluss.fs.azure.token.WasbDelegationTokenReceiver
+org.apache.fluss.fs.azure.token.WasbsDelegationTokenReceiver
\ No newline at end of file
diff --git
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AbfsFileSystemBehaviorITCase.java
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AbfsFileSystemBehaviorITCase.java
new file mode 100644
index 000000000..41d5846cc
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AbfsFileSystemBehaviorITCase.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.fs.FileSystemBehaviorTestSuite;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.fs.azure.token.MockAuthServer;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Runs the shared {@link FileSystemBehaviorTestSuite} against the {@code abfs} scheme of the
+ * Azure File System Plugin.
+ *
+ * <p>The file system is resolved through {@link FsPath#getFileSystem()} and its {@code fs} field
+ * is then replaced, via reflection, with a {@link MemoryFileSystem}. A {@link MockAuthServer} is
+ * created before all tests and closed afterwards.
+ */
+class AbfsFileSystemBehaviorITCase extends FileSystemBehaviorTestSuite {
+
+    private static final String CONFIG_PREFIX = "fs.azure.account";
+    private static final String CLIENT_ID = "testClientId";
+    private static final String CLIENT_SECRET = "testClientSecret";
+
+    private static final String AZURE_ACCOUNT_KEY = "ZmFrZS1rZXkK";
+    private static final String ENDPOINT_KEY = "http://localhost:8080";
+    public static final String ABFS_FS_PATH = "abfs://[email protected]/test";
+
+    private static MockAuthServer mockAuthServer;
+
+    /** Creates the mock auth server and initializes the file systems with the test settings. */
+    @BeforeAll
+    static void setup() {
+        mockAuthServer = MockAuthServer.create();
+        final Configuration configuration = new Configuration();
+        configuration.setString(CONFIG_PREFIX + ".oauth2.client.id", CLIENT_ID);
+        configuration.setString(CONFIG_PREFIX + ".oauth2.client.secret", CLIENT_SECRET);
+        configuration.setString(CONFIG_PREFIX + ".oauth2.client.endpoint", ENDPOINT_KEY);
+        configuration.setString(CONFIG_PREFIX + ".key", AZURE_ACCOUNT_KEY);
+        FileSystem.initialize(configuration, null);
+    }
+
+    @Override
+    protected FileSystem getFileSystem() throws IOException {
+        return getBasePath().getFileSystem();
+    }
+
+    /**
+     * Returns the base path used by the suite.
+     *
+     * <p>Every call installs a new {@link MemoryFileSystem} into the file system resolved for
+     * {@link #ABFS_FS_PATH} before returning the path.
+     */
+    @Override
+    protected FsPath getBasePath() throws IOException {
+        FsPath fsPath = new FsPath(ABFS_FS_PATH);
+        applyMockStorage(fsPath.getFileSystem());
+        return fsPath;
+    }
+
+    /**
+     * Overwrites the {@code fs} field of the given file system with a fresh in-memory
+     * implementation.
+     *
+     * @param fileSystem the file system whose {@code fs} field is replaced
+     * @throws RuntimeException if the field cannot be written reflectively
+     */
+    private static void applyMockStorage(FileSystem fileSystem) throws IOException {
+        try {
+            MemoryFileSystem memoryFileSystem = new MemoryFileSystem(URI.create(ABFS_FS_PATH));
+            // forceAccess = true: the "fs" field is written even if it is not accessible.
+            FieldUtils.writeField(fileSystem, "fs", memoryFileSystem, true);
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Closes the mock auth server if it was created. */
+    @AfterAll
+    static void tearDown() throws IOException {
+        // setup() may have failed before the server was assigned; do not pile a
+        // NullPointerException on top of that original failure.
+        if (mockAuthServer != null) {
+            mockAuthServer.close();
+        }
+    }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AzureFileSystemPluginTest.java
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AzureFileSystemPluginTest.java
new file mode 100644
index 000000000..4a25bb4bd
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AzureFileSystemPluginTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.fs.azure.token.AbfsDelegationTokenReceiver;
+import org.apache.fluss.fs.token.Credentials;
+import org.apache.fluss.fs.token.CredentialsJsonSerde;
+import org.apache.fluss.fs.token.ObtainedSecurityToken;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.fluss.fs.azure.AzureFileSystemOptions.ACCOUNT_KEY;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link AzureFileSystemPlugin}. */
+public class AzureFileSystemPluginTest {
+
+    /** Only keys starting with {@code fs.azure.} are expected in the Hadoop configuration. */
+    @Test
+    void testGetHadoopConfiguration() {
+        AzureFileSystemPlugin plugin = new AbfsFileSystemPlugin();
+        Configuration flussConfig = new Configuration();
+        flussConfig.setString("fs.azure.some.prop", "some-value");
+        flussConfig.setString("other.prop", "other-value");
+
+        org.apache.hadoop.conf.Configuration hadoopConfig =
+                plugin.getHadoopConfiguration(flussConfig);
+
+        assertThat(hadoopConfig.get("fs.azure.some.prop")).isEqualTo("some-value");
+        assertThat(hadoopConfig.get("other.prop")).isNull();
+    }
+
+    /** A {@code null} Fluss configuration must still yield a Hadoop configuration. */
+    @Test
+    void testGetHadoopConfigurationNull() {
+        AzureFileSystemPlugin plugin = new AbfsFileSystemPlugin();
+        org.apache.hadoop.conf.Configuration hadoopConfig = plugin.getHadoopConfiguration(null);
+        assertThat(hadoopConfig).isNotNull();
+    }
+
+    /** Drives {@code create()} with an account key configured. */
+    @Test
+    void testCreateWithAccountKey() throws Exception {
+        AzureFileSystemPlugin plugin = new AbfsFileSystemPlugin();
+        Configuration flussConfig = new Configuration();
+        flussConfig.setString(ACCOUNT_KEY.key(), "some-key");
+
+        URI uri = new URI("abfs://[email protected]/");
+
+        // The credential handling is private, so it is exercised through create().
+        try {
+            plugin.create(uri, flussConfig);
+        } catch (Exception ignored) {
+            // Best effort: initializing the underlying file system may fail in some
+            // environments; this test only drives the code path.
+        }
+    }
+
+    /** Drives {@code create()} without an account key, relying on a received token instead. */
+    @Test
+    void testCreateWithoutAccountKey() throws Exception {
+        AzureFileSystemPlugin plugin = new AbfsFileSystemPlugin();
+        Configuration flussConfig = new Configuration();
+
+        // Prepare credentials so updateHadoopConfig doesn't throw IllegalStateException
+        Credentials credentials = new Credentials("id", "secret", "token");
+        Map<String, String> additionInfos = new HashMap<>();
+        additionInfos.put("some", "info");
+        ObtainedSecurityToken token =
+                new ObtainedSecurityToken(
+                        "abfs", CredentialsJsonSerde.toJson(credentials), 100L, additionInfos);
+        new AbfsDelegationTokenReceiver().onNewTokensObtained(token);
+
+        URI uri = new URI("abfs://[email protected]/");
+        try {
+            plugin.create(uri, flussConfig);
+        } catch (Exception ignored) {
+            // Best effort: initializing the underlying file system may fail in some
+            // environments; this test only drives the code path.
+        }
+    }
+
+    /** A plugin reporting an unknown scheme must be rejected by {@code create()}. */
+    @Test
+    void testUnsupportedScheme() {
+        AzureFileSystemPlugin plugin =
+                new AzureFileSystemPlugin() {
+                    @Override
+                    public String getScheme() {
+                        return "unsupported";
+                    }
+                };
+
+        Configuration flussConfig = new Configuration();
+
+        // The scheme check is private, so it is triggered through create().
+        assertThatThrownBy(() -> plugin.create(new URI("unsupported://foo"), flussConfig))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Unsupported scheme: unsupported");
+    }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AzureFileSystemTest.java
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AzureFileSystemTest.java
new file mode 100644
index 000000000..399478275
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AzureFileSystemTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.FlussRuntimeException;
+
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Unit tests covering {@link AzureFileSystem}. */
+public class AzureFileSystemTest {
+
+    /** Obtaining a security token from this bare setup must fail with a Fluss runtime error. */
+    @Test
+    void testObtainSecurityToken() {
+        AzureFileSystem azureFs =
+                new AzureFileSystem("abfs", new LocalFileSystem(), new Configuration());
+
+        assertThatThrownBy(azureFs::obtainSecurityToken)
+                .isInstanceOf(FlussRuntimeException.class)
+                .hasMessageContaining("Failed to obtain session credentials token");
+    }
+
+    /** Constructing the file system around a local Hadoop file system must succeed. */
+    @Test
+    void testConstructor() {
+        LocalFileSystem hadoopFs = new LocalFileSystem();
+        Configuration flussConf = new Configuration();
+
+        assertThat(new AzureFileSystem("abfs", hadoopFs, flussConf)).isNotNull();
+    }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/MemoryFileSystem.java
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/MemoryFileSystem.java
new file mode 100644
index 000000000..f8e3a05e1
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/MemoryFileSystem.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure;
+
+import org.apache.fluss.utils.MapUtils;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * In-memory {@link FileSystem} used as a test utility.
+ *
+ * <p>File contents live in a map keyed by {@link Path}; directories are tracked in a separate
+ * set. A file becomes visible only when the stream returned by {@code create} is closed. {@code
+ * append}, {@code rename} and {@code getWorkingDirectory} throw {@link
+ * UnsupportedOperationException}.
+ */
+public class MemoryFileSystem extends FileSystem {
+
+    private final URI uri;
+    private final Map<Path, byte[]> files = MapUtils.newConcurrentHashMap();
+    private final Set<Path> directories =
+            Collections.newSetFromMap(MapUtils.newConcurrentHashMap());
+
+    public MemoryFileSystem(URI uri) {
+        this.uri = uri;
+    }
+
+    /** Returns whether {@code f} is a known file or a known directory. */
+    @Override
+    public boolean exists(Path f) throws IOException {
+        return files.containsKey(f) || directories.contains(f);
+    }
+
+    @Override
+    public URI getUri() {
+        return uri;
+    }
+
+    @Override
+    public FSDataInputStream open(Path f) throws IOException {
+        return open(f, -1);
+    }
+
+    /**
+     * Opens a stream over the bytes stored for {@code f}; {@code bufferSize} is ignored.
+     *
+     * @throws java.io.FileNotFoundException if no file is stored under {@code f}
+     */
+    @Override
+    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+        byte[] data = files.get(f);
+
+        if (data == null) {
+            throw new java.io.FileNotFoundException(f.toString());
+        }
+
+        return new FSDataInputStream(
+                new FSInputStream() {
+                    private int pos = 0;
+
+                    @Override
+                    public void seek(long newPos) throws IOException {
+                        // A negative offset would otherwise surface later as an
+                        // ArrayIndexOutOfBoundsException from read().
+                        if (newPos < 0) {
+                            throw new IOException(
+                                    "Cannot seek to negative offset " + newPos + " in " + f);
+                        }
+                        // Clamp instead of letting the narrowing cast wrap around.
+                        this.pos = (int) Math.min(newPos, Integer.MAX_VALUE);
+                    }
+
+                    @Override
+                    public long getPos() {
+                        return pos;
+                    }
+
+                    @Override
+                    public boolean seekToNewSource(long targetPos) {
+                        return false;
+                    }
+
+                    @Override
+                    public int read() {
+                        return pos < data.length ? (data[pos++] & 0xff) : -1;
+                    }
+                });
+    }
+
+    @Override
+    public FSDataOutputStream create(Path f, boolean overwrite) throws IOException {
+        return create(f, overwrite, -1, (short) -1, -1, null);
+    }
+
+    /**
+     * Creates a file whose content is published when the returned stream is closed. The buffer
+     * size, replication, block size and progress arguments are ignored.
+     *
+     * @throws IOException if the file exists and {@code overwrite} is false
+     */
+    @Override
+    public FSDataOutputStream create(
+            Path f,
+            boolean overwrite,
+            int bufferSize,
+            short replication,
+            long blockSize,
+            Progressable progress)
+            throws IOException {
+
+        if (!overwrite && files.containsKey(f)) {
+            throw new IOException("File exists: " + f);
+        }
+
+        // Registers the whole ancestor chain and tolerates a root-level file, whose parent
+        // is null and must not be added to the concurrent set.
+        registerDirectories(parentOf(f));
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        return new FSDataOutputStream(
+                new FilterOutputStream(baos) {
+                    @Override
+                    public void write(byte[] b, int off, int len) throws IOException {
+                        // FilterOutputStream would otherwise copy byte by byte.
+                        out.write(b, off, len);
+                    }
+
+                    @Override
+                    public void close() throws IOException {
+                        super.close();
+                        files.put(f, baos.toByteArray());
+                    }
+                },
+                null);
+    }
+
+    /** Same as the six-argument {@code create}; the permission argument is ignored. */
+    @Override
+    public FSDataOutputStream create(
+            Path path,
+            FsPermission fsPermission,
+            boolean b,
+            int i,
+            short i1,
+            long l,
+            Progressable progressable)
+            throws IOException {
+        return create(path, b, i, i1, l, progressable);
+    }
+
+    @Override
+    public FSDataOutputStream append(Path path, int i, Progressable progressable)
+            throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean rename(Path path, Path path1) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Deletes the file {@code f}, or the directory {@code f} together with everything below it.
+     *
+     * <p>Containment is decided by walking parent links, so a sibling that merely shares a
+     * textual prefix (for example {@code /a/bc} when deleting {@code /a/b}) is left untouched.
+     * As before, only files count as content for the non-recursive check, and {@code true} is
+     * returned even when nothing was stored under {@code f}.
+     *
+     * @throws IOException if {@code recursive} is false and a file exists below {@code f}
+     */
+    @Override
+    public boolean delete(Path f, boolean recursive) throws IOException {
+        if (files.remove(f) != null) {
+            return true;
+        }
+
+        if (!recursive) {
+            boolean hasChildren =
+                    files.keySet().stream().anyMatch(p -> isSelfOrDescendant(parentOf(p), f));
+            if (hasChildren) {
+                throw new IOException("Directory is not empty: " + f);
+            }
+        }
+
+        directories.removeIf(d -> isSelfOrDescendant(d, f));
+        files.keySet().removeIf(p -> isSelfOrDescendant(p, f));
+        return true;
+    }
+
+    /**
+     * Lists {@code f} itself when it is a file, its direct children when it is a known
+     * directory, and nothing otherwise.
+     */
+    @Override
+    public FileStatus[] listStatus(Path f) {
+        byte[] data = files.get(f);
+        if (data != null) {
+            return new FileStatus[] {fileStatus(f, data)};
+        }
+
+        if (!directories.contains(f)) {
+            return new FileStatus[0];
+        }
+
+        List<FileStatus> statusList = new ArrayList<>();
+
+        for (Map.Entry<Path, byte[]> entry : files.entrySet()) {
+            if (f.equals(entry.getKey().getParent())) {
+                statusList.add(fileStatus(entry.getKey(), entry.getValue()));
+            }
+        }
+
+        for (Path d : directories) {
+            if (!d.equals(f) && f.equals(d.getParent())) {
+                statusList.add(directoryStatus(d));
+            }
+        }
+
+        return statusList.toArray(new FileStatus[0]);
+    }
+
+    @Override
+    public void setWorkingDirectory(Path path) {}
+
+    @Override
+    public Path getWorkingDirectory() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Registers {@code f} and all of its ancestors as directories; {@code permission} is ignored.
+     *
+     * @throws IOException if {@code f} or one of its ancestors is an existing file
+     */
+    @Override
+    public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+        for (Path current = f; current != null; current = parentOf(current)) {
+            if (files.containsKey(current)) {
+                throw new IOException(
+                        "Cannot create directory " + f + ": " + current + " is a file");
+            }
+        }
+
+        registerDirectories(f);
+        return true;
+    }
+
+    /**
+     * Returns the status of the file or directory stored under {@code f}.
+     *
+     * @throws java.io.FileNotFoundException if {@code f} is neither a file nor a directory
+     */
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+        byte[] data = files.get(f);
+        if (data != null) {
+            return fileStatus(f, data);
+        }
+        if (directories.contains(f)) {
+            return directoryStatus(f);
+        }
+
+        throw new java.io.FileNotFoundException(f.toString());
+    }
+
+    /** Builds the status of a regular file holding {@code data}. */
+    private static FileStatus fileStatus(Path path, byte[] data) {
+        return new FileStatus(data.length, false, 1, 1, 0, path);
+    }
+
+    /** Builds the status of a directory. */
+    private static FileStatus directoryStatus(Path path) {
+        return new FileStatus(0, true, 1, 1, 0, path);
+    }
+
+    /**
+     * Returns the parent of {@code path}, or null at the top of the hierarchy. A parent equal
+     * to the path itself is also treated as the top so that upward walks always terminate.
+     */
+    private static Path parentOf(Path path) {
+        Path parent = path.getParent();
+        return parent == null || parent.equals(path) ? null : parent;
+    }
+
+    /** Returns whether {@code candidate} equals {@code ancestor} or lies below it. */
+    private static boolean isSelfOrDescendant(Path candidate, Path ancestor) {
+        for (Path current = candidate; current != null; current = parentOf(current)) {
+            if (current.equals(ancestor)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** Adds {@code dir} and every ancestor of it to the directory set; null is a no-op. */
+    private void registerDirectories(Path dir) {
+        for (Path current = dir; current != null; current = parentOf(current)) {
+            directories.add(current);
+        }
+    }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AbfsDelegationTokenReceiverTest.java
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AbfsDelegationTokenReceiverTest.java
new file mode 100644
index 000000000..9181c4429
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AbfsDelegationTokenReceiverTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure.token;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Checks the scheme reported by {@link AbfsDelegationTokenReceiver}. */
+public class AbfsDelegationTokenReceiverTest {
+
+    /** The receiver must identify itself with the {@code abfs} scheme. */
+    @Test
+    void testScheme() {
+        String reportedScheme = new AbfsDelegationTokenReceiver().scheme();
+
+        assertThat(reportedScheme).isEqualTo("abfs");
+    }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AbfssDelegationTokenReceiverTest.java
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AbfssDelegationTokenReceiverTest.java
new file mode 100644
index 000000000..03bb811af
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AbfssDelegationTokenReceiverTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure.token;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Checks the scheme reported by {@link AbfssDelegationTokenReceiver}. */
+public class AbfssDelegationTokenReceiverTest {
+
+    /** The receiver must identify itself with the {@code abfss} scheme. */
+    @Test
+    void testScheme() {
+        String reportedScheme = new AbfssDelegationTokenReceiver().scheme();
+
+        assertThat(reportedScheme).isEqualTo("abfss");
+    }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AuthServerHandler.java
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AuthServerHandler.java
new file mode 100644
index 000000000..26a7af986
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AuthServerHandler.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure.token;
+
+import org.apache.fluss.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import
org.apache.fluss.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
+import
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpObject;
+import org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.fluss.shaded.netty4.io.netty.util.AsciiString;
+import org.apache.fluss.utils.IOUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+
+import static
org.apache.fluss.shaded.guava32.com.google.common.net.HttpHeaders.CONTENT_ENCODING;
+import static
org.apache.fluss.shaded.guava32.com.google.common.net.HttpHeaders.CONTENT_LENGTH;
+import static
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
+import static
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
+import static
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpHeaderValues.APPLICATION_JSON;
+import static
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpHeaderValues.CLOSE;
+import static
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
+
+/**
+ * Netty handler for facilitating the Azure auth token generation.
+ *
+ * <p>A POST whose path ends with {@code /} and any other request whose path ends with {@code
+ * /token} are answered with the JSON document stored in the {@code create-token.json} classpath
+ * resource; everything else receives an empty 404 response.
+ */
+public class AuthServerHandler extends SimpleChannelInboundHandler<HttpObject> {
+
+    /** Classpath resource holding the canned token response. */
+    private static final String TOKEN_RESOURCE = "create-token.json";
+
+    /** Flushes the responses queued by {@code response}, which only writes without flushing. */
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) {
+        ctx.flush();
+    }
+
+    /**
+     * Dispatches each {@link HttpRequest} by method; other HTTP objects (such as body chunks)
+     * are ignored.
+     */
+    @Override
+    public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
+        if (!(msg instanceof HttpRequest)) {
+            return;
+        }
+        HttpRequest req = (HttpRequest) msg;
+
+        try {
+            URI url = URI.create(req.uri());
+            if (req.method().equals(HttpMethod.POST)) {
+                postRequest(ctx, url, req);
+            } else {
+                getRequest(ctx, url, req);
+            }
+        } catch (IOException e) {
+            // Propagated to exceptionCaught, which closes the channel.
+            throw new RuntimeException(
+                    "Failed to answer " + req.method() + " " + req.uri(), e);
+        }
+    }
+
+    /** Answers POST requests: token document for paths ending with {@code /}, 404 otherwise. */
+    private void postRequest(ChannelHandlerContext ctx, URI url, HttpRequest req)
+            throws IOException {
+        if (url.getPath().endsWith("/")) {
+            jsonResponse(ctx, req, TOKEN_RESOURCE);
+        } else {
+            response(ctx, req, new byte[] {}, NOT_FOUND, APPLICATION_JSON);
+        }
+    }
+
+    /**
+     * Answers non-POST requests: token document for paths ending with {@code /token}, 404
+     * otherwise.
+     */
+    private void getRequest(ChannelHandlerContext ctx, URI url, HttpRequest req)
+            throws IOException {
+        if (url.getPath().endsWith("/token")) {
+            jsonResponse(ctx, req, TOKEN_RESOURCE);
+        } else {
+            response(ctx, req, new byte[] {}, NOT_FOUND, APPLICATION_JSON);
+        }
+    }
+
+    /** Sends the classpath resource {@code path} as a 200 JSON response. */
+    private void jsonResponse(ChannelHandlerContext ctx, HttpRequest req, String path)
+            throws IOException {
+        jsonResponse(ctx, req, path, OK);
+    }
+
+    /** Sends the classpath resource {@code path} as a JSON response with the given status. */
+    private void jsonResponse(
+            ChannelHandlerContext ctx,
+            HttpRequest req,
+            String path,
+            HttpResponseStatus responseStatus)
+            throws IOException {
+        response(ctx, req, readFromResources(path), responseStatus, APPLICATION_JSON);
+    }
+
+    /**
+     * Queues a full HTTP response carrying {@code bytes}. The response sets content type,
+     * content length and {@code Connection: close}; it is written but not flushed here.
+     */
+    private static void response(
+            ChannelHandlerContext ctx,
+            HttpRequest req,
+            byte[] bytes,
+            HttpResponseStatus status,
+            AsciiString contentType) {
+        FullHttpResponse response =
+                new DefaultFullHttpResponse(
+                        req.protocolVersion(), status, Unpooled.wrappedBuffer(bytes));
+        response.headers()
+                .set(CONTENT_TYPE, contentType)
+                .setInt(CONTENT_LENGTH, response.content().readableBytes());
+
+        response.headers().remove(CONTENT_ENCODING);
+        response.headers().set(CONNECTION, CLOSE);
+        ctx.write(response);
+    }
+
+    /** Closes the channel on any pipeline error. */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        ctx.close();
+    }
+
+    /**
+     * Reads the classpath resource {@code path} fully into memory.
+     *
+     * @throws java.io.FileNotFoundException if the resource is not on the classpath
+     * @throws IOException if copying the resource fails
+     */
+    private byte[] readFromResources(String path) throws IOException {
+        InputStream inputStream =
+                AuthServerHandler.class.getClassLoader().getResourceAsStream(path);
+        // getResourceAsStream signals a missing resource with null rather than an exception.
+        if (inputStream == null) {
+            throw new java.io.FileNotFoundException("Classpath resource not found: " + path);
+        }
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        // NOTE(review): the trailing 'true' presumably makes copyBytes close both streams.
+        IOUtils.copyBytes(inputStream, out, true);
+        return out.toByteArray();
+    }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenProviderTest.java
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenProviderTest.java
new file mode 100644
index 000000000..ad716e1c4
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenProviderTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure.token;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.fs.token.Credentials;
+import org.apache.fluss.fs.token.CredentialsJsonSerde;
+import org.apache.fluss.fs.token.ObtainedSecurityToken;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.fluss.fs.azure.AzureFileSystemOptions.CLIENT_ID;
+import static org.apache.fluss.fs.azure.AzureFileSystemOptions.CLIENT_SECRET;
+import static org.apache.fluss.fs.azure.AzureFileSystemOptions.ENDPOINT_KEY;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Exercises {@link AzureDelegationTokenProvider} against a {@link MockAuthServer} started once
+ * for the whole class.
+ */
+public class AzureDelegationTokenProviderTest {
+
+    private static final String TEST_CLIENT_ID = "testClientId";
+    private static final String TEST_CLIENT_SECRET = "testClientSecret";
+
+    private static final String TEST_ENDPOINT = "http://localhost:8080";
+
+    private static MockAuthServer mockAuthServer;
+
+    @BeforeAll
+    static void setup() {
+        mockAuthServer = MockAuthServer.create();
+    }
+
+    @AfterAll
+    static void tearDown() {
+        mockAuthServer.close();
+    }
+
+    /**
+     * The token obtained from the mock endpoint must deserialize into credentials whose key id
+     * and secret are the literal string {@code "null"} and whose security token is {@code
+     * "token"}.
+     */
+    @Test
+    void obtainSecurityTokenShouldReturnSecurityToken() {
+        Configuration providerConf = new Configuration();
+        providerConf.set(CLIENT_ID, TEST_CLIENT_ID);
+        providerConf.set(CLIENT_SECRET, TEST_CLIENT_SECRET);
+        providerConf.set(ENDPOINT_KEY, TEST_ENDPOINT);
+
+        AzureDelegationTokenProvider provider =
+                new AzureDelegationTokenProvider("abfs", providerConf);
+        ObtainedSecurityToken obtained = provider.obtainSecurityToken();
+        Credentials parsed = CredentialsJsonSerde.fromJson(obtained.getToken());
+
+        assertThat(parsed.getAccessKeyId()).isEqualTo("null");
+        assertThat(parsed.getSecretAccessKey()).isEqualTo("null");
+        assertThat(parsed.getSecurityToken()).isEqualTo("token");
+    }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiverTest.java
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiverTest.java
new file mode 100644
index 000000000..c17ebc6c0
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiverTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure.token;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+
+import static
org.apache.fluss.fs.azure.AzureFileSystemOptions.PROVIDER_CONFIG_NAME;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests how {@link AzureDelegationTokenReceiver#updateHadoopConfig} rewrites the provider list
+ * stored under {@code PROVIDER_CONFIG_NAME}.
+ */
+class AzureDelegationTokenReceiverTest {
+
+    private static final String PROVIDER_CLASS_NAME = "TestProvider";
+
+    /** Gives every test a non-null (empty) additional-info map. */
+    @BeforeEach
+    void beforeEach() {
+        AzureDelegationTokenReceiver.additionInfos = new HashMap<>();
+    }
+
+    /** Resets the static additional-info map so tests do not influence each other. */
+    @AfterEach
+    void afterEach() {
+        AzureDelegationTokenReceiver.additionInfos = null;
+    }
+
+    @Test
+    void updateHadoopConfigShouldFailOnEmptyAdditionalInfo() {
+        AzureDelegationTokenReceiver.additionInfos = null;
+        org.apache.hadoop.conf.Configuration hadoopConf = hadoopConfWithProviders("");
+
+        assertThatThrownBy(() -> AzureDelegationTokenReceiver.updateHadoopConfig(hadoopConf))
+                .isInstanceOf(IllegalStateException.class);
+    }
+
+    @Test
+    void updateHadoopConfigShouldSetProviderWhenEmpty() {
+        org.apache.hadoop.conf.Configuration hadoopConf = hadoopConfWithProviders("");
+
+        AzureDelegationTokenReceiver.updateHadoopConfig(hadoopConf);
+
+        assertThat(configuredProviders(hadoopConf))
+                .isEqualTo(DynamicTemporaryAzureCredentialsProvider.NAME);
+    }
+
+    @Test
+    void updateHadoopConfigShouldPrependProviderWhenNotEmpty() {
+        org.apache.hadoop.conf.Configuration hadoopConf =
+                hadoopConfWithProviders(PROVIDER_CLASS_NAME);
+
+        AzureDelegationTokenReceiver.updateHadoopConfig(hadoopConf);
+
+        // Exactly two entries: the dynamic provider first, the pre-existing one second.
+        assertThat(configuredProviders(hadoopConf).split(","))
+                .containsExactly(
+                        DynamicTemporaryAzureCredentialsProvider.NAME, PROVIDER_CLASS_NAME);
+    }
+
+    @Test
+    void updateHadoopConfigShouldNotAddProviderWhenAlreadyExists() {
+        org.apache.hadoop.conf.Configuration hadoopConf =
+                hadoopConfWithProviders(DynamicTemporaryAzureCredentialsProvider.NAME);
+
+        AzureDelegationTokenReceiver.updateHadoopConfig(hadoopConf);
+
+        assertThat(configuredProviders(hadoopConf))
+                .isEqualTo(DynamicTemporaryAzureCredentialsProvider.NAME);
+    }
+
+    /** Creates a Hadoop configuration whose provider entry is preset to {@code providers}. */
+    private static org.apache.hadoop.conf.Configuration hadoopConfWithProviders(
+            String providers) {
+        org.apache.hadoop.conf.Configuration hadoopConf =
+                new org.apache.hadoop.conf.Configuration();
+        hadoopConf.set(PROVIDER_CONFIG_NAME.key(), providers);
+        return hadoopConf;
+    }
+
+    /** Reads back the provider entry of {@code hadoopConf}. */
+    private static String configuredProviders(org.apache.hadoop.conf.Configuration hadoopConf) {
+        return hadoopConf.get(PROVIDER_CONFIG_NAME.key());
+    }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/DynamicTemporaryAzureCredentialsProviderTest.java
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/DynamicTemporaryAzureCredentialsProviderTest.java
new file mode 100644
index 000000000..9f36b96de
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/DynamicTemporaryAzureCredentialsProviderTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure.token;
+
+import org.apache.fluss.fs.token.Credentials;
+import org.apache.fluss.fs.token.CredentialsJsonSerde;
+import org.apache.fluss.fs.token.ObtainedSecurityToken;
+
+import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.TokenAccessProviderException;
+import org.apache.hadoop.fs.azurebfs.oauth2.AzureADToken;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Date;
+import java.util.HashMap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests {@link DynamicTemporaryAzureCredentialsProvider} together with the static state kept by
+ * {@link AzureDelegationTokenReceiver}.
+ */
+class DynamicTemporaryAzureCredentialsProviderTest {
+
+    private static final String CLIENT_ID = null;
+    private static final String CLIENT_SECRET = null;
+
+    private static final String SESSION_TOKEN = "sessionToken";
+
+    /** Clears the receiver's static fields after each test. */
+    @AfterEach
+    void tearDown() {
+        AzureDelegationTokenReceiver.credentials = null;
+        AzureDelegationTokenReceiver.validUntil = null;
+        AzureDelegationTokenReceiver.additionInfos = null;
+    }
+
+    /** Without received credentials, asking for a token must fail. */
+    @Test
+    void getCredentialsShouldThrowExceptionWhenNoCredentials() {
+        DynamicTemporaryAzureCredentialsProvider provider =
+                new DynamicTemporaryAzureCredentialsProvider();
+
+        assertThatThrownBy(() -> provider.getToken())
+                .isInstanceOf(TokenAccessProviderException.class);
+    }
+
+    /**
+     * After a receiver has been handed a token, the provider must expose its security token and
+     * an expiry equal to the token's timestamp of {@code 1L}.
+     */
+    @Test
+    void getCredentialsShouldStoreCredentialsWhenCredentialsProvided() throws Exception {
+        DynamicTemporaryAzureCredentialsProvider provider =
+                new DynamicTemporaryAzureCredentialsProvider();
+        Credentials credentials = new Credentials(CLIENT_ID, CLIENT_SECRET, SESSION_TOKEN);
+        AzureDelegationTokenReceiver receiver = new AbfsDelegationTokenReceiver();
+
+        byte[] serialized = CredentialsJsonSerde.toJson(credentials);
+        receiver.onNewTokensObtained(
+                new ObtainedSecurityToken("abfs", serialized, 1L, new HashMap<>()));
+
+        AzureADToken adToken = provider.getToken();
+        Date expectedExpiry = new Date(1L);
+
+        assertThat(adToken.getAccessToken()).isEqualTo(credentials.getSecurityToken());
+        assertThat(adToken.getExpiry()).isEqualTo(expectedExpiry);
+    }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/MockAuthServer.java
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/MockAuthServer.java
new file mode 100644
index 000000000..318f1eaa7
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/MockAuthServer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure.token;
+
+import org.apache.fluss.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
+import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelOption;
+import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelPipeline;
+import org.apache.fluss.shaded.netty4.io.netty.channel.EventLoopGroup;
+import org.apache.fluss.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.fluss.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import
org.apache.fluss.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
+import
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
+import
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpServerExpectContinueHandler;
+import org.apache.fluss.shaded.netty4.io.netty.handler.logging.LogLevel;
+import org.apache.fluss.shaded.netty4.io.netty.handler.logging.LoggingHandler;
+
+import java.io.Closeable;
+
+/** Mock Netty Auth Server for facilitating the Azure auth token generation. */
+public class MockAuthServer implements Closeable {
+
+ private final EventLoopGroup bossGroup;
+ private final EventLoopGroup workerGroup;
+
+ private ChannelFuture channelFuture;
+
+ MockAuthServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
+ this.bossGroup = bossGroup;
+ this.workerGroup = workerGroup;
+ this.channelFuture = run();
+ }
+
+ public ChannelFuture run() {
+ try {
+ ServerBootstrap b = new ServerBootstrap();
+ b.option(ChannelOption.SO_BACKLOG, 1024);
+ b.group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .handler(new LoggingHandler(LogLevel.INFO))
+ .childHandler(
+ new ChannelInitializer<SocketChannel>() {
+ @Override
+ protected void initChannel(SocketChannel ch) {
+ ChannelPipeline p = ch.pipeline();
+ p.addLast(new HttpServerCodec());
+ p.addLast(new
HttpServerExpectContinueHandler());
+ p.addLast(new AuthServerHandler());
+ }
+ });
+
+ channelFuture = b.bind(8080).sync();
+ return channelFuture;
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static MockAuthServer create() {
+ return new MockAuthServer(new NioEventLoopGroup(1), new
NioEventLoopGroup());
+ }
+
+ @Override
+ public void close() {
+ try {
+ bossGroup.shutdownGracefully().sync();
+ workerGroup.shutdownGracefully().sync();
+ channelFuture.channel().closeFuture().sync();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/WasbDelegationTokenReceiverTest.java
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/WasbDelegationTokenReceiverTest.java
new file mode 100644
index 000000000..ace84723c
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/WasbDelegationTokenReceiverTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure.token;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Checks the scheme reported by {@link WasbDelegationTokenReceiver}. */
+public class WasbDelegationTokenReceiverTest {
+
+    /** The receiver must identify itself with the {@code wasb} scheme. */
+    @Test
+    void testScheme() {
+        String reportedScheme = new WasbDelegationTokenReceiver().scheme();
+
+        assertThat(reportedScheme).isEqualTo("wasb");
+    }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/WasbsDelegationTokenReceiverTest.java
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/WasbsDelegationTokenReceiverTest.java
new file mode 100644
index 000000000..77d6a9dae
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/WasbsDelegationTokenReceiverTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure.token;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests that {@link WasbsDelegationTokenReceiver} reports the {@code wasbs} scheme. */
+public class WasbsDelegationTokenReceiverTest {
+
+ @Test
+ void testScheme() {
+ WasbsDelegationTokenReceiver receiver = new
WasbsDelegationTokenReceiver();
+ assertThat(receiver.scheme()).isEqualTo("wasbs");
+ }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/test/resources/create-token.json
b/fluss-filesystems/fluss-fs-azure/src/test/resources/create-token.json
new file mode 100644
index 000000000..b199febb0
--- /dev/null
+++ b/fluss-filesystems/fluss-fs-azure/src/test/resources/create-token.json
@@ -0,0 +1,5 @@
+{
+ "access_token": "token",
+ "expires_in": 3599,
+ "token_type": "Bearer"
+}
\ No newline at end of file
diff --git a/fluss-filesystems/pom.xml b/fluss-filesystems/pom.xml
index 2a2719455..0fd2e67d5 100644
--- a/fluss-filesystems/pom.xml
+++ b/fluss-filesystems/pom.xml
@@ -34,6 +34,7 @@
<module>fluss-fs-oss</module>
<module>fluss-fs-s3</module>
<module>fluss-fs-gs</module>
+ <module>fluss-fs-azure</module>
<module>fluss-fs-obs</module>
<module>fluss-fs-hdfs</module>
</modules>