This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0acac5391bb9c8e9de88a3bfab24ba6c2e8d41be Author: Srinivasulu Punuru <[email protected]> AuthorDate: Tue Aug 10 16:24:52 2021 -0700 [FLINK-18562][fs] Adding ABFS support to access ADLS Gen2 storage accounts --- docs/content/docs/deployment/filesystems/azure.md | 43 ++++++++++++++++++++-- .../docs/deployment/filesystems/overview.md | 5 ++- .../java/org/apache/flink/core/fs/FileSystem.java | 2 + .../flink/fs/azurefs/AbstractAzureFSFactory.java | 28 +++++--------- ...Factory.java => AzureBlobStorageFSFactory.java} | 10 ++++- ...y.java => AzureDataLakeStoreGen2FSFactory.java} | 14 +++++-- ...y.java => SecureAzureBlobStorageFSFactory.java} | 10 ++++- ... => SecureAzureDataLakeStoreGen2FSFactory.java} | 14 +++++-- .../org.apache.flink.core.fs.FileSystemFactory | 6 ++- ...est.java => AzureBlobStorageFSFactoryTest.java} | 6 ++- ...va => AzureDataLakeStoreGen2FSFactoryTest.java} | 33 ++++------------- 11 files changed, 110 insertions(+), 61 deletions(-) diff --git a/docs/content/docs/deployment/filesystems/azure.md b/docs/content/docs/deployment/filesystems/azure.md index dcb2038..3e0f41d 100644 --- a/docs/content/docs/deployment/filesystems/azure.md +++ b/docs/content/docs/deployment/filesystems/azure.md @@ -30,13 +30,31 @@ under the License. [Azure Blob Storage](https://docs.microsoft.com/en-us/azure/storage/) is a Microsoft-managed service providing cloud storage for a variety of use cases. You can use Azure Blob Storage with Flink for **reading** and **writing data** as well in conjunction with the [streaming **state backends**]({{< ref "docs/ops/state/state_backends" >}}) +Flink supports accessing Azure Blob Storage using both [wasb://](https://hadoop.apache.org/docs/stable/hadoop-azure/index.html) or [abfs://](https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html). + +{{< hint info >}} +Azure recommends using abfs:// for accessing ADLS Gen2 storage accounts even though wasb:// works through backward compatibility. 
+{{< /hint >}} + +{{< hint warning >}} +abfs:// can be used for accessing the ADLS Gen2 storage accounts only. Please refer to the Azure documentation on how to identify an ADLS Gen2 storage account. +{{< /hint >}} + + You can use Azure Blob Storage objects like regular files by specifying paths in the following format: ```plain +// WASB unencrypted access wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path> -// SSL encrypted access +// WASB SSL encrypted access wasbs://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path> + +// ABFS unencrypted access +abfs://<your-container>@$<your-azure-account>.dfs.core.windows.net/<object-path> + +// ABFS SSL encrypted access +abfss://<your-container>@$<your-azure-account>.dfs.core.windows.net/<object-path> ``` See below for how to use Azure Blob Storage in a Flink job: @@ -63,9 +81,11 @@ cp ./opt/flink-azure-fs-hadoop-{{< version >}}.jar ./plugins/azure-fs-hadoop/ `flink-azure-fs-hadoop` registers default FileSystem wrappers for URIs with the *wasb://* and *wasbs://* (SSL encrypted access) scheme. -### Credentials Configuration +## Credentials Configuration -Hadoop's Azure Filesystem supports configuration of credentials via the Hadoop configuration as +### WASB + +Hadoop's WASB Azure Filesystem supports configuration of credentials via the Hadoop configuration as outlined in the [Hadoop Azure Blob Storage documentation](https://hadoop.apache.org/docs/current/hadoop-azure/index.html#Configuring_Credentials). For convenience Flink forwards all Flink configurations with a key prefix of `fs.azure` to the Hadoop configuration of the filesystem. 
Consequentially, the azure blob storage key can be configured @@ -83,4 +103,21 @@ environment variable `AZURE_STORAGE_KEY` by setting the following configuration fs.azure.account.keyprovider.<account_name>.blob.core.windows.net: org.apache.flink.fs.azurefs.EnvironmentVariableKeyProvider ``` +### ABFS + +Hadoop's ABFS Azure Filesystem supports several ways of configuring authentication. Please refer to the [Hadoop ABFS documentation](https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html#Authentication) on how to configure it. + +{{< hint info >}} +Azure recommends using Azure managed identity to access the ADLS Gen2 storage accounts using abfs. Please refer to [Azure managed identities documentation](https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/) for more details. + +Please visit the [page](https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/services-support-managed-identities#azure-services-that-support-managed-identities-for-azure-resources) for the list of services that support Managed Identities. Flink clusters deployed in those Azure services can take advantage of Managed Identities. +{{< /hint >}} + +##### Accessing ABFS using storage Keys (Discouraged) +The Azure blob storage key can be configured in `flink-conf.yaml` via: + +```yaml +fs.azure.account.key.<account_name>.dfs.core.windows.net: <azure_storage_key> +``` + {{< top >}} diff --git a/docs/content/docs/deployment/filesystems/overview.md b/docs/content/docs/deployment/filesystems/overview.md index aec8b53..138a64c 100644 --- a/docs/content/docs/deployment/filesystems/overview.md +++ b/docs/content/docs/deployment/filesystems/overview.md @@ -56,9 +56,12 @@ The Apache Flink project supports the following file systems: - **[Aliyun Object Storage Service]({{< ref "docs/deployment/filesystems/oss" >}})** is supported by `flink-oss-fs-hadoop` and registered under the *oss://* URI scheme. 
The implementation is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint. + - **[Azure Data Lake Store Gen2]({{< ref "docs/deployment/filesystems/azure" >}})** is supported by `flink-azure-fs-hadoop` and registered under the *abfs(s)://* URI schemes. + The implementation is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint. + - **[Azure Blob Storage]({{< ref "docs/deployment/filesystems/azure" >}})** is supported by `flink-azure-fs-hadoop` and registered under the *wasb(s)://* URI schemes. The implementation is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint. - + - **[Google Cloud Storage]({{< ref "docs/deployment/filesystems/gcs" >}})** is supported by `gcs-connector` and registered under the *gs://* URI scheme. The implementation is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint. 
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index 8958695..b77c2f5 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -248,6 +248,8 @@ public abstract class FileSystem { ImmutableMultimap.<String, String>builder() .put("wasb", "flink-fs-azure-hadoop") .put("wasbs", "flink-fs-azure-hadoop") + .put("abfs", "flink-fs-azure-hadoop") + .put("abfss", "flink-fs-azure-hadoop") .put("oss", "flink-oss-fs-hadoop") .put("s3", "flink-s3-fs-hadoop") .put("s3", "flink-s3-fs-presto") diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java index 5fbad53..d4f358e 100644 --- a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java +++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java @@ -24,7 +24,6 @@ import org.apache.flink.core.fs.FileSystemFactory; import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import org.apache.flink.runtime.util.HadoopConfigLoader; -import org.apache.hadoop.fs.azure.NativeAzureFileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,12 +35,12 @@ import java.util.Set; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * Abstract factory for AzureFS. Subclasses override to specify the correct scheme (wasb / wasbs). - * Based on Azure HDFS support in the <a + * Abstract factory for AzureFS. Subclasses override to specify the correct scheme (wasb / wasbs / + * abfs/ abfss). Based on Azure HDFS support in the <a * href="https://hadoop.apache.org/docs/current/hadoop-azure/index.html">hadoop-azure</a> module. 
*/ public abstract class AbstractAzureFSFactory implements FileSystemFactory { - private static final Logger LOG = LoggerFactory.getLogger(AzureFSFactory.class); + private static final Logger LOG = LoggerFactory.getLogger(AzureBlobStorageFSFactory.class); private static final String[] FLINK_CONFIG_PREFIXES = {"fs.azure.", "azure."}; private static final String HADOOP_CONFIG_PREFIX = "fs.azure."; @@ -53,8 +52,6 @@ public abstract class AbstractAzureFSFactory implements FileSystemFactory { private final HadoopConfigLoader configLoader; - private Configuration flinkConfig; - public AbstractAzureFSFactory() { this.configLoader = new HadoopConfigLoader( @@ -68,25 +65,18 @@ public abstract class AbstractAzureFSFactory implements FileSystemFactory { @Override public void configure(Configuration config) { - flinkConfig = config; configLoader.setFlinkConfig(config); } + abstract org.apache.hadoop.fs.FileSystem createAzureFS(); + @Override public FileSystem create(URI fsUri) throws IOException { checkNotNull(fsUri, "passed file system URI object should not be null"); - LOG.info("Trying to load and instantiate Azure File System"); - return new HadoopFileSystem(createInitializedAzureFS(fsUri, flinkConfig)); - } - - // uri is of the form: wasb(s)://[email protected]/testDir - private org.apache.hadoop.fs.FileSystem createInitializedAzureFS( - URI fsUri, Configuration flinkConfig) throws IOException { + LOG.info("Trying to load and instantiate Azure File System for {}", fsUri); org.apache.hadoop.conf.Configuration hadoopConfig = configLoader.getOrLoadHadoopConfig(); - - org.apache.hadoop.fs.FileSystem azureFS = new NativeAzureFileSystem(); - azureFS.initialize(fsUri, hadoopConfig); - - return azureFS; + org.apache.hadoop.fs.FileSystem fs = createAzureFS(); + fs.initialize(fsUri, hadoopConfig); + return new HadoopFileSystem(fs); } } diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java 
b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobStorageFSFactory.java similarity index 78% copy from flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java copy to flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobStorageFSFactory.java index 1520569..c7e28a5 100644 --- a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java +++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobStorageFSFactory.java @@ -18,11 +18,19 @@ package org.apache.flink.fs.azurefs; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.azure.NativeAzureFileSystem; + /** A factory for the Azure file system over HTTP. */ -public class AzureFSFactory extends AbstractAzureFSFactory { +public class AzureBlobStorageFSFactory extends AbstractAzureFSFactory { @Override public String getScheme() { return "wasb"; } + + @Override + FileSystem createAzureFS() { + return new NativeAzureFileSystem(); + } } diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureDataLakeStoreGen2FSFactory.java similarity index 73% copy from flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java copy to flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureDataLakeStoreGen2FSFactory.java index 1520569..37a5b75 100644 --- a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java +++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureDataLakeStoreGen2FSFactory.java @@ -18,11 +18,19 @@ package org.apache.flink.fs.azurefs; -/** A factory for the Azure file system over HTTP. 
*/ -public class AzureFSFactory extends AbstractAzureFSFactory { +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; + +/** Abfs azureFs implementation. */ +public class AzureDataLakeStoreGen2FSFactory extends AbstractAzureFSFactory { @Override public String getScheme() { - return "wasb"; + return "abfs"; + } + + @Override + FileSystem createAzureFS() { + return new AzureBlobFileSystem(); } } diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureFSFactory.java b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureBlobStorageFSFactory.java similarity index 78% rename from flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureFSFactory.java rename to flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureBlobStorageFSFactory.java index bdd0a6d..0ce3be9 100644 --- a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureFSFactory.java +++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureBlobStorageFSFactory.java @@ -18,11 +18,19 @@ package org.apache.flink.fs.azurefs; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.azure.NativeAzureFileSystem; + /** A factory for the Azure file system over HTTPs. 
*/ -public class SecureAzureFSFactory extends AbstractAzureFSFactory { +public class SecureAzureBlobStorageFSFactory extends AbstractAzureFSFactory { @Override public String getScheme() { return "wasbs"; } + + @Override + FileSystem createAzureFS() { + return new NativeAzureFileSystem(); + } } diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureDataLakeStoreGen2FSFactory.java similarity index 72% rename from flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java rename to flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureDataLakeStoreGen2FSFactory.java index 1520569..1566195 100644 --- a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java +++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureDataLakeStoreGen2FSFactory.java @@ -18,11 +18,19 @@ package org.apache.flink.fs.azurefs; -/** A factory for the Azure file system over HTTP. */ -public class AzureFSFactory extends AbstractAzureFSFactory { +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; + +/** Secure ABFS AzureFS implementation. 
*/ +public class SecureAzureDataLakeStoreGen2FSFactory extends AbstractAzureFSFactory { @Override public String getScheme() { - return "wasb"; + return "abfss"; + } + + @Override + FileSystem createAzureFS() { + return new AzureBlobFileSystem(); } } diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory b/flink-filesystems/flink-azure-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory index 4d6a19a..3b35a28 100644 --- a/flink-filesystems/flink-azure-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory +++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory @@ -13,5 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.flink.fs.azurefs.AzureFSFactory -org.apache.flink.fs.azurefs.SecureAzureFSFactory +org.apache.flink.fs.azurefs.AzureBlobStorageFSFactory +org.apache.flink.fs.azurefs.SecureAzureBlobStorageFSFactory +org.apache.flink.fs.azurefs.AzureDataLakeStoreGen2FSFactory +org.apache.flink.fs.azurefs.SecureAzureDataLakeStoreGen2FSFactory diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFSFactoryTest.java b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureBlobStorageFSFactoryTest.java similarity index 93% copy from flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFSFactoryTest.java copy to flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureBlobStorageFSFactoryTest.java index 326e85a..d39e7df 100644 --- a/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFSFactoryTest.java +++ 
b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureBlobStorageFSFactoryTest.java @@ -34,7 +34,7 @@ import java.util.List; /** Tests for the AzureFSFactory. */ @RunWith(Parameterized.class) -public class AzureFSFactoryTest extends TestLogger { +public class AzureBlobStorageFSFactoryTest extends TestLogger { @Parameterized.Parameter public String scheme; @@ -46,7 +46,9 @@ public class AzureFSFactoryTest extends TestLogger { @Rule public final ExpectedException exception = ExpectedException.none(); private AbstractAzureFSFactory getFactory(String scheme) { - return scheme.equals("wasb") ? new AzureFSFactory() : new SecureAzureFSFactory(); + return scheme.equals("wasb") + ? new AzureBlobStorageFSFactory() + : new SecureAzureBlobStorageFSFactory(); } @Test diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFSFactoryTest.java b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureDataLakeStoreGen2FSFactoryTest.java similarity index 67% rename from flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFSFactoryTest.java rename to flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureDataLakeStoreGen2FSFactoryTest.java index 326e85a..0c73181 100644 --- a/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFSFactoryTest.java +++ b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureDataLakeStoreGen2FSFactoryTest.java @@ -19,9 +19,7 @@ package org.apache.flink.fs.azurefs; import org.apache.flink.configuration.Configuration; -import org.apache.flink.util.TestLogger; -import org.apache.hadoop.fs.azure.AzureException; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -32,21 +30,22 @@ import java.net.URI; import java.util.Arrays; import java.util.List; -/** Tests for the AzureFSFactory. 
*/ +/** Tests for the ABFSAzureFSFactory. */ @RunWith(Parameterized.class) -public class AzureFSFactoryTest extends TestLogger { - +public class AzureDataLakeStoreGen2FSFactoryTest { @Parameterized.Parameter public String scheme; @Parameterized.Parameters(name = "Scheme = {0}") public static List<String> parameters() { - return Arrays.asList("wasb", "wasbs"); + return Arrays.asList("abfs", "abfss"); } @Rule public final ExpectedException exception = ExpectedException.none(); private AbstractAzureFSFactory getFactory(String scheme) { - return scheme.equals("wasb") ? new AzureFSFactory() : new SecureAzureFSFactory(); + return scheme.equals("abfs") + ? new AzureDataLakeStoreGen2FSFactory() + : new SecureAzureDataLakeStoreGen2FSFactory(); } @Test @@ -60,31 +59,13 @@ public class AzureFSFactoryTest extends TestLogger { factory.create(uri); } - // missing credentials - @Test - public void testCreateFsWithAuthorityMissingCreds() throws Exception { - String uriString = - String.format( - "%s://[email protected]/testDir", scheme); - final URI uri = URI.create(uriString); - - exception.expect(AzureException.class); - - AbstractAzureFSFactory factory = getFactory(scheme); - Configuration config = new Configuration(); - config.setInteger("fs.azure.io.retry.max.retries", 0); - factory.configure(config); - factory.create(uri); - } - @Test public void testCreateFsWithMissingAuthority() throws Exception { String uriString = String.format("%s:///my/path", scheme); final URI uri = URI.create(uriString); exception.expect(IllegalArgumentException.class); - exception.expectMessage( - "Cannot initialize WASB file system, URI authority not recognized."); + exception.expectMessage(String.format("%s has invalid authority.", uriString)); AbstractAzureFSFactory factory = getFactory(scheme); factory.configure(new Configuration());
