This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b7f84e913b Update Azure fileshare hook to use azure-storage-file-share 
instead of azure-storage-file (#33904)
b7f84e913b is described below

commit b7f84e913b6aa4cee7fa63009082b0608b3a0bf1
Author: Pankaj Singh <[email protected]>
AuthorDate: Sat Sep 2 17:45:39 2023 +0530

    Update Azure fileshare hook to use azure-storage-file-share instead of 
azure-storage-file (#33904)
    
    * Update Azure fileshare hook to use azure-storage-file-share instead of 
azure-storage-file
---
 .../cloud/transfers/azure_fileshare_to_gcs.py      |  34 ++-
 airflow/providers/microsoft/azure/CHANGELOG.rst    |  23 ++
 .../providers/microsoft/azure/hooks/fileshare.py   | 315 +++++++++------------
 airflow/providers/microsoft/azure/provider.yaml    |   3 +-
 .../connections/azure_fileshare.rst                |   7 +-
 generated/provider_dependencies.json               |   2 +-
 .../cloud/transfers/test_azure_fileshare_to_gcs.py |   2 +-
 .../microsoft/azure/hooks/test_azure_fileshare.py  | 218 ++++----------
 tests/test_utils/azure_system_helpers.py           |  37 ++-
 9 files changed, 271 insertions(+), 370 deletions(-)

diff --git a/airflow/providers/google/cloud/transfers/azure_fileshare_to_gcs.py 
b/airflow/providers/google/cloud/transfers/azure_fileshare_to_gcs.py
index e8410cb5da..a4ce95d289 100644
--- a/airflow/providers/google/cloud/transfers/azure_fileshare_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/azure_fileshare_to_gcs.py
@@ -17,10 +17,12 @@
 # under the License.
 from __future__ import annotations
 
+import warnings
 from tempfile import NamedTemporaryFile
 from typing import TYPE_CHECKING, Sequence
 
 from airflow import AirflowException
+from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.gcs import GCSHook, _parse_gcs_url, 
gcs_object_is_directory
 from airflow.providers.microsoft.azure.hooks.fileshare import 
AzureFileShareHook
@@ -73,6 +75,7 @@ class AzureFileShareToGCSOperator(BaseOperator):
         share_name: str,
         dest_gcs: str,
         directory_name: str | None = None,
+        directory_path: str | None = None,
         prefix: str = "",
         azure_fileshare_conn_id: str = "azure_fileshare_default",
         gcp_conn_id: str = "google_cloud_default",
@@ -84,7 +87,15 @@ class AzureFileShareToGCSOperator(BaseOperator):
         super().__init__(**kwargs)
 
         self.share_name = share_name
+        self.directory_path = directory_path
         self.directory_name = directory_name
+        if self.directory_path is None:
+            self.directory_path = directory_name
+            warnings.warn(
+                "Use 'directory_path' instead of 'directory_name'.",
+                AirflowProviderDeprecationWarning,
+                stacklevel=2,
+            )
         self.prefix = prefix
         self.azure_fileshare_conn_id = azure_fileshare_conn_id
         self.gcp_conn_id = gcp_conn_id
@@ -106,10 +117,12 @@ class AzureFileShareToGCSOperator(BaseOperator):
 
     def execute(self, context: Context):
         self._check_inputs()
-        azure_fileshare_hook = AzureFileShareHook(self.azure_fileshare_conn_id)
-        files = azure_fileshare_hook.list_files(
-            share_name=self.share_name, directory_name=self.directory_name
+        azure_fileshare_hook = AzureFileShareHook(
+            share_name=self.share_name,
+            azure_fileshare_conn_id=self.azure_fileshare_conn_id,
+            directory_path=self.directory_path,
         )
+        files = azure_fileshare_hook.list_files()
 
         gcs_hook = GCSHook(
             gcp_conn_id=self.gcp_conn_id,
@@ -141,16 +154,17 @@ class AzureFileShareToGCSOperator(BaseOperator):
 
         if files:
             self.log.info("%s files are going to be synced.", len(files))
-            if self.directory_name is None:
+            if self.directory_path is None:
                 raise RuntimeError("The directory_name must be set!.")
             for file in files:
+                azure_fileshare_hook = AzureFileShareHook(
+                    share_name=self.share_name,
+                    azure_fileshare_conn_id=self.azure_fileshare_conn_id,
+                    directory_path=self.directory_path,
+                    file_path=file,
+                )
                 with NamedTemporaryFile() as temp_file:
-                    azure_fileshare_hook.get_file_to_stream(
-                        stream=temp_file,
-                        share_name=self.share_name,
-                        directory_name=self.directory_name,
-                        file_name=file,
-                    )
+                    azure_fileshare_hook.get_file_to_stream(stream=temp_file)
                     temp_file.flush()
 
                     # There will always be a '/' before file because it is
diff --git a/airflow/providers/microsoft/azure/CHANGELOG.rst 
b/airflow/providers/microsoft/azure/CHANGELOG.rst
index 1fb2742811..ea8001ff59 100644
--- a/airflow/providers/microsoft/azure/CHANGELOG.rst
+++ b/airflow/providers/microsoft/azure/CHANGELOG.rst
@@ -27,6 +27,29 @@
 Changelog
 ---------
 
+7.0.0
+.....
+
+Breaking changes
+~~~~~~~~~~~~~~~~
+
+.. warning::
+  In this version of the provider, we have changed AzureFileShareHook to use 
azure-storage-file-share library instead
+  of azure-storage-file this change has impact on existing hook method see 
below for details, removed deprecated
+  extra__azure_fileshare__ prefix from connection extras param and removed 
protocol param from connection extras
+
+* get_conn from AzureFileShareHook return None instead FileService
+* Remove protocol param from Azure fileshare connection extras
+* Remove deprecated extra__azure_fileshare__ prefix from Azure fileshare 
connection extras, list_files
+* Remove share_name, directory_name param from AzureFileShareHook method 
check_for_directory,
+  list_directories_and_files, create_directory in favor of AzureFileShareHook 
share_name and directory_path param
+* AzureFileShareHook method create_share and delete_share accept kwargs from 
ShareServiceClient.create_share
+  and ShareServiceClient.delete_share
+* Remove share_name, directory_name, file_name param from AzureFileShareHook 
method get_file, get_file_to_stream
+  and load_file in favor of AzureFileShareHook share_name and file_path
+* Remove AzureFileShareHook.check_for_file method
+* Remove AzureFileShareHook.load_string, AzureFileShareHook.load_stream in 
favor of AzureFileShareHook.load_data
+
 6.3.0
 .....
 
diff --git a/airflow/providers/microsoft/azure/hooks/fileshare.py 
b/airflow/providers/microsoft/azure/hooks/fileshare.py
index e23319734e..9cd1ec78c4 100644
--- a/airflow/providers/microsoft/azure/hooks/fileshare.py
+++ b/airflow/providers/microsoft/azure/hooks/fileshare.py
@@ -17,12 +17,11 @@
 # under the License.
 from __future__ import annotations
 
-import warnings
 from typing import IO, Any
 
-from azure.storage.file import File, FileService
+from azure.identity import DefaultAzureCredential
+from azure.storage.fileshare import FileProperties, ShareDirectoryClient, 
ShareFileClient, ShareServiceClient
 
-from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.hooks.base import BaseHook
 
 
@@ -31,9 +30,8 @@ class AzureFileShareHook(BaseHook):
     Interacts with Azure FileShare Storage.
 
     :param azure_fileshare_conn_id: Reference to the
-        :ref:`Azure Container Volume connection 
id<howto/connection:azure_fileshare>`
-        of an Azure account of which container volumes should be used.
-
+        :ref:`Azure FileShare connection id<howto/connection:azure_fileshare>`
+        of an Azure account of which file share should be used.
     """
 
     conn_name_attr = "azure_fileshare_conn_id"
@@ -41,10 +39,22 @@ class AzureFileShareHook(BaseHook):
     conn_type = "azure_fileshare"
     hook_name = "Azure FileShare"
 
-    def __init__(self, azure_fileshare_conn_id: str = 
"azure_fileshare_default") -> None:
+    def __init__(
+        self,
+        share_name: str | None = None,
+        file_path: str | None = None,
+        directory_path: str | None = None,
+        azure_fileshare_conn_id: str = "azure_fileshare_default",
+    ) -> None:
         super().__init__()
-        self.conn_id = azure_fileshare_conn_id
-        self._conn = None
+        self._conn_id = azure_fileshare_conn_id
+        self.share_name = share_name
+        self.file_path = file_path
+        self.directory_path = directory_path
+        self._account_url: str | None = None
+        self._connection_string: str | None = None
+        self._account_access_key: str | None = None
+        self._sas_token: str | None = None
 
     @staticmethod
     def get_connection_form_widgets() -> dict[str, Any]:
@@ -58,9 +68,6 @@ class AzureFileShareHook(BaseHook):
             "connection_string": StringField(
                 lazy_gettext("Connection String (optional)"), 
widget=BS3TextFieldWidget()
             ),
-            "protocol": StringField(
-                lazy_gettext("Account URL or token (optional)"), 
widget=BS3TextFieldWidget()
-            ),
         }
 
     @staticmethod
@@ -73,218 +80,172 @@ class AzureFileShareHook(BaseHook):
                 "password": "Blob Storage Key (optional)",
             },
             "placeholders": {
-                "login": "account name",
+                "login": "account name or account url",
                 "password": "secret",
                 "sas_token": "account url or token (optional)",
                 "connection_string": "account url or token (optional)",
-                "protocol": "account url or token (optional)",
             },
         }
 
-    def get_conn(self) -> FileService:
-        """Return the FileService object."""
-
-        def check_for_conflict(key):
-            backcompat_key = f"{backcompat_prefix}{key}"
-            if backcompat_key in extras:
-                warnings.warn(
-                    f"Conflicting params `{key}` and `{backcompat_key}` found 
in extras for conn "
-                    f"{self.conn_id}. Using value for `{key}`.  Please ensure 
this is the correct value "
-                    f"and remove the backcompat key `{backcompat_key}`."
-                )
-
-        backcompat_prefix = "extra__azure_fileshare__"
-        if self._conn:
-            return self._conn
-        conn = self.get_connection(self.conn_id)
+    def get_conn(self) -> None:
+        conn = self.get_connection(self._conn_id)
         extras = conn.extra_dejson
-        service_options = {}
-        for key, value in extras.items():
-            if value == "":
-                continue
-            if not key.startswith("extra__"):
-                service_options[key] = value
-                check_for_conflict(key)
-            elif key.startswith(backcompat_prefix):
-                short_name = key[len(backcompat_prefix) :]
-                warnings.warn(
-                    f"`{key}` is deprecated in azure connection extra please 
use `{short_name}` instead",
-                    AirflowProviderDeprecationWarning,
-                    stacklevel=2,
-                )
-                if short_name not in service_options:  # prefer values 
provided with short name
-                    service_options[short_name] = value
-            else:
-                warnings.warn(f"Extra param `{key}` not recognized; ignoring.")
-        self._conn = FileService(account_name=conn.login, 
account_key=conn.password, **service_options)
-        return self._conn
-
-    def check_for_directory(self, share_name: str, directory_name: str, 
**kwargs) -> bool:
-        """
-        Check if a directory exists on Azure File Share.
-
-        :param share_name: Name of the share.
-        :param directory_name: Name of the directory.
-        :param kwargs: Optional keyword arguments that
-            `FileService.exists()` takes.
-        :return: True if the file exists, False otherwise.
-        """
-        return self.get_conn().exists(share_name, directory_name, **kwargs)
-
-    def check_for_file(self, share_name: str, directory_name: str, file_name: 
str, **kwargs) -> bool:
-        """
-        Check if a file exists on Azure File Share.
-
-        :param share_name: Name of the share.
-        :param directory_name: Name of the directory.
-        :param file_name: Name of the file.
-        :param kwargs: Optional keyword arguments that
-            `FileService.exists()` takes.
-        :return: True if the file exists, False otherwise.
-        """
-        return self.get_conn().exists(share_name, directory_name, file_name, 
**kwargs)
-
-    def list_directories_and_files(
-        self, share_name: str, directory_name: str | None = None, **kwargs
-    ) -> list:
-        """
-        Return the list of directories and files stored on a Azure File Share.
-
-        :param share_name: Name of the share.
-        :param directory_name: Name of the directory.
-        :param kwargs: Optional keyword arguments that
-            `FileService.list_directories_and_files()` takes.
-        :return: A list of files and directories
-        """
-        return self.get_conn().list_directories_and_files(share_name, 
directory_name, **kwargs)
+        self._connection_string = extras.get("connection_string")
+        if conn.login:
+            self._account_url = self._parse_account_url(conn.login)
+        self._sas_token = extras.get("sas_token")
+        self._account_access_key = conn.password
 
-    def list_files(self, share_name: str, directory_name: str | None = None, 
**kwargs) -> list[str]:
-        """
-        Return the list of files stored on a Azure File Share.
-
-        :param share_name: Name of the share.
-        :param directory_name: Name of the directory.
-        :param kwargs: Optional keyword arguments that
-            `FileService.list_directories_and_files()` takes.
-        :return: A list of files
-        """
-        return [
-            obj.name
-            for obj in self.list_directories_and_files(share_name, 
directory_name, **kwargs)
-            if isinstance(obj, File)
-        ]
+    @staticmethod
+    def _parse_account_url(account_url: str) -> str:
+        if not account_url.lower().startswith("https"):
+            return f"https://{account_url}.file.core.windows.net";
+        return account_url
+
+    @property
+    def share_service_client(self):
+        self.get_conn()
+        if self._connection_string:
+            return ShareServiceClient.from_connection_string(
+                conn_str=self._connection_string,
+            )
+        elif self._account_url and (self._sas_token or 
self._account_access_key):
+            credential = self._sas_token or self._account_access_key
+            return ShareServiceClient(account_url=self._account_url, 
credential=credential)
+        else:
+            return ShareServiceClient(
+                account_url=self._account_url, 
credential=DefaultAzureCredential(), token_intent="backup"
+            )
+
+    @property
+    def share_directory_client(self):
+        if self._connection_string:
+            return ShareDirectoryClient.from_connection_string(
+                conn_str=self._connection_string,
+                share_name=self.share_name,
+                directory_path=self.directory_path,
+            )
+        elif self._account_url and (self._sas_token or 
self._account_access_key):
+            credential = self._sas_token or self._account_access_key
+            return ShareDirectoryClient(
+                account_url=self._account_url,
+                share_name=self.share_name,
+                directory_path=self.directory_path,
+                credential=credential,
+            )
+        else:
+            return ShareDirectoryClient(
+                account_url=self._account_url,
+                share_name=self.share_name,
+                directory_path=self.directory_path,
+                credential=DefaultAzureCredential(),
+                token_intent="backup",
+            )
+
+    @property
+    def share_file_client(self):
+        if self._connection_string:
+            return ShareFileClient.from_connection_string(
+                conn_str=self._connection_string,
+                share_name=self.share_name,
+                file_path=self.file_path,
+            )
+        elif self._account_url and (self._sas_token or 
self._account_access_key):
+            credential = self._sas_token or self._account_access_key
+            return ShareFileClient(
+                account_url=self._account_url,
+                share_name=self.share_name,
+                file_path=self.file_path,
+                credential=credential,
+            )
+        else:
+            return ShareFileClient(
+                account_url=self._account_url,
+                share_name=self.share_name,
+                file_path=self.file_path,
+                credential=DefaultAzureCredential(),
+                token_intent="backup",
+            )
+
+    def check_for_directory(self) -> bool:
+        """Check if a directory exists on Azure File Share."""
+        return self.share_directory_client.exists()
+
+    def list_directories_and_files(self) -> list:
+        """Return the list of directories and files stored on a Azure File 
Share."""
+        return list(self.share_directory_client.list_directories_and_files())
+
+    def list_files(self) -> list[str]:
+        """Return the list of files stored on a Azure File Share."""
+        return [obj.name for obj in self.list_directories_and_files() if 
isinstance(obj, FileProperties)]
 
     def create_share(self, share_name: str, **kwargs) -> bool:
         """
         Create new Azure File Share.
 
         :param share_name: Name of the share.
-        :param kwargs: Optional keyword arguments that
-            `FileService.create_share()` takes.
         :return: True if share is created, False if share already exists.
         """
-        return self.get_conn().create_share(share_name, **kwargs)
+        try:
+            self.share_service_client.create_share(share_name, **kwargs)
+        except Exception as e:
+            self.log.warning(e)
+            return False
+        return True
 
     def delete_share(self, share_name: str, **kwargs) -> bool:
         """
         Delete existing Azure File Share.
 
         :param share_name: Name of the share.
-        :param kwargs: Optional keyword arguments that
-            `FileService.delete_share()` takes.
         :return: True if share is deleted, False if share does not exist.
         """
-        return self.get_conn().delete_share(share_name, **kwargs)
+        try:
+            self.share_service_client.delete_share(share_name, **kwargs)
+        except Exception as e:
+            self.log.warning(e)
+            return False
+        return True
 
-    def create_directory(self, share_name: str, directory_name: str, **kwargs) 
-> list:
-        """
-        Create a new directory on a Azure File Share.
+    def create_directory(self, **kwargs) -> Any:
+        """Create a new directory on a Azure File Share."""
+        return self.share_directory_client.create_directory(**kwargs)
 
-        :param share_name: Name of the share.
-        :param directory_name: Name of the directory.
-        :param kwargs: Optional keyword arguments that
-            `FileService.create_directory()` takes.
-        :return: A list of files and directories
-        """
-        return self.get_conn().create_directory(share_name, directory_name, 
**kwargs)
-
-    def get_file(
-        self, file_path: str, share_name: str, directory_name: str, file_name: 
str, **kwargs
-    ) -> None:
+    def get_file(self, file_path: str, **kwargs) -> None:
         """
         Download a file from Azure File Share.
 
         :param file_path: Where to store the file.
-        :param share_name: Name of the share.
-        :param directory_name: Name of the directory.
-        :param file_name: Name of the file.
-        :param kwargs: Optional keyword arguments that
-            `FileService.get_file_to_path()` takes.
         """
-        self.get_conn().get_file_to_path(share_name, directory_name, 
file_name, file_path, **kwargs)
+        with open(file_path, "wb") as file_handle:
+            data = self.share_file_client.download_file(**kwargs)
+            data.readinto(file_handle)
 
-    def get_file_to_stream(
-        self, stream: IO, share_name: str, directory_name: str, file_name: 
str, **kwargs
-    ) -> None:
+    def get_file_to_stream(self, stream: IO, **kwargs) -> None:
         """
         Download a file from Azure File Share.
 
         :param stream: A filehandle to store the file to.
-        :param share_name: Name of the share.
-        :param directory_name: Name of the directory.
-        :param file_name: Name of the file.
-        :param kwargs: Optional keyword arguments that
-            `FileService.get_file_to_stream()` takes.
         """
-        self.get_conn().get_file_to_stream(share_name, directory_name, 
file_name, stream, **kwargs)
+        data = self.share_file_client.download_file(**kwargs)
+        data.readinto(stream)
 
-    def load_file(
-        self, file_path: str, share_name: str, directory_name: str, file_name: 
str, **kwargs
-    ) -> None:
+    def load_file(self, file_path: str, **kwargs) -> None:
         """
         Upload a file to Azure File Share.
 
         :param file_path: Path to the file to load.
-        :param share_name: Name of the share.
-        :param directory_name: Name of the directory.
-        :param file_name: Name of the file.
-        :param kwargs: Optional keyword arguments that
-            `FileService.create_file_from_path()` takes.
         """
-        self.get_conn().create_file_from_path(share_name, directory_name, 
file_name, file_path, **kwargs)
+        with open(file_path, "rb") as source_file:
+            self.share_file_client.upload_file(source_file, **kwargs)
 
-    def load_string(
-        self, string_data: str, share_name: str, directory_name: str, 
file_name: str, **kwargs
-    ) -> None:
+    def load_data(self, string_data: bytes | str | IO, **kwargs) -> None:
         """
         Upload a string to Azure File Share.
 
-        :param string_data: String to load.
-        :param share_name: Name of the share.
-        :param directory_name: Name of the directory.
-        :param file_name: Name of the file.
-        :param kwargs: Optional keyword arguments that
-            `FileService.create_file_from_text()` takes.
-        """
-        self.get_conn().create_file_from_text(share_name, directory_name, 
file_name, string_data, **kwargs)
-
-    def load_stream(
-        self, stream: str, share_name: str, directory_name: str, file_name: 
str, count: str, **kwargs
-    ) -> None:
-        """
-        Upload a stream to Azure File Share.
-
-        :param stream: Opened file/stream to upload as the file content.
-        :param share_name: Name of the share.
-        :param directory_name: Name of the directory.
-        :param file_name: Name of the file.
-        :param count: Size of the stream in bytes
-        :param kwargs: Optional keyword arguments that
-            `FileService.create_file_from_stream()` takes.
+        :param string_data: String/Stream to load.
         """
-        self.get_conn().create_file_from_stream(
-            share_name, directory_name, file_name, stream, count, **kwargs
-        )
+        self.share_file_client.upload_file(string_data, **kwargs)
 
     def test_connection(self):
         """Test Azure FileShare connection."""
@@ -292,7 +253,7 @@ class AzureFileShareHook(BaseHook):
 
         try:
             # Attempt to retrieve file share information
-            next(iter(self.get_conn().list_shares()))
+            next(iter(self.share_service_client.list_shares()))
             return success
         except StopIteration:
             # If the iterator returned is empty it should still be considered 
a successful connection since
diff --git a/airflow/providers/microsoft/azure/provider.yaml 
b/airflow/providers/microsoft/azure/provider.yaml
index 40673fb1eb..75f90d7d57 100644
--- a/airflow/providers/microsoft/azure/provider.yaml
+++ b/airflow/providers/microsoft/azure/provider.yaml
@@ -21,6 +21,7 @@ description: |
   `Microsoft Azure <https://azure.microsoft.com/>`__
 suspended: false
 versions:
+  - 7.0.0
   - 6.3.0
   - 6.2.4
   - 6.2.3
@@ -74,8 +75,8 @@ dependencies:
   - azure-mgmt-resource>=2.2.0
   - azure-storage-blob>=12.14.0
   - azure-storage-common>=2.1.0
-  - azure-storage-file>=2.1.0
   - azure-mgmt-storage>=16.0.0
+  - azure-storage-file-share
   - azure-servicebus>=7.6.1
   - azure-synapse-spark
   - adal>=1.2.7
diff --git 
a/docs/apache-airflow-providers-microsoft-azure/connections/azure_fileshare.rst 
b/docs/apache-airflow-providers-microsoft-azure/connections/azure_fileshare.rst
index 39b827d4d1..6510978719 100644
--- 
a/docs/apache-airflow-providers-microsoft-azure/connections/azure_fileshare.rst
+++ 
b/docs/apache-airflow-providers-microsoft-azure/connections/azure_fileshare.rst
@@ -30,13 +30,13 @@ Authenticating to Azure File Share
 There are four ways to connect to Azure File Share using Airflow.
 
 1. Use `token credentials
-   
<https://docs.microsoft.com/en-us/azure/developer/python/azure-sdk-authenticate?tabs=cmd#authenticate-with-token-credentials>`_
+   
<https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/identity/azure-identity>`_
    i.e. add specific credentials (client_id, secret) and subscription id to 
the Airflow connection.
 2. Use a `SAS Token
-   
<https://docs.microsoft.com/en-us/rest/api/storageservices/create-account-sas>`_
+   
<https://learn.microsoft.com/en-gb/azure/storage/common/storage-sas-overview>`_
    i.e. add a key config to ``sas_token`` in the Airflow connection.
 3. Use a `Connection String
-   
<https://docs.microsoft.com/en-us/azure/data-explorer/kusto/api/connection-strings/storage>`_
+   
<https://learn.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string>`_
    i.e. add connection string to ``connection_string`` in the Airflow 
connection.
 
 Only one authorization method can be used at a time. If you need to manage 
multiple credentials or keys then you should
@@ -66,7 +66,6 @@ Extra (optional)
 
     * ``connection_string``: Connection string for use with connection string 
authentication.
     * ``sas_token``: SAS Token for use with SAS Token authentication.
-    * ``protocol``: Specify the protocol to use (default is ``https``).
 
 When specifying the connection in environment variable you should specify
 it using URI syntax.
diff --git a/generated/provider_dependencies.json 
b/generated/provider_dependencies.json
index 014a738668..d130259abb 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -566,7 +566,7 @@
       "azure-storage-blob>=12.14.0",
       "azure-storage-common>=2.1.0",
       "azure-storage-file-datalake>=12.9.1",
-      "azure-storage-file>=2.1.0",
+      "azure-storage-file-share",
       "azure-synapse-spark"
     ],
     "cross-providers-deps": [
diff --git 
a/tests/providers/google/cloud/transfers/test_azure_fileshare_to_gcs.py 
b/tests/providers/google/cloud/transfers/test_azure_fileshare_to_gcs.py
index 09d2162868..0ef2f4a5b9 100644
--- a/tests/providers/google/cloud/transfers/test_azure_fileshare_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_azure_fileshare_to_gcs.py
@@ -80,7 +80,7 @@ class TestAzureFileShareToGCSOperator:
             any_order=True,
         )
 
-        
azure_fileshare_mock_hook.assert_called_once_with(AZURE_FILESHARE_CONN_ID)
+        assert 
azure_fileshare_mock_hook.return_value.get_file_to_stream.call_count == 3
 
         gcs_mock_hook.assert_called_once_with(
             gcp_conn_id=GCS_CONN_ID,
diff --git a/tests/providers/microsoft/azure/hooks/test_azure_fileshare.py 
b/tests/providers/microsoft/azure/hooks/test_azure_fileshare.py
index a99ca9e90b..8456cd3602 100644
--- a/tests/providers/microsoft/azure/hooks/test_azure_fileshare.py
+++ b/tests/providers/microsoft/azure/hooks/test_azure_fileshare.py
@@ -15,23 +15,13 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-This module contains integration with Azure File Share.
-
-Cloud variant of a SMB file share. Make sure that a Airflow connection of
-type `wasb` exists. Authorization can be done by supplying a login (=Storage 
account name)
-and password (=Storage account key), or login and SAS token in the extra field
-(see connection `azure_fileshare_default` for an example).
-"""
 from __future__ import annotations
 
-import os
+import io
 from unittest import mock
-from unittest.mock import patch
 
 import pytest
-from azure.storage.file import Directory, File
-from pytest import param
+from azure.storage.fileshare import DirectoryProperties, FileProperties, 
ShareServiceClient
 
 from airflow.models import Connection
 from airflow.providers.microsoft.azure.hooks.fileshare import 
AzureFileShareHook
@@ -51,7 +41,7 @@ class TestAzureFileshareHook:
                 conn_id="azure_fileshare_extras",
                 conn_type="azure_fileshare",
                 login="login",
-                extra={"sas_token": "token", "protocol": "http"},
+                extra={"sas_token": "token"},
             ),
             # Neither password nor sas_token present
             Connection(
@@ -68,203 +58,119 @@ class TestAzureFileshareHook:
         )
 
     def test_key_and_connection(self):
-        from azure.storage.file import FileService
 
         hook = 
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_test_key")
-        assert hook.conn_id == "azure_fileshare_test_key"
-        assert hook._conn is None
-        print(hook.get_conn())
-        assert isinstance(hook.get_conn(), FileService)
+        assert hook._conn_id == "azure_fileshare_test_key"
+        share_client = hook.share_service_client
+        assert isinstance(share_client, ShareServiceClient)
 
     def test_sas_token(self):
-        from azure.storage.file import FileService
-
-        hook = 
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
-        assert hook.conn_id == "azure_fileshare_extras"
-        assert isinstance(hook.get_conn(), FileService)
-
-    def test_wrong_extras(self):
-        hook = 
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras_wrong")
-        assert hook.conn_id == "azure_fileshare_extras_wrong"
-        with pytest.raises(TypeError, match=".*wrong_key.*"):
-            hook.get_conn()
 
-    def test_missing_credentials(self):
-        hook = 
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_missing_credentials")
-        assert hook.conn_id == "azure_fileshare_missing_credentials"
-        with pytest.raises(ValueError, match=".*account_key or sas_token.*"):
-            hook.get_conn()
-
-    
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService", 
autospec=True)
-    def test_check_for_file(self, mock_service):
-        mock_instance = mock_service.return_value
-        mock_instance.exists.return_value = True
         hook = 
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
-        assert hook.check_for_file("share", "directory", "file", timeout=3)
-        mock_instance.exists.assert_called_once_with("share", "directory", 
"file", timeout=3)
+        assert hook._conn_id == "azure_fileshare_extras"
+        share_client = hook.share_service_client
+        assert isinstance(share_client, ShareServiceClient)
 
-    
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService", 
autospec=True)
+    
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.ShareDirectoryClient",
 autospec=True)
     def test_check_for_directory(self, mock_service):
         mock_instance = mock_service.return_value
         mock_instance.exists.return_value = True
-        hook = 
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
-        assert hook.check_for_directory("share", "directory", timeout=3)
-        mock_instance.exists.assert_called_once_with("share", "directory", 
timeout=3)
-
-    
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService", 
autospec=True)
-    def test_load_file(self, mock_service):
-        mock_instance = mock_service.return_value
-        hook = 
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
-        hook.load_file("path", "share", "directory", "file", max_connections=1)
-        mock_instance.create_file_from_path.assert_called_once_with(
-            "share", "directory", "file", "path", max_connections=1
-        )
-
-    
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService", 
autospec=True)
-    def test_load_string(self, mock_service):
-        mock_instance = mock_service.return_value
-        hook = 
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
-        hook.load_string("big string", "share", "directory", "file", timeout=1)
-        mock_instance.create_file_from_text.assert_called_once_with(
-            "share", "directory", "file", "big string", timeout=1
+        hook = AzureFileShareHook(
+            azure_fileshare_conn_id="azure_fileshare_extras", 
share_name="share", directory_path="directory"
         )
+        assert hook.check_for_directory()
+        mock_instance.exists.assert_called_once_with()
 
-    
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService", 
autospec=True)
-    def test_load_stream(self, mock_service):
+    
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.ShareFileClient",
 autospec=True)
+    def test_load_data(self, mock_service):
         mock_instance = mock_service.return_value
-        hook = 
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
-        hook.load_stream("stream", "share", "directory", "file", 42, timeout=1)
-        mock_instance.create_file_from_stream.assert_called_once_with(
-            "share", "directory", "file", "stream", 42, timeout=1
+        hook = AzureFileShareHook(
+            azure_fileshare_conn_id="azure_fileshare_extras", 
share_name="share", file_path="file"
         )
+        hook.load_data("big string")
+        mock_instance.upload_file.assert_called_once_with("big string")
 
-    
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService", 
autospec=True)
+    
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.ShareDirectoryClient",
 autospec=True)
     def test_list_directories_and_files(self, mock_service):
         mock_instance = mock_service.return_value
-        hook = 
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
-        hook.list_directories_and_files("share", "directory", timeout=1)
-        
mock_instance.list_directories_and_files.assert_called_once_with("share", 
"directory", timeout=1)
+        hook = AzureFileShareHook(
+            azure_fileshare_conn_id="azure_fileshare_extras", 
share_name="share", directory_path="directory"
+        )
+        hook.list_directories_and_files()
+        mock_instance.list_directories_and_files.assert_called_once_with()
 
-    
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService", 
autospec=True)
+    
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.ShareDirectoryClient",
 autospec=True)
     def test_list_files(self, mock_service):
         mock_instance = mock_service.return_value
         mock_instance.list_directories_and_files.return_value = [
-            File("file1"),
-            File("file2"),
-            Directory("dir1"),
-            Directory("dir2"),
+            FileProperties(name="file1"),
+            FileProperties(name="file2"),
+            DirectoryProperties(name="dir1"),
+            DirectoryProperties(name="dir2"),
         ]
-        hook = 
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
-        files = hook.list_files("share", "directory", timeout=1)
+        hook = AzureFileShareHook(
+            azure_fileshare_conn_id="azure_fileshare_extras", 
share_name="share", directory_path="directory"
+        )
+        files = hook.list_files()
         assert files == ["file1", "file2"]
-        
mock_instance.list_directories_and_files.assert_called_once_with("share", 
"directory", timeout=1)
+        mock_instance.list_directories_and_files.assert_called_once_with()
 
-    
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService", 
autospec=True)
+    
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.ShareDirectoryClient",
 autospec=True)
     def test_create_directory(self, mock_service):
         mock_instance = mock_service.return_value
-        hook = 
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
-        hook.create_directory("share", "directory", timeout=1)
-        mock_instance.create_directory.assert_called_once_with("share", 
"directory", timeout=1)
+        hook = AzureFileShareHook(
+            azure_fileshare_conn_id="azure_fileshare_extras", 
share_name="share", directory_path="directory"
+        )
+        hook.create_directory()
+        mock_instance.create_directory.assert_called_once_with()
 
-    
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService", 
autospec=True)
+    
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.ShareFileClient",
 autospec=True)
     def test_get_file(self, mock_service):
         mock_instance = mock_service.return_value
-        hook = 
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
-        hook.get_file("path", "share", "directory", "file", max_connections=1)
-        mock_instance.get_file_to_path.assert_called_once_with(
-            "share", "directory", "file", "path", max_connections=1
+        hook = AzureFileShareHook(
+            azure_fileshare_conn_id="azure_fileshare_extras", 
share_name="share", file_path="file"
         )
+        hook.get_file("path")
+        mock_instance.download_file.assert_called_once_with()
 
-    
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService", 
autospec=True)
+    
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.ShareFileClient",
 autospec=True)
     def test_get_file_to_stream(self, mock_service):
         mock_instance = mock_service.return_value
