This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new ad7f8e09f8 Add hook for Azure Data Lake Storage Gen2 (#28262)
ad7f8e09f8 is described below

commit ad7f8e09f8e6e87df2665abdedb22b3e8a469b49
Author: Bharanidharan <94612827+bharanidhara...@users.noreply.github.com>
AuthorDate: Thu Jan 5 19:33:27 2023 +0530

    Add hook for Azure Data Lake Storage Gen2 (#28262)

    Created a hook supporting ADLS gen2, which uses the WASB connection
    and connects to ADLS gen2 storage.

    Relates to #28223
---
 .../providers/microsoft/azure/hooks/data_lake.py   | 311 ++++++++++++++++++++-
 airflow/providers/microsoft/azure/provider.yaml    |  10 +
 .../connections/adls_v2.rst                        |  68 +++++
 generated/provider_dependencies.json               |   1 +
 .../microsoft/azure/hooks/test_azure_data_lake.py  |  96 ++++++-
 5 files changed, 477 insertions(+), 9 deletions(-)

diff --git a/airflow/providers/microsoft/azure/hooks/data_lake.py b/airflow/providers/microsoft/azure/hooks/data_lake.py
index 4a9bc98f92..ad5e9818a9 100644
--- a/airflow/providers/microsoft/azure/hooks/data_lake.py
+++ b/airflow/providers/microsoft/azure/hooks/data_lake.py
@@ -15,19 +15,21 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-This module contains integration with Azure Data Lake.
-
-AzureDataLakeHook communicates via a REST API compatible with WebHDFS. Make sure that a
-Airflow connection of type `azure_data_lake` exists. Authorization can be done by supplying a
-login (=Client ID), password (=Client Secret) and extra fields tenant (Tenant) and account_name (Account Name)
-(see connection `azure_data_lake_default` for an example).
-"""
 from __future__ import annotations

 from typing import Any

+from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError
 from azure.datalake.store import core, lib, multithread
+from azure.identity import ClientSecretCredential
+from azure.storage.filedatalake import (
+    DataLakeDirectoryClient,
+    DataLakeFileClient,
+    DataLakeServiceClient,
+    DirectoryProperties,
+    FileSystemClient,
+    FileSystemProperties,
+)

 from airflow.exceptions import AirflowException
 from airflow.hooks.base import BaseHook
@@ -36,6 +38,13 @@ from airflow.providers.microsoft.azure.utils import _ensure_prefixes, get_field

 class AzureDataLakeHook(BaseHook):
     """
+    This module contains integration with Azure Data Lake.
+
+    AzureDataLakeHook communicates via a REST API compatible with WebHDFS. Make sure that an
+    Airflow connection of type `azure_data_lake` exists. Authorization can be done by supplying a
+    login (=Client ID), password (=Client Secret) and extra fields tenant (Tenant) and account_name
+    (Account Name) (see connection `azure_data_lake_default` for an example).
+
     Interacts with Azure Data Lake.

     Client ID and client secret should be in user and password parameters.
@@ -230,3 +239,289 @@ class AzureDataLakeHook(BaseHook):
             self.log.info("File %s not found", path)
         else:
             raise AirflowException(f"File {path} not found")
+
+
+class AzureDataLakeStorageV2Hook(BaseHook):
+    """
+    This hook interacts with an ADLS gen2 storage account; it mainly helps to create and manage
+    directories and files in storage accounts that have a hierarchical namespace. It uses the
+    ADLS v2 connection details to create a DataLakeServiceClient object.
+
+    Because WASB is marked as legacy and ADLS gen1 is being retired, this hook implements
+    ADLS gen2 support for interacting with the storage account.
+
+    .. seealso::
+        https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-directory-file-acl-python
+
+    :param adls_conn_id: Reference to the :ref:`adls connection <howto/connection:adls>`.
+    :param public_read: Whether anonymous public read access should be used. Default is False.
+    """
+
+    conn_name_attr = "adls_conn_id"
+    default_conn_name = "adls_default"
+    conn_type = "adls"
+    hook_name = "Azure Data Lake Storage V2"
+
+    @staticmethod
+    def get_connection_form_widgets() -> dict[str, Any]:
+        """Returns connection widgets to add to connection form."""
+        from flask_appbuilder.fieldwidgets import BS3PasswordFieldWidget, BS3TextFieldWidget
+        from flask_babel import lazy_gettext
+        from wtforms import PasswordField, StringField
+
+        return {
+            "connection_string": PasswordField(
+                lazy_gettext("ADLS gen2 Connection String (optional)"), widget=BS3PasswordFieldWidget()
+            ),
+            "tenant_id": StringField(
+                lazy_gettext("Tenant Id (Active Directory Auth)"), widget=BS3TextFieldWidget()
+            ),
+        }
+
+    @staticmethod
+    def get_ui_field_behaviour() -> dict[str, Any]:
+        """Returns custom field behaviour."""
+        return {
+            "hidden_fields": ["schema", "port"],
+            "relabeling": {
+                "login": "ADLS gen2 Storage Login (optional)",
+                "password": "ADLS gen2 Storage Key (optional)",
+                "host": "Account Name (Active Directory Auth)",
+            },
+            "placeholders": {
+                "extra": "additional options for use with FileService and AzureFileVolume",
+                "login": "account name",
+                "password": "secret",
+                "host": "account url",
+                "connection_string": "connection string auth",
+                "tenant_id": "tenant",
+            },
+        }
+
+    def __init__(self, adls_conn_id: str, public_read: bool = False) -> None:
+        super().__init__()
+        self.conn_id = adls_conn_id
+        self.public_read = public_read
+        self.service_client = self.get_conn()
+
+    def get_conn(self) -> DataLakeServiceClient:  # type: ignore[override]
+        """Return the DataLakeServiceClient object."""
+        conn = self.get_connection(self.conn_id)
+        extra = conn.extra_dejson or {}
+
+        connection_string = self._get_field(extra, "connection_string")
+        if connection_string:
+            # connection_string auth takes priority
+            return DataLakeServiceClient.from_connection_string(connection_string, **extra)
+
+        tenant = self._get_field(extra, "tenant_id")
+        if tenant:
+            # use Active Directory auth
+            app_id = conn.login
+            app_secret = conn.password
+            token_credential = ClientSecretCredential(tenant, app_id, app_secret)
+            return DataLakeServiceClient(
+                account_url=f"https://{conn.login}.dfs.core.windows.net", credential=token_credential, **extra
+            )
+        credential = conn.password
+        return DataLakeServiceClient(
+            account_url=f"https://{conn.login}.dfs.core.windows.net",
+            credential=credential,
+            **extra,
+        )
+
+    def _get_field(self, extra_dict, field_name):
+        prefix = "extra__adls__"
+        if field_name.startswith("extra__"):
+            raise ValueError(
+                f"Got prefixed name {field_name}; please remove the '{prefix}' prefix "
+                f"when using this method."
+            )
+        if field_name in extra_dict:
+            return extra_dict[field_name] or None
+        return extra_dict.get(f"{prefix}{field_name}") or None
+
+    def create_file_system(self, file_system_name: str) -> None:
+        """
+        A container acts as a file system for your files. Create a new file system under
+        the specified account.
+
+        If a file system with the same name already exists, the resulting
+        ResourceExistsError is caught and logged rather than raised.
+ """ + try: + file_system_client = self.service_client.create_file_system(file_system=file_system_name) + self.log.info("Created file system: %s", file_system_client.file_system_name) + except ResourceExistsError: + self.log.info("Attempted to create file system %r but it already exists.", file_system_name) + except Exception as e: + self.log.info("Error while attempting to create file system %r: %s", file_system_name, e) + raise + + def get_file_system(self, file_system: FileSystemProperties | str) -> FileSystemClient: + """ + Get a client to interact with the specified file system + + :param file_system: This can either be the name of the file system + or an instance of FileSystemProperties. + """ + try: + file_system_client = self.service_client.get_file_system_client(file_system=file_system) + return file_system_client + except ResourceNotFoundError: + self.log.info("file system %r doesn't exists.", file_system) + raise + except Exception as e: + self.log.info("Error while attempting to get file system %r: %s", file_system, e) + raise + + def create_directory( + self, file_system_name: FileSystemProperties | str, directory_name: str, **kwargs + ) -> DataLakeDirectoryClient: + """ + Create a directory under the specified file system. + + :param file_system_name: Name of the file system or instance of FileSystemProperties. + :param directory_name: Name of the directory which needs to be created in the file system. + """ + result = self.get_file_system(file_system_name).create_directory(directory_name, kwargs) + return result + + def get_directory_client( + self, + file_system_name: FileSystemProperties | str, + directory_name: DirectoryProperties | str, + ) -> DataLakeDirectoryClient: + """ + Get the specific directory under the specified file system. + + :param file_system_name: Name of the file system or instance of FileSystemProperties. + :param directory_name: Name of the directory or instance of DirectoryProperties which needs to be + retrieved from the file system. + """ + try: + directory_client = self.get_file_system(file_system_name).get_directory_client(directory_name) + return directory_client + except ResourceNotFoundError: + self.log.info( + "Directory %s doesn't exists in the file system %s", directory_name, file_system_name + ) + raise + except Exception as e: + self.log.info(e) + raise + + def create_file(self, file_system_name: FileSystemProperties | str, file_name: str) -> DataLakeFileClient: + """ + Creates a file under the file system + + :param file_system_name: Name of the file system or instance of FileSystemProperties. + :param file_name: Name of the file which needs to be created in the file system. + """ + file_client = self.get_file_system(file_system_name).create_file(file_name) + return file_client + + def upload_file( + self, + file_system_name: FileSystemProperties | str, + file_name: str, + file_path: str, + overwrite: bool = False, + **kwargs: Any, + ) -> None: + """ + Create a file with data in the file system + + :param file_system_name: Name of the file system or instance of FileSystemProperties. + :param file_name: Name of the file to be created with name. + :param file_path: Path to the file to load. + :param overwrite: Boolean flag to overwrite an existing file or not. 
+ """ + file_client = self.create_file(file_system_name, file_name) + with open(file_path, "rb") as data: + file_client.upload_data(data, overwrite=overwrite, kwargs=kwargs) + + def upload_file_to_directory( + self, + file_system_name: str, + directory_name: str, + file_name: str, + file_path: str, + overwrite: bool = False, + **kwargs: Any, + ) -> None: + """ + Create a new file and return the file client to be interacted with and then + upload data to a file + + :param file_system_name: Name of the file system or instance of FileSystemProperties. + :param directory_name: Name of the directory. + :param file_name: Name of the file to be created with name. + :param file_path: Path to the file to load. + :param overwrite: Boolean flag to overwrite an existing file or not. + """ + directory_client = self.get_directory_client(file_system_name, directory_name=directory_name) + file_client = directory_client.create_file(file_name, kwargs=kwargs) + with open(file_path, "rb") as data: + file_client.upload_data(data, overwrite=overwrite, kwargs=kwargs) + + def list_files_directory( + self, file_system_name: FileSystemProperties | str, directory_name: str + ) -> list[str]: + """ + Get the list of files or directories under the specified file system + + :param file_system_name: Name of the file system or instance of FileSystemProperties. + :param directory_name: Name of the directory. + """ + paths = self.get_file_system(file_system=file_system_name).get_paths(directory_name) + directory_lists = [] + for path in paths: + directory_lists.append(path.name) + return directory_lists + + def list_file_system( + self, prefix: str | None = None, include_metadata: bool = False, **kwargs: Any + ) -> list[str]: + """ + Get the list the file systems under the specified account. + + :param prefix: + Filters the results to return only file systems whose names + begin with the specified prefix. + :param include_metadata: Specifies that file system metadata be returned in the response. + The default value is `False`. + """ + file_system = self.service_client.list_file_systems( + name_starts_with=prefix, include_metadata=include_metadata + ) + file_system_list = [] + for fs in file_system: + file_system_list.append(fs.name) + return file_system_list + + def delete_file_system(self, file_system_name: FileSystemProperties | str) -> None: + """ + Deletes the file system + + :param file_system_name: Name of the file system or instance of FileSystemProperties. + """ + try: + self.service_client.delete_file_system(file_system_name) + self.log.info("Deleted file system: %s", file_system_name) + except ResourceNotFoundError: + self.log.info("file system %r doesn't exists.", file_system_name) + except Exception as e: + self.log.info("Error while attempting to deleting file system %r: %s", file_system_name, e) + raise + + def delete_directory(self, file_system_name: FileSystemProperties | str, directory_name: str) -> None: + """ + Deletes specified directory in file system + + :param file_system_name: Name of the file system or instance of FileSystemProperties. + :param directory_name: Name of the directory. 
+ """ + directory_client = self.get_directory_client(file_system_name, directory_name) + directory_client.delete_directory() diff --git a/airflow/providers/microsoft/azure/provider.yaml b/airflow/providers/microsoft/azure/provider.yaml index 4962863668..e3af9984c6 100644 --- a/airflow/providers/microsoft/azure/provider.yaml +++ b/airflow/providers/microsoft/azure/provider.yaml @@ -67,6 +67,7 @@ dependencies: - azure-servicebus>=7.6.1 - azure-synapse-spark - adal>=1.2.7 + - azure-storage-file-datalake>=12.9.1 integrations: - integration-name: Microsoft Azure Batch @@ -124,6 +125,10 @@ integrations: how-to-guide: - /docs/apache-airflow-providers-microsoft-azure/operators/azure_synapse.rst tags: [azure] + - integration-name: Microsoft Azure Data Lake Storage Client Gen2 + external-doc-url: https://azure.microsoft.com/en-us/products/storage/data-lake-storage/ + logo: /integration-logos/azure/Data Lake Storage.svg + tags: [azure] operators: - integration-name: Microsoft Azure Data Lake Storage @@ -198,6 +203,9 @@ hooks: - integration-name: Microsoft Azure Service Bus python-modules: - airflow.providers.microsoft.azure.hooks.asb + - integration-name: Microsoft Azure Data Lake Storage Client Gen2 + python-modules: + - airflow.providers.microsoft.azure.hooks.data_lake - integration-name: Microsoft Azure Synapse python-modules: - airflow.providers.microsoft.azure.hooks.synapse @@ -252,6 +260,8 @@ connection-types: connection-type: azure_service_bus - hook-class-name: airflow.providers.microsoft.azure.hooks.synapse.AzureSynapseHook connection-type: azure_synapse + - hook-class-name: airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook + connection-type: adls secrets-backends: - airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend diff --git a/docs/apache-airflow-providers-microsoft-azure/connections/adls_v2.rst b/docs/apache-airflow-providers-microsoft-azure/connections/adls_v2.rst new file mode 100644 index 0000000000..9ec4015679 --- /dev/null +++ b/docs/apache-airflow-providers-microsoft-azure/connections/adls_v2.rst @@ -0,0 +1,68 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _howto/connection:adls: + +Microsoft Azure Data Lake Storage Gen2 Connection +================================================== + +The Microsoft Azure Data Lake Storage Gen2 connection type enables the ADLS gen2 Integrations. + +Authenticating to Azure Data Lake Storage Gen2 +---------------------------------------------- + +Currently, there are two ways to connect to Azure Data Lake Storage Gen2 using Airflow. + +1. Use `token credentials + <https://docs.microsoft.com/en-us/azure/developer/python/azure-sdk-authenticate?tabs=cmd#authenticate-with-token-credentials>`_ + i.e. 
+   i.e. add specific credentials (client_id, secret, tenant) and subscription id to the Airflow connection.
+2. Use a `Connection String
+   <https://docs.microsoft.com/en-us/azure/data-explorer/kusto/api/connection-strings/storage>`_
+   i.e. add the connection string to ``connection_string`` in the Airflow connection.
+
+Only one authorization method can be used at a time. If you need to manage multiple credentials or keys, then you should
+configure multiple connections.
+
+Default Connection IDs
+----------------------
+
+All hooks and operators related to Microsoft Azure Data Lake Storage Gen2 use ``adls_default`` by default.
+
+Configuring the Connection
+--------------------------
+
+Login (optional)
+    Specify the login used for the ADLS gen2 storage account. For use with Shared Key Credential and SAS Token authentication.
+
+Password (optional)
+    Specify the password used for the ADLS gen2 storage account. For use with
+    Active Directory (token credential) and shared key authentication.
+
+Host (optional)
+    Specify the account URL for anonymous public read, Active Directory, and shared access key authentication.
+
+Extra (optional)
+    Specify the extra parameters (as a JSON dictionary) that can be used in the Azure connection.
+    The following parameters are all optional:
+
+    * ``tenant_id``: Specify the tenant to use. Needed for Active Directory (token) authentication.
+    * ``connection_string``: Connection string for use with connection string authentication.
+
+When specifying the connection as an environment variable, you should specify
+it using URI syntax.
+
+Note that all components of the URI should be URL-encoded.
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index f814543523..d7729e1a4b 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -452,6 +452,7 @@
         "azure-servicebus>=7.6.1",
         "azure-storage-blob>=12.14.0",
         "azure-storage-common>=2.1.0",
+        "azure-storage-file-datalake>=12.9.1",
         "azure-storage-file>=2.1.0",
         "azure-synapse-spark"
     ],
diff --git a/tests/providers/microsoft/azure/hooks/test_azure_data_lake.py b/tests/providers/microsoft/azure/hooks/test_azure_data_lake.py
index f2b2d5a654..e8d378ab5c 100644
--- a/tests/providers/microsoft/azure/hooks/test_azure_data_lake.py
+++ b/tests/providers/microsoft/azure/hooks/test_azure_data_lake.py
@@ -21,7 +21,7 @@ import json
 from unittest import mock

 from airflow.models import Connection
-from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
+from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook, AzureDataLakeStorageV2Hook
 from airflow.utils import db
 from tests.test_utils.providers import get_provider_min_airflow_version

@@ -151,3 +151,97 @@
             "You must now remove `_ensure_prefixes` from azure utils."
             " The functionality is now taken care of by providers manager."
         )
+
+
+class TestAzureDataLakeStorageV2Hook:
+    def setup_class(self) -> None:
+        self.conn_id: str = "adls_conn_id"
+        self.file_system_name = "test_file_system"
+        self.directory_name = "test_directory"
+        self.file_name = "test_file_name"
+
+    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn")
+    def test_create_file_system(self, mock_conn):
+        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id)
+        hook.create_file_system("test_file_system")
+        expected_calls = [mock.call().create_file_system(file_system=self.file_system_name)]
+        mock_conn.assert_has_calls(expected_calls)
+
+    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.FileSystemClient")
+    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn")
+    def test_get_file_system(self, mock_conn, mock_file_system):
+        mock_conn.return_value.get_file_system_client.return_value = mock_file_system
+        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id)
+        result = hook.get_file_system(self.file_system_name)
+        assert result == mock_file_system
+
+    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.DataLakeDirectoryClient")
+    @mock.patch(
+        "airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_file_system"
+    )
+    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn")
+    def test_create_directory(self, mock_conn, mock_get_file_system, mock_directory_client):
+        mock_get_file_system.return_value.create_directory.return_value = mock_directory_client
+        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id)
+        result = hook.create_directory(self.file_system_name, self.directory_name)
+        assert result == mock_directory_client
+
+    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.DataLakeDirectoryClient")
+    @mock.patch(
+        "airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_file_system"
+    )
+    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn")
+    def test_get_directory(self, mock_conn, mock_get_file_system, mock_directory_client):
+        mock_get_file_system.return_value.get_directory_client.return_value = mock_directory_client
+        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id)
+        result = hook.get_directory_client(self.file_system_name, self.directory_name)
+        assert result == mock_directory_client
+
+    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.DataLakeFileClient")
+    @mock.patch(
+        "airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_file_system"
+    )
+    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn")
+    def test_create_file(self, mock_conn, mock_get_file_system, mock_file_client):
+        mock_get_file_system.return_value.create_file.return_value = mock_file_client
+        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id)
+        result = hook.create_file(self.file_system_name, self.file_name)
+        assert result == mock_file_client
+
+    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn")
+    def test_delete_file_system(self, mock_conn):
+        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id)
+        hook.delete_file_system(self.file_system_name)
+        expected_calls = [mock.call().delete_file_system(self.file_system_name)]
+        mock_conn.assert_has_calls(expected_calls)
+
@mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.DataLakeDirectoryClient") + @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn") + def test_delete_directory(self, mock_conn, mock_directory_client): + mock_conn.return_value.get_directory_client.return_value = mock_directory_client + hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id) + hook.delete_directory(self.file_system_name, self.directory_name) + expected_calls = [ + mock.call() + .get_file_system_client(self.file_system_name) + .get_directory_client(self.directory_name) + .delete_directory() + ] + mock_conn.assert_has_calls(expected_calls) + + @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn") + def test_list_file_system(self, mock_conn): + hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id) + hook.list_file_system(prefix="prefix") + mock_conn.return_value.list_file_systems.assert_called_once_with( + name_starts_with="prefix", include_metadata=False + ) + + @mock.patch( + "airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_file_system" + ) + @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn") + def test_list_files_directory(self, mock_conn, mock_get_file_system): + hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id) + hook.list_files_directory(self.file_system_name, self.directory_name) + mock_get_file_system.return_value.get_paths.assert_called_once_with(self.directory_name)