This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b7f84e913b Update Azure fileshare hook to use azure-storage-file-share
instead of azure-storage-file (#33904)
b7f84e913b is described below
commit b7f84e913b6aa4cee7fa63009082b0608b3a0bf1
Author: Pankaj Singh <[email protected]>
AuthorDate: Sat Sep 2 17:45:39 2023 +0530
Update Azure fileshare hook to use azure-storage-file-share instead of
azure-storage-file (#33904)
* Update Azure fileshare hook to use azure-storage-file-share instead of
azure-storage-file
---
.../cloud/transfers/azure_fileshare_to_gcs.py | 34 ++-
airflow/providers/microsoft/azure/CHANGELOG.rst | 23 ++
.../providers/microsoft/azure/hooks/fileshare.py | 315 +++++++++------------
airflow/providers/microsoft/azure/provider.yaml | 3 +-
.../connections/azure_fileshare.rst | 7 +-
generated/provider_dependencies.json | 2 +-
.../cloud/transfers/test_azure_fileshare_to_gcs.py | 2 +-
.../microsoft/azure/hooks/test_azure_fileshare.py | 218 ++++----------
tests/test_utils/azure_system_helpers.py | 37 ++-
9 files changed, 271 insertions(+), 370 deletions(-)
diff --git a/airflow/providers/google/cloud/transfers/azure_fileshare_to_gcs.py
b/airflow/providers/google/cloud/transfers/azure_fileshare_to_gcs.py
index e8410cb5da..a4ce95d289 100644
--- a/airflow/providers/google/cloud/transfers/azure_fileshare_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/azure_fileshare_to_gcs.py
@@ -17,10 +17,12 @@
# under the License.
from __future__ import annotations
+import warnings
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING, Sequence
from airflow import AirflowException
+from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook, _parse_gcs_url,
gcs_object_is_directory
from airflow.providers.microsoft.azure.hooks.fileshare import
AzureFileShareHook
@@ -73,6 +75,7 @@ class AzureFileShareToGCSOperator(BaseOperator):
share_name: str,
dest_gcs: str,
directory_name: str | None = None,
+ directory_path: str | None = None,
prefix: str = "",
azure_fileshare_conn_id: str = "azure_fileshare_default",
gcp_conn_id: str = "google_cloud_default",
@@ -84,7 +87,15 @@ class AzureFileShareToGCSOperator(BaseOperator):
super().__init__(**kwargs)
self.share_name = share_name
+ self.directory_path = directory_path
self.directory_name = directory_name
+ if self.directory_path is None:
+ self.directory_path = directory_name
+ warnings.warn(
+ "Use 'directory_path' instead of 'directory_name'.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+ )
self.prefix = prefix
self.azure_fileshare_conn_id = azure_fileshare_conn_id
self.gcp_conn_id = gcp_conn_id
@@ -106,10 +117,12 @@ class AzureFileShareToGCSOperator(BaseOperator):
def execute(self, context: Context):
self._check_inputs()
- azure_fileshare_hook = AzureFileShareHook(self.azure_fileshare_conn_id)
- files = azure_fileshare_hook.list_files(
- share_name=self.share_name, directory_name=self.directory_name
+ azure_fileshare_hook = AzureFileShareHook(
+ share_name=self.share_name,
+ azure_fileshare_conn_id=self.azure_fileshare_conn_id,
+ directory_path=self.directory_path,
)
+ files = azure_fileshare_hook.list_files()
gcs_hook = GCSHook(
gcp_conn_id=self.gcp_conn_id,
@@ -141,16 +154,17 @@ class AzureFileShareToGCSOperator(BaseOperator):
if files:
self.log.info("%s files are going to be synced.", len(files))
- if self.directory_name is None:
+ if self.directory_path is None:
raise RuntimeError("The directory_name must be set!.")
for file in files:
+ azure_fileshare_hook = AzureFileShareHook(
+ share_name=self.share_name,
+ azure_fileshare_conn_id=self.azure_fileshare_conn_id,
+ directory_path=self.directory_path,
+ file_path=file,
+ )
with NamedTemporaryFile() as temp_file:
- azure_fileshare_hook.get_file_to_stream(
- stream=temp_file,
- share_name=self.share_name,
- directory_name=self.directory_name,
- file_name=file,
- )
+ azure_fileshare_hook.get_file_to_stream(stream=temp_file)
temp_file.flush()
# There will always be a '/' before file because it is
diff --git a/airflow/providers/microsoft/azure/CHANGELOG.rst
b/airflow/providers/microsoft/azure/CHANGELOG.rst
index 1fb2742811..ea8001ff59 100644
--- a/airflow/providers/microsoft/azure/CHANGELOG.rst
+++ b/airflow/providers/microsoft/azure/CHANGELOG.rst
@@ -27,6 +27,29 @@
Changelog
---------
+7.0.0
+.....
+
+Breaking changes
+~~~~~~~~~~~~~~~~
+
+.. warning::
+  In this version of the provider, we have changed AzureFileShareHook to use the
azure-storage-file-share library instead
+  of azure-storage-file. This change has an impact on existing hook methods (see
below for details). We also removed the deprecated
+  extra__azure_fileshare__ prefix from connection extras params and removed the
protocol param from connection extras.
+
+* get_conn from AzureFileShareHook returns None instead of FileService
+* Remove protocol param from Azure fileshare connection extras
+* Remove deprecated extra__azure_fileshare__ prefix from Azure fileshare
connection extras, list_files
+* Remove share_name, directory_name params from AzureFileShareHook methods
check_for_directory,
+  list_directories_and_files, create_directory in favor of AzureFileShareHook
share_name and directory_path params
+* AzureFileShareHook method create_share and delete_share accept kwargs from
ShareServiceClient.create_share
+ and ShareServiceClient.delete_share
+* Remove share_name, directory_name, file_name params from AzureFileShareHook
methods get_file, get_file_to_stream
+  and load_file in favor of AzureFileShareHook share_name and file_path params
+* Remove AzureFileShareHook.check_for_file method
+* Remove AzureFileShareHook.load_string, AzureFileShareHook.load_stream in
favor of AzureFileShareHook.load_data
+
6.3.0
.....
diff --git a/airflow/providers/microsoft/azure/hooks/fileshare.py
b/airflow/providers/microsoft/azure/hooks/fileshare.py
index e23319734e..9cd1ec78c4 100644
--- a/airflow/providers/microsoft/azure/hooks/fileshare.py
+++ b/airflow/providers/microsoft/azure/hooks/fileshare.py
@@ -17,12 +17,11 @@
# under the License.
from __future__ import annotations
-import warnings
from typing import IO, Any
-from azure.storage.file import File, FileService
+from azure.identity import DefaultAzureCredential
+from azure.storage.fileshare import FileProperties, ShareDirectoryClient,
ShareFileClient, ShareServiceClient
-from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.hooks.base import BaseHook
@@ -31,9 +30,8 @@ class AzureFileShareHook(BaseHook):
Interacts with Azure FileShare Storage.
:param azure_fileshare_conn_id: Reference to the
- :ref:`Azure Container Volume connection
id<howto/connection:azure_fileshare>`
- of an Azure account of which container volumes should be used.
-
+ :ref:`Azure FileShare connection id<howto/connection:azure_fileshare>`
+ of an Azure account of which file share should be used.
"""
conn_name_attr = "azure_fileshare_conn_id"
@@ -41,10 +39,22 @@ class AzureFileShareHook(BaseHook):
conn_type = "azure_fileshare"
hook_name = "Azure FileShare"
- def __init__(self, azure_fileshare_conn_id: str =
"azure_fileshare_default") -> None:
+ def __init__(
+ self,
+ share_name: str | None = None,
+ file_path: str | None = None,
+ directory_path: str | None = None,
+ azure_fileshare_conn_id: str = "azure_fileshare_default",
+ ) -> None:
super().__init__()
- self.conn_id = azure_fileshare_conn_id
- self._conn = None
+ self._conn_id = azure_fileshare_conn_id
+ self.share_name = share_name
+ self.file_path = file_path
+ self.directory_path = directory_path
+ self._account_url: str | None = None
+ self._connection_string: str | None = None
+ self._account_access_key: str | None = None
+ self._sas_token: str | None = None
@staticmethod
def get_connection_form_widgets() -> dict[str, Any]:
@@ -58,9 +68,6 @@ class AzureFileShareHook(BaseHook):
"connection_string": StringField(
lazy_gettext("Connection String (optional)"),
widget=BS3TextFieldWidget()
),
- "protocol": StringField(
- lazy_gettext("Account URL or token (optional)"),
widget=BS3TextFieldWidget()
- ),
}
@staticmethod
@@ -73,218 +80,172 @@ class AzureFileShareHook(BaseHook):
"password": "Blob Storage Key (optional)",
},
"placeholders": {
- "login": "account name",
+ "login": "account name or account url",
"password": "secret",
"sas_token": "account url or token (optional)",
"connection_string": "account url or token (optional)",
- "protocol": "account url or token (optional)",
},
}
- def get_conn(self) -> FileService:
- """Return the FileService object."""
-
- def check_for_conflict(key):
- backcompat_key = f"{backcompat_prefix}{key}"
- if backcompat_key in extras:
- warnings.warn(
- f"Conflicting params `{key}` and `{backcompat_key}` found
in extras for conn "
- f"{self.conn_id}. Using value for `{key}`. Please ensure
this is the correct value "
- f"and remove the backcompat key `{backcompat_key}`."
- )
-
- backcompat_prefix = "extra__azure_fileshare__"
- if self._conn:
- return self._conn
- conn = self.get_connection(self.conn_id)
+ def get_conn(self) -> None:
+ conn = self.get_connection(self._conn_id)
extras = conn.extra_dejson
- service_options = {}
- for key, value in extras.items():
- if value == "":
- continue
- if not key.startswith("extra__"):
- service_options[key] = value
- check_for_conflict(key)
- elif key.startswith(backcompat_prefix):
- short_name = key[len(backcompat_prefix) :]
- warnings.warn(
- f"`{key}` is deprecated in azure connection extra please
use `{short_name}` instead",
- AirflowProviderDeprecationWarning,
- stacklevel=2,
- )
- if short_name not in service_options: # prefer values
provided with short name
- service_options[short_name] = value
- else:
- warnings.warn(f"Extra param `{key}` not recognized; ignoring.")
- self._conn = FileService(account_name=conn.login,
account_key=conn.password, **service_options)
- return self._conn
-
- def check_for_directory(self, share_name: str, directory_name: str,
**kwargs) -> bool:
- """
- Check if a directory exists on Azure File Share.
-
- :param share_name: Name of the share.
- :param directory_name: Name of the directory.
- :param kwargs: Optional keyword arguments that
- `FileService.exists()` takes.
- :return: True if the file exists, False otherwise.
- """
- return self.get_conn().exists(share_name, directory_name, **kwargs)
-
- def check_for_file(self, share_name: str, directory_name: str, file_name:
str, **kwargs) -> bool:
- """
- Check if a file exists on Azure File Share.
-
- :param share_name: Name of the share.
- :param directory_name: Name of the directory.
- :param file_name: Name of the file.
- :param kwargs: Optional keyword arguments that
- `FileService.exists()` takes.
- :return: True if the file exists, False otherwise.
- """
- return self.get_conn().exists(share_name, directory_name, file_name,
**kwargs)
-
- def list_directories_and_files(
- self, share_name: str, directory_name: str | None = None, **kwargs
- ) -> list:
- """
- Return the list of directories and files stored on a Azure File Share.
-
- :param share_name: Name of the share.
- :param directory_name: Name of the directory.
- :param kwargs: Optional keyword arguments that
- `FileService.list_directories_and_files()` takes.
- :return: A list of files and directories
- """
- return self.get_conn().list_directories_and_files(share_name,
directory_name, **kwargs)
+ self._connection_string = extras.get("connection_string")
+ if conn.login:
+ self._account_url = self._parse_account_url(conn.login)
+ self._sas_token = extras.get("sas_token")
+ self._account_access_key = conn.password
- def list_files(self, share_name: str, directory_name: str | None = None,
**kwargs) -> list[str]:
- """
- Return the list of files stored on a Azure File Share.
-
- :param share_name: Name of the share.
- :param directory_name: Name of the directory.
- :param kwargs: Optional keyword arguments that
- `FileService.list_directories_and_files()` takes.
- :return: A list of files
- """
- return [
- obj.name
- for obj in self.list_directories_and_files(share_name,
directory_name, **kwargs)
- if isinstance(obj, File)
- ]
+ @staticmethod
+ def _parse_account_url(account_url: str) -> str:
+ if not account_url.lower().startswith("https"):
+ return f"https://{account_url}.file.core.windows.net"
+ return account_url
+
+ @property
+ def share_service_client(self):
+ self.get_conn()
+ if self._connection_string:
+ return ShareServiceClient.from_connection_string(
+ conn_str=self._connection_string,
+ )
+ elif self._account_url and (self._sas_token or
self._account_access_key):
+ credential = self._sas_token or self._account_access_key
+ return ShareServiceClient(account_url=self._account_url,
credential=credential)
+ else:
+ return ShareServiceClient(
+ account_url=self._account_url,
credential=DefaultAzureCredential(), token_intent="backup"
+ )
+
+ @property
+ def share_directory_client(self):
+ if self._connection_string:
+ return ShareDirectoryClient.from_connection_string(
+ conn_str=self._connection_string,
+ share_name=self.share_name,
+ directory_path=self.directory_path,
+ )
+ elif self._account_url and (self._sas_token or
self._account_access_key):
+ credential = self._sas_token or self._account_access_key
+ return ShareDirectoryClient(
+ account_url=self._account_url,
+ share_name=self.share_name,
+ directory_path=self.directory_path,
+ credential=credential,
+ )
+ else:
+ return ShareDirectoryClient(
+ account_url=self._account_url,
+ share_name=self.share_name,
+ directory_path=self.directory_path,
+ credential=DefaultAzureCredential(),
+ token_intent="backup",
+ )
+
+ @property
+ def share_file_client(self):
+ if self._connection_string:
+ return ShareFileClient.from_connection_string(
+ conn_str=self._connection_string,
+ share_name=self.share_name,
+ file_path=self.file_path,
+ )
+ elif self._account_url and (self._sas_token or
self._account_access_key):
+ credential = self._sas_token or self._account_access_key
+ return ShareFileClient(
+ account_url=self._account_url,
+ share_name=self.share_name,
+ file_path=self.file_path,
+ credential=credential,
+ )
+ else:
+ return ShareFileClient(
+ account_url=self._account_url,
+ share_name=self.share_name,
+ file_path=self.file_path,
+ credential=DefaultAzureCredential(),
+ token_intent="backup",
+ )
+
+ def check_for_directory(self) -> bool:
+ """Check if a directory exists on Azure File Share."""
+ return self.share_directory_client.exists()
+
+ def list_directories_and_files(self) -> list:
+ """Return the list of directories and files stored on a Azure File
Share."""
+ return list(self.share_directory_client.list_directories_and_files())
+
+ def list_files(self) -> list[str]:
+ """Return the list of files stored on a Azure File Share."""
+ return [obj.name for obj in self.list_directories_and_files() if
isinstance(obj, FileProperties)]
def create_share(self, share_name: str, **kwargs) -> bool:
"""
Create new Azure File Share.
:param share_name: Name of the share.
- :param kwargs: Optional keyword arguments that
- `FileService.create_share()` takes.
:return: True if share is created, False if share already exists.
"""
- return self.get_conn().create_share(share_name, **kwargs)
+ try:
+ self.share_service_client.create_share(share_name, **kwargs)
+ except Exception as e:
+ self.log.warning(e)
+ return False
+ return True
def delete_share(self, share_name: str, **kwargs) -> bool:
"""
Delete existing Azure File Share.
:param share_name: Name of the share.
- :param kwargs: Optional keyword arguments that
- `FileService.delete_share()` takes.
:return: True if share is deleted, False if share does not exist.
"""
- return self.get_conn().delete_share(share_name, **kwargs)
+ try:
+ self.share_service_client.delete_share(share_name, **kwargs)
+ except Exception as e:
+ self.log.warning(e)
+ return False
+ return True
- def create_directory(self, share_name: str, directory_name: str, **kwargs)
-> list:
- """
- Create a new directory on a Azure File Share.
+ def create_directory(self, **kwargs) -> Any:
+ """Create a new directory on a Azure File Share."""
+ return self.share_directory_client.create_directory(**kwargs)
- :param share_name: Name of the share.
- :param directory_name: Name of the directory.
- :param kwargs: Optional keyword arguments that
- `FileService.create_directory()` takes.
- :return: A list of files and directories
- """
- return self.get_conn().create_directory(share_name, directory_name,
**kwargs)
-
- def get_file(
- self, file_path: str, share_name: str, directory_name: str, file_name:
str, **kwargs
- ) -> None:
+ def get_file(self, file_path: str, **kwargs) -> None:
"""
Download a file from Azure File Share.
:param file_path: Where to store the file.
- :param share_name: Name of the share.
- :param directory_name: Name of the directory.
- :param file_name: Name of the file.
- :param kwargs: Optional keyword arguments that
- `FileService.get_file_to_path()` takes.
"""
- self.get_conn().get_file_to_path(share_name, directory_name,
file_name, file_path, **kwargs)
+ with open(file_path, "wb") as file_handle:
+ data = self.share_file_client.download_file(**kwargs)
+ data.readinto(file_handle)
- def get_file_to_stream(
- self, stream: IO, share_name: str, directory_name: str, file_name:
str, **kwargs
- ) -> None:
+ def get_file_to_stream(self, stream: IO, **kwargs) -> None:
"""
Download a file from Azure File Share.
:param stream: A filehandle to store the file to.
- :param share_name: Name of the share.
- :param directory_name: Name of the directory.
- :param file_name: Name of the file.
- :param kwargs: Optional keyword arguments that
- `FileService.get_file_to_stream()` takes.
"""
- self.get_conn().get_file_to_stream(share_name, directory_name,
file_name, stream, **kwargs)
+ data = self.share_file_client.download_file(**kwargs)
+ data.readinto(stream)
- def load_file(
- self, file_path: str, share_name: str, directory_name: str, file_name:
str, **kwargs
- ) -> None:
+ def load_file(self, file_path: str, **kwargs) -> None:
"""
Upload a file to Azure File Share.
:param file_path: Path to the file to load.
- :param share_name: Name of the share.
- :param directory_name: Name of the directory.
- :param file_name: Name of the file.
- :param kwargs: Optional keyword arguments that
- `FileService.create_file_from_path()` takes.
"""
- self.get_conn().create_file_from_path(share_name, directory_name,
file_name, file_path, **kwargs)
+ with open(file_path, "rb") as source_file:
+ self.share_file_client.upload_file(source_file, **kwargs)
- def load_string(
- self, string_data: str, share_name: str, directory_name: str,
file_name: str, **kwargs
- ) -> None:
+ def load_data(self, string_data: bytes | str | IO, **kwargs) -> None:
"""
Upload a string to Azure File Share.
- :param string_data: String to load.
- :param share_name: Name of the share.
- :param directory_name: Name of the directory.
- :param file_name: Name of the file.
- :param kwargs: Optional keyword arguments that
- `FileService.create_file_from_text()` takes.
- """
- self.get_conn().create_file_from_text(share_name, directory_name,
file_name, string_data, **kwargs)
-
- def load_stream(
- self, stream: str, share_name: str, directory_name: str, file_name:
str, count: str, **kwargs
- ) -> None:
- """
- Upload a stream to Azure File Share.
-
- :param stream: Opened file/stream to upload as the file content.
- :param share_name: Name of the share.
- :param directory_name: Name of the directory.
- :param file_name: Name of the file.
- :param count: Size of the stream in bytes
- :param kwargs: Optional keyword arguments that
- `FileService.create_file_from_stream()` takes.
+ :param string_data: String/Stream to load.
"""
- self.get_conn().create_file_from_stream(
- share_name, directory_name, file_name, stream, count, **kwargs
- )
+ self.share_file_client.upload_file(string_data, **kwargs)
def test_connection(self):
"""Test Azure FileShare connection."""
@@ -292,7 +253,7 @@ class AzureFileShareHook(BaseHook):
try:
# Attempt to retrieve file share information
- next(iter(self.get_conn().list_shares()))
+ next(iter(self.share_service_client.list_shares()))
return success
except StopIteration:
# If the iterator returned is empty it should still be considered
a successful connection since
diff --git a/airflow/providers/microsoft/azure/provider.yaml
b/airflow/providers/microsoft/azure/provider.yaml
index 40673fb1eb..75f90d7d57 100644
--- a/airflow/providers/microsoft/azure/provider.yaml
+++ b/airflow/providers/microsoft/azure/provider.yaml
@@ -21,6 +21,7 @@ description: |
`Microsoft Azure <https://azure.microsoft.com/>`__
suspended: false
versions:
+ - 7.0.0
- 6.3.0
- 6.2.4
- 6.2.3
@@ -74,8 +75,8 @@ dependencies:
- azure-mgmt-resource>=2.2.0
- azure-storage-blob>=12.14.0
- azure-storage-common>=2.1.0
- - azure-storage-file>=2.1.0
- azure-mgmt-storage>=16.0.0
+ - azure-storage-file-share
- azure-servicebus>=7.6.1
- azure-synapse-spark
- adal>=1.2.7
diff --git
a/docs/apache-airflow-providers-microsoft-azure/connections/azure_fileshare.rst
b/docs/apache-airflow-providers-microsoft-azure/connections/azure_fileshare.rst
index 39b827d4d1..6510978719 100644
---
a/docs/apache-airflow-providers-microsoft-azure/connections/azure_fileshare.rst
+++
b/docs/apache-airflow-providers-microsoft-azure/connections/azure_fileshare.rst
@@ -30,13 +30,13 @@ Authenticating to Azure File Share
There are four ways to connect to Azure File Share using Airflow.
1. Use `token credentials
-
<https://docs.microsoft.com/en-us/azure/developer/python/azure-sdk-authenticate?tabs=cmd#authenticate-with-token-credentials>`_
+
<https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/identity/azure-identity>`_
i.e. add specific credentials (client_id, secret) and subscription id to
the Airflow connection.
2. Use a `SAS Token
-
<https://docs.microsoft.com/en-us/rest/api/storageservices/create-account-sas>`_
+
<https://learn.microsoft.com/en-gb/azure/storage/common/storage-sas-overview>`_
i.e. add a key config to ``sas_token`` in the Airflow connection.
3. Use a `Connection String
-
<https://docs.microsoft.com/en-us/azure/data-explorer/kusto/api/connection-strings/storage>`_
+
<https://learn.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string>`_
i.e. add connection string to ``connection_string`` in the Airflow
connection.
Only one authorization method can be used at a time. If you need to manage
multiple credentials or keys then you should
@@ -66,7 +66,6 @@ Extra (optional)
* ``connection_string``: Connection string for use with connection string
authentication.
* ``sas_token``: SAS Token for use with SAS Token authentication.
- * ``protocol``: Specify the protocol to use (default is ``https``).
When specifying the connection in environment variable you should specify
it using URI syntax.
diff --git a/generated/provider_dependencies.json
b/generated/provider_dependencies.json
index 014a738668..d130259abb 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -566,7 +566,7 @@
"azure-storage-blob>=12.14.0",
"azure-storage-common>=2.1.0",
"azure-storage-file-datalake>=12.9.1",
- "azure-storage-file>=2.1.0",
+ "azure-storage-file-share",
"azure-synapse-spark"
],
"cross-providers-deps": [
diff --git
a/tests/providers/google/cloud/transfers/test_azure_fileshare_to_gcs.py
b/tests/providers/google/cloud/transfers/test_azure_fileshare_to_gcs.py
index 09d2162868..0ef2f4a5b9 100644
--- a/tests/providers/google/cloud/transfers/test_azure_fileshare_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_azure_fileshare_to_gcs.py
@@ -80,7 +80,7 @@ class TestAzureFileShareToGCSOperator:
any_order=True,
)
-
azure_fileshare_mock_hook.assert_called_once_with(AZURE_FILESHARE_CONN_ID)
+ assert
azure_fileshare_mock_hook.return_value.get_file_to_stream.call_count == 3
gcs_mock_hook.assert_called_once_with(
gcp_conn_id=GCS_CONN_ID,
diff --git a/tests/providers/microsoft/azure/hooks/test_azure_fileshare.py
b/tests/providers/microsoft/azure/hooks/test_azure_fileshare.py
index a99ca9e90b..8456cd3602 100644
--- a/tests/providers/microsoft/azure/hooks/test_azure_fileshare.py
+++ b/tests/providers/microsoft/azure/hooks/test_azure_fileshare.py
@@ -15,23 +15,13 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""
-This module contains integration with Azure File Share.
-
-Cloud variant of a SMB file share. Make sure that a Airflow connection of
-type `wasb` exists. Authorization can be done by supplying a login (=Storage
account name)
-and password (=Storage account key), or login and SAS token in the extra field
-(see connection `azure_fileshare_default` for an example).
-"""
from __future__ import annotations
-import os
+import io
from unittest import mock
-from unittest.mock import patch
import pytest
-from azure.storage.file import Directory, File
-from pytest import param
+from azure.storage.fileshare import DirectoryProperties, FileProperties,
ShareServiceClient
from airflow.models import Connection
from airflow.providers.microsoft.azure.hooks.fileshare import
AzureFileShareHook
@@ -51,7 +41,7 @@ class TestAzureFileshareHook:
conn_id="azure_fileshare_extras",
conn_type="azure_fileshare",
login="login",
- extra={"sas_token": "token", "protocol": "http"},
+ extra={"sas_token": "token"},
),
# Neither password nor sas_token present
Connection(
@@ -68,203 +58,119 @@ class TestAzureFileshareHook:
)
def test_key_and_connection(self):
- from azure.storage.file import FileService
hook =
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_test_key")
- assert hook.conn_id == "azure_fileshare_test_key"
- assert hook._conn is None
- print(hook.get_conn())
- assert isinstance(hook.get_conn(), FileService)
+ assert hook._conn_id == "azure_fileshare_test_key"
+ share_client = hook.share_service_client
+ assert isinstance(share_client, ShareServiceClient)
def test_sas_token(self):
- from azure.storage.file import FileService
-
- hook =
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
- assert hook.conn_id == "azure_fileshare_extras"
- assert isinstance(hook.get_conn(), FileService)
-
- def test_wrong_extras(self):
- hook =
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras_wrong")
- assert hook.conn_id == "azure_fileshare_extras_wrong"
- with pytest.raises(TypeError, match=".*wrong_key.*"):
- hook.get_conn()
- def test_missing_credentials(self):
- hook =
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_missing_credentials")
- assert hook.conn_id == "azure_fileshare_missing_credentials"
- with pytest.raises(ValueError, match=".*account_key or sas_token.*"):
- hook.get_conn()
-
-
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService",
autospec=True)
- def test_check_for_file(self, mock_service):
- mock_instance = mock_service.return_value
- mock_instance.exists.return_value = True
hook =
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
- assert hook.check_for_file("share", "directory", "file", timeout=3)
- mock_instance.exists.assert_called_once_with("share", "directory",
"file", timeout=3)
+ assert hook._conn_id == "azure_fileshare_extras"
+ share_client = hook.share_service_client
+ assert isinstance(share_client, ShareServiceClient)
-
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService",
autospec=True)
+
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.ShareDirectoryClient",
autospec=True)
def test_check_for_directory(self, mock_service):
mock_instance = mock_service.return_value
mock_instance.exists.return_value = True
- hook =
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
- assert hook.check_for_directory("share", "directory", timeout=3)
- mock_instance.exists.assert_called_once_with("share", "directory",
timeout=3)
-
-
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService",
autospec=True)
- def test_load_file(self, mock_service):
- mock_instance = mock_service.return_value
- hook =
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
- hook.load_file("path", "share", "directory", "file", max_connections=1)
- mock_instance.create_file_from_path.assert_called_once_with(
- "share", "directory", "file", "path", max_connections=1
- )
-
-
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService",
autospec=True)
- def test_load_string(self, mock_service):
- mock_instance = mock_service.return_value
- hook =
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
- hook.load_string("big string", "share", "directory", "file", timeout=1)
- mock_instance.create_file_from_text.assert_called_once_with(
- "share", "directory", "file", "big string", timeout=1
+ hook = AzureFileShareHook(
+ azure_fileshare_conn_id="azure_fileshare_extras",
share_name="share", directory_path="directory"
)
+ assert hook.check_for_directory()
+ mock_instance.exists.assert_called_once_with()
-
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService",
autospec=True)
- def test_load_stream(self, mock_service):
+
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.ShareFileClient",
autospec=True)
+ def test_load_data(self, mock_service):
mock_instance = mock_service.return_value
- hook =
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
- hook.load_stream("stream", "share", "directory", "file", 42, timeout=1)
- mock_instance.create_file_from_stream.assert_called_once_with(
- "share", "directory", "file", "stream", 42, timeout=1
+ hook = AzureFileShareHook(
+ azure_fileshare_conn_id="azure_fileshare_extras",
share_name="share", file_path="file"
)
+ hook.load_data("big string")
+ mock_instance.upload_file.assert_called_once_with("big string")
-
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService",
autospec=True)
+
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.ShareDirectoryClient",
autospec=True)
def test_list_directories_and_files(self, mock_service):
mock_instance = mock_service.return_value
- hook =
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
- hook.list_directories_and_files("share", "directory", timeout=1)
-
mock_instance.list_directories_and_files.assert_called_once_with("share",
"directory", timeout=1)
+ hook = AzureFileShareHook(
+ azure_fileshare_conn_id="azure_fileshare_extras",
share_name="share", directory_path="directory"
+ )
+ hook.list_directories_and_files()
+ mock_instance.list_directories_and_files.assert_called_once_with()
-
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService",
autospec=True)
+
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.ShareDirectoryClient",
autospec=True)
def test_list_files(self, mock_service):
mock_instance = mock_service.return_value
mock_instance.list_directories_and_files.return_value = [
- File("file1"),
- File("file2"),
- Directory("dir1"),
- Directory("dir2"),
+ FileProperties(name="file1"),
+ FileProperties(name="file2"),
+ DirectoryProperties(name="dir1"),
+ DirectoryProperties(name="dir2"),
]
- hook =
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
- files = hook.list_files("share", "directory", timeout=1)
+ hook = AzureFileShareHook(
+ azure_fileshare_conn_id="azure_fileshare_extras",
share_name="share", directory_path="directory"
+ )
+ files = hook.list_files()
assert files == ["file1", "file2"]
-
mock_instance.list_directories_and_files.assert_called_once_with("share",
"directory", timeout=1)
+ mock_instance.list_directories_and_files.assert_called_once_with()
-
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService",
autospec=True)
+
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.ShareDirectoryClient",
autospec=True)
def test_create_directory(self, mock_service):
mock_instance = mock_service.return_value
- hook =
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
- hook.create_directory("share", "directory", timeout=1)
- mock_instance.create_directory.assert_called_once_with("share",
"directory", timeout=1)
+ hook = AzureFileShareHook(
+ azure_fileshare_conn_id="azure_fileshare_extras",
share_name="share", directory_path="directory"
+ )
+ hook.create_directory()
+ mock_instance.create_directory.assert_called_once_with()
-
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService",
autospec=True)
+
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.ShareFileClient",
autospec=True)
def test_get_file(self, mock_service):
mock_instance = mock_service.return_value
- hook =
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
- hook.get_file("path", "share", "directory", "file", max_connections=1)
- mock_instance.get_file_to_path.assert_called_once_with(
- "share", "directory", "file", "path", max_connections=1
+ hook = AzureFileShareHook(
+ azure_fileshare_conn_id="azure_fileshare_extras",
share_name="share", file_path="file"
)
+ hook.get_file("path")
+ mock_instance.download_file.assert_called_once_with()
-
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService",
autospec=True)
+
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.ShareFileClient",
autospec=True)
def test_get_file_to_stream(self, mock_service):
mock_instance = mock_service.return_value
- hook =
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
- hook.get_file_to_stream("stream", "share", "directory", "file",
max_connections=1)
- mock_instance.get_file_to_stream.assert_called_once_with(
- "share", "directory", "file", "stream", max_connections=1
+ hook = AzureFileShareHook(
+ azure_fileshare_conn_id="azure_fileshare_extras",
share_name="share", file_path="file"
)
+ data = io.StringIO("stream")
+ hook.get_file_to_stream(stream=data)
+ mock_instance.download_file.assert_called_once_with()
-
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService",
autospec=True)
+
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.ShareServiceClient",
autospec=True)
def test_create_share(self, mock_service):
mock_instance = mock_service.return_value
hook =
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
- hook.create_share("my_share")
+ hook.create_share(share_name="my_share")
mock_instance.create_share.assert_called_once_with("my_share")
-
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService",
autospec=True)
+
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.ShareServiceClient",
autospec=True)
def test_delete_share(self, mock_service):
mock_instance = mock_service.return_value
hook =
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
hook.delete_share("my_share")
mock_instance.delete_share.assert_called_once_with("my_share")
-
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService",
autospec=True)
+
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.ShareServiceClient",
autospec=True)
def test_connection_success(self, mock_service):
+ mock_instance = mock_service.return_value
hook =
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
- hook.get_conn().list_shares.return_value = ["test_container"]
+ mock_instance.list_shares.return_value = ["test_container"]
status, msg = hook.test_connection()
assert status is True
assert msg == "Successfully connected to Azure File Share."
-
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService",
autospec=True)
+
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.ShareServiceClient",
autospec=True)
def test_connection_failure(self, mock_service):
+ mock_instance = mock_service.return_value
hook =
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
- hook.get_conn().list_shares.side_effect = Exception("Test Connection
Failure")
+ mock_instance.list_shares.side_effect = Exception("Test Connection
Failure")
status, msg = hook.test_connection()
assert status is False
assert msg == "Test Connection Failure"
-
- @pytest.mark.parametrize(
- "uri",
- [
- param(
-
"a://?extra__azure_fileshare__anything=abc&extra__azure_fileshare__other_thing=abc",
- id="prefix",
- ),
- param("a://?anything=abc&other_thing=abc", id="no-prefix"),
- ],
- )
- @patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService")
- def test_backcompat_prefix_works(self, mock_service, uri):
- with patch.dict(os.environ, AIRFLOW_CONN_MY_CONN=uri):
- hook = AzureFileShareHook("my_conn")
- hook.get_conn()
- mock_service.assert_called_with(
- account_key=None, account_name=None, anything="abc",
other_thing="abc"
- )
-
- @patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService")
- def test_backcompat_prefix_both_causes_warning(self, mock_service):
- with patch.dict(
- os.environ,
-
AIRFLOW_CONN_MY_CONN="a://?anything=non-prefixed&extra__azure_fileshare__anything=prefixed",
- ):
- hook = AzureFileShareHook("my_conn")
- with pytest.warns(Warning, match="Using value for `anything`"):
- hook.get_conn()
- mock_service.assert_called_with(account_key=None,
account_name=None, anything="non-prefixed")
-
- @patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService")
- def test_empty_string_ignored(self, mock_service):
- with patch.dict(
- os.environ,
- AIRFLOW_CONN_MY_CONN='{"extra": {"anything": "", "other_thing":
"a"}}',
- ):
- hook = AzureFileShareHook("my_conn")
- hook.get_conn()
- mock_service.assert_called_with(account_key=None,
account_name=None, other_thing="a")
-
- @patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService")
- def test_extra_params_forwarded_to_service_options(self, fs_mock):
- with patch.dict(os.environ,
AIRFLOW_CONN_MY_CONN="a://login@?a=1&b=2&c=3"):
- hook = AzureFileShareHook("my_conn")
- hook.get_conn()
- fs_mock.assert_called_with(account_key=None, account_name="login",
a="1", b="2", c="3")
-
- @patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService")
- def test_unrecognized_extra_warns(self, fs_mock):
- with patch.dict(os.environ,
AIRFLOW_CONN_MY_CONN="a://login:password@?extra__wasb__hello=hi&foo=bar"):
- hook = AzureFileShareHook("my_conn")
- with pytest.warns(Warning, match="Extra param `extra__wasb__hello`
not recognized; ignoring."):
- hook.get_conn()
- fs_mock.assert_called_with(account_key="password",
account_name="login", foo="bar")
diff --git a/tests/test_utils/azure_system_helpers.py
b/tests/test_utils/azure_system_helpers.py
index 85c700537a..033399235a 100644
--- a/tests/test_utils/azure_system_helpers.py
+++ b/tests/test_utils/azure_system_helpers.py
@@ -113,12 +113,14 @@ class AzureSystemTest(SystemTest):
@classmethod
def delete_share(cls, share_name: str, azure_fileshare_conn_id: str):
hook =
AzureFileShareHook(azure_fileshare_conn_id=azure_fileshare_conn_id)
- hook.delete_share(share_name)
+ hook.delete_share(share_name=share_name)
@classmethod
def create_directory(cls, share_name: str, azure_fileshare_conn_id: str,
directory: str):
- hook =
AzureFileShareHook(azure_fileshare_conn_id=azure_fileshare_conn_id)
- hook.create_directory(share_name=share_name, directory_name=directory)
+ hook = AzureFileShareHook(
+ azure_fileshare_conn_id=azure_fileshare_conn_id,
share_name=share_name, directory_path=directory
+ )
+ hook.create_directory()
@classmethod
def upload_file_from_string(
@@ -127,30 +129,25 @@ class AzureSystemTest(SystemTest):
share_name: str,
azure_fileshare_conn_id: str,
file_name: str,
- directory: str,
):
- hook =
AzureFileShareHook(azure_fileshare_conn_id=azure_fileshare_conn_id)
- hook.load_string(
- string_data=string_data,
- share_name=share_name,
- directory_name=directory,
- file_name=file_name,
+ hook = AzureFileShareHook(
+ azure_fileshare_conn_id=azure_fileshare_conn_id,
share_name=share_name, file_path=file_name
)
+ hook.load_data(string_data=string_data)
@classmethod
def prepare_share(cls, share_name: str, azure_fileshare_conn_id: str,
file_name: str, directory: str):
"""
Create share with a file in given directory. If directory is None,
file is in root dir.
"""
- cls.create_share(share_name=share_name,
azure_fileshare_conn_id=azure_fileshare_conn_id)
- cls.create_directory(
- share_name=share_name,
azure_fileshare_conn_id=azure_fileshare_conn_id, directory=directory
- )
- string_data = "".join(random.choices(string.ascii_letters, k=1024))
- cls.upload_file_from_string(
- string_data=string_data,
- share_name=share_name,
+ hook = AzureFileShareHook(
azure_fileshare_conn_id=azure_fileshare_conn_id,
- file_name=file_name,
- directory=directory,
+ share_name=share_name,
+ directory_path=directory,
+ file_path=file_name,
)
+ hook.create_share(share_name)
+ hook.create_directory()
+
+ string_data = "".join(random.choices(string.ascii_letters, k=1024))
+ hook.load_data(string_data)