-        hook = 
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
-        hook.get_file_to_stream("stream", "share", "directory", "file", 
max_connections=1)
-        mock_instance.get_file_to_stream.assert_called_once_with(
-            "share", "directory", "file", "stream", max_connections=1
+        hook = AzureFileShareHook(
+            azure_fileshare_conn_id="azure_fileshare_extras", 
share_name="share", file_path="file"
         )
+        data = io.StringIO("stream")
+        hook.get_file_to_stream(stream=data)
+        mock_instance.download_file.assert_called_once_with()
 
-    
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService", 
autospec=True)
+    
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.ShareServiceClient",
 autospec=True)
     def test_create_share(self, mock_service):
         mock_instance = mock_service.return_value
         hook = 
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
-        hook.create_share("my_share")
+        hook.create_share(share_name="my_share")
         mock_instance.create_share.assert_called_once_with("my_share")
 
-    
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService", 
autospec=True)
+    
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.ShareServiceClient",
 autospec=True)
     def test_delete_share(self, mock_service):
         mock_instance = mock_service.return_value
         hook = 
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
         hook.delete_share("my_share")
         mock_instance.delete_share.assert_called_once_with("my_share")
 
-    
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService", 
autospec=True)
+    
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.ShareServiceClient",
 autospec=True)
     def test_connection_success(self, mock_service):
+        mock_instance = mock_service.return_value
         hook = 
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
-        hook.get_conn().list_shares.return_value = ["test_container"]
+        mock_instance.list_shares.return_value = ["test_container"]
         status, msg = hook.test_connection()
         assert status is True
         assert msg == "Successfully connected to Azure File Share."
 
-    
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService", 
autospec=True)
+    
@mock.patch("airflow.providers.microsoft.azure.hooks.fileshare.ShareServiceClient",
 autospec=True)
     def test_connection_failure(self, mock_service):
+        mock_instance = mock_service.return_value
         hook = 
AzureFileShareHook(azure_fileshare_conn_id="azure_fileshare_extras")
-        hook.get_conn().list_shares.side_effect = Exception("Test Connection 
Failure")
+        mock_instance.list_shares.side_effect = Exception("Test Connection 
Failure")
         status, msg = hook.test_connection()
         assert status is False
         assert msg == "Test Connection Failure"
-
-    @pytest.mark.parametrize(
-        "uri",
-        [
-            param(
-                
"a://?extra__azure_fileshare__anything=abc&extra__azure_fileshare__other_thing=abc",
-                id="prefix",
-            ),
-            param("a://?anything=abc&other_thing=abc", id="no-prefix"),
-        ],
-    )
-    @patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService")
-    def test_backcompat_prefix_works(self, mock_service, uri):
-        with patch.dict(os.environ, AIRFLOW_CONN_MY_CONN=uri):
-            hook = AzureFileShareHook("my_conn")
-            hook.get_conn()
-            mock_service.assert_called_with(
-                account_key=None, account_name=None, anything="abc", 
other_thing="abc"
-            )
-
-    @patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService")
-    def test_backcompat_prefix_both_causes_warning(self, mock_service):
-        with patch.dict(
-            os.environ,
-            
AIRFLOW_CONN_MY_CONN="a://?anything=non-prefixed&extra__azure_fileshare__anything=prefixed",
-        ):
-            hook = AzureFileShareHook("my_conn")
-            with pytest.warns(Warning, match="Using value for `anything`"):
-                hook.get_conn()
-            mock_service.assert_called_with(account_key=None, 
account_name=None, anything="non-prefixed")
-
-    @patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService")
-    def test_empty_string_ignored(self, mock_service):
-        with patch.dict(
-            os.environ,
-            AIRFLOW_CONN_MY_CONN='{"extra": {"anything": "", "other_thing": 
"a"}}',
-        ):
-            hook = AzureFileShareHook("my_conn")
-            hook.get_conn()
-            mock_service.assert_called_with(account_key=None, 
account_name=None, other_thing="a")
-
-    @patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService")
-    def test_extra_params_forwarded_to_service_options(self, fs_mock):
-        with patch.dict(os.environ, 
AIRFLOW_CONN_MY_CONN="a://login@?a=1&b=2&c=3"):
-            hook = AzureFileShareHook("my_conn")
-            hook.get_conn()
-            fs_mock.assert_called_with(account_key=None, account_name="login", 
a="1", b="2", c="3")
-
-    @patch("airflow.providers.microsoft.azure.hooks.fileshare.FileService")
-    def test_unrecognized_extra_warns(self, fs_mock):
-        with patch.dict(os.environ, 
AIRFLOW_CONN_MY_CONN="a://login:password@?extra__wasb__hello=hi&foo=bar"):
-            hook = AzureFileShareHook("my_conn")
-            with pytest.warns(Warning, match="Extra param `extra__wasb__hello` 
not recognized; ignoring."):
-                hook.get_conn()
-            fs_mock.assert_called_with(account_key="password", 
account_name="login", foo="bar")
diff --git a/tests/test_utils/azure_system_helpers.py 
b/tests/test_utils/azure_system_helpers.py
index 85c700537a..033399235a 100644
--- a/tests/test_utils/azure_system_helpers.py
+++ b/tests/test_utils/azure_system_helpers.py
@@ -113,12 +113,14 @@ class AzureSystemTest(SystemTest):
     @classmethod
     def delete_share(cls, share_name: str, azure_fileshare_conn_id: str):
         hook = 
AzureFileShareHook(azure_fileshare_conn_id=azure_fileshare_conn_id)
-        hook.delete_share(share_name)
+        hook.delete_share(share_name=share_name)
 
     @classmethod
     def create_directory(cls, share_name: str, azure_fileshare_conn_id: str, 
directory: str):
-        hook = 
AzureFileShareHook(azure_fileshare_conn_id=azure_fileshare_conn_id)
-        hook.create_directory(share_name=share_name, directory_name=directory)
+        hook = AzureFileShareHook(
+            azure_fileshare_conn_id=azure_fileshare_conn_id, 
share_name=share_name, directory_path=directory
+        )
+        hook.create_directory()
 
     @classmethod
     def upload_file_from_string(
@@ -127,30 +129,25 @@ class AzureSystemTest(SystemTest):
         share_name: str,
         azure_fileshare_conn_id: str,
         file_name: str,
-        directory: str,
     ):
-        hook = 
AzureFileShareHook(azure_fileshare_conn_id=azure_fileshare_conn_id)
-        hook.load_string(
-            string_data=string_data,
-            share_name=share_name,
-            directory_name=directory,
-            file_name=file_name,
+        hook = AzureFileShareHook(
+            azure_fileshare_conn_id=azure_fileshare_conn_id, 
share_name=share_name, file_path=file_name
         )
+        hook.load_data(string_data=string_data)
 
     @classmethod
     def prepare_share(cls, share_name: str, azure_fileshare_conn_id: str, 
file_name: str, directory: str):
         """
         Create share with a file in given directory. If directory is None, 
file is in root dir.
         """
-        cls.create_share(share_name=share_name, 
azure_fileshare_conn_id=azure_fileshare_conn_id)
-        cls.create_directory(
-            share_name=share_name, 
azure_fileshare_conn_id=azure_fileshare_conn_id, directory=directory
-        )
-        string_data = "".join(random.choices(string.ascii_letters, k=1024))
-        cls.upload_file_from_string(
-            string_data=string_data,
-            share_name=share_name,
+        hook = AzureFileShareHook(
             azure_fileshare_conn_id=azure_fileshare_conn_id,
-            file_name=file_name,
-            directory=directory,
+            share_name=share_name,
+            directory_path=directory,
+            file_path=file_name,
         )
+        hook.create_share(share_name)
+        hook.create_directory()
+
+        string_data = "".join(random.choices(string.ascii_letters, k=1024))
+        hook.load_data(string_data)


Reply via email to