ldacey edited a comment on issue #11968:
URL: https://github.com/apache/airflow/issues/11968#issuecomment-723535347


   I installed Airflow 1.10.12, upgraded the Azure Blob SDK to v12, and was 
able to begin a rough wasb_hook. I still need to add some more methods to 
upload data, copy blobs, delete blobs, etc. I also need to add the async stuff, 
since that is one of the major benefits of v12. In terms of best practices, do 
you think I should just add flags (e.g. `async=True`) with `if` statements in my 
current methods where applicable?
   
   Edit - I added a few more methods and kept the naming convention the same as 
the original WasbHook. I think I can technically edit the 
`airflow/utils/log/wasb_task_handler.py` file to import this custom hook, and 
edit the `airflow_local_settings.py` wasb definition to point towards my custom 
wasb_task_handler.
   
   ```
   import logging
   from pathlib import Path
   
   import azure.common
   import fsspec
   import pendulum
   import requests
   from airflow.exceptions import AirflowException
   from airflow.hooks.base_hook import BaseHook
   from azure.core.exceptions import ResourceNotFoundError
   from azure.storage.blob import BlobServiceClient
   
   # Module-level logger named "azure.storage", capped at ERROR so that
   # lower-severity records sent through it are dropped. The same object is
   # passed to BlobServiceClient in AzureBlobHook.get_conn().
   # NOTE(review): presumably this is the namespace the Azure SDK logs under -
   # confirm against the SDK version in use.
   logger = logging.getLogger("azure.storage")
   logger.setLevel(logging.ERROR)
   
   
   class AzureBlobHook(BaseHook):
       """
       Interacts with Azure Blob Storage through the wasb:// protocol.

       Additional options passed in the 'extra' field of the connection will be
       passed to the `BlobServiceClient()` constructor. Authenticate through
       the connection password using the account key or a SAS token.

       :param wasb_conn_id: Reference to the wasb connection.
       :type wasb_conn_id: str
       :param request_session: When not None, a pooled HTTP session is handed
           to the client. A `requests.Session` instance is used as-is; any
           other non-None value only switches the pooled session on.
       :type request_session: requests.Session
       :param session_pool_size: Pool size of the HTTPS adapter mounted on the
           session.
       :type session_pool_size: int
       """

       def __init__(self, wasb_conn_id="wasb_default", request_session=None, session_pool_size=10):
           self.conn_id = wasb_conn_id
           self.request_session = request_session
           self.session_pool_size = session_pool_size
           self.conn_info = self.get_connection(self.conn_id)
           # The connection login doubles as the storage account name.
           self.account_name = self.conn_info.login
           self.url = f"https://{self.account_name}.blob.core.windows.net"
           self.connection = self.get_conn()

       def get_conn(self):
           """
           Build the BlobServiceClient for the configured connection.

           :return: Client created from the account URL, the connection
               password and the connection's `extra` options.
           """
           service_options = self.conn_info.extra_dejson
           session = None
           if self.request_session is not None:
               # Reuse a caller-supplied session instead of silently replacing
               # it; any other non-None value keeps the "create one" behaviour.
               if isinstance(self.request_session, requests.Session):
                   session = self.request_session
               else:
                   session = requests.Session()
               adapter = requests.adapters.HTTPAdapter(
                   pool_connections=self.session_pool_size,
                   pool_maxsize=self.session_pool_size,
               )
               session.mount("https://", adapter)
           # NOTE(review): `request_session` is the keyword the legacy SDK
           # used; confirm that the v12 client/transport still honours it.
           return BlobServiceClient(
               account_url=self.url,
               credential=self.conn_info.password,
               request_session=session,
               logger=logger,
               **service_options,
           )

       def container_client(self, container_name):
           """
           Get a client to interact with the specified container.

           :param container_name: Name of the container
           :type container_name: str
           :return: Result of `BlobServiceClient.get_container_client()`.
           """
           return self.connection.get_container_client(container=container_name)

       def blob_client(self, container_name, blob_name):
           """
           Get a client to interact with the specified blob.

           :param container_name: Name of the container
           :type container_name: str
           :param blob_name: Name of the blob
           :type blob_name: str
           :return: Result of `ContainerClient.get_blob_client()`.
           """
           return self.container_client(container_name).get_blob_client(blob=blob_name)

       def list_containers(self, prefix=None, include_metadata=True, **kwargs):
           """
           Returns a generator to list the containers under the specified
           account.

           :param prefix: Prefix of the container name. None applies no prefix
               filter.
           :type prefix: str
           :param include_metadata: Whether to include container metadata
           :type include_metadata: bool
           :param kwargs: Optional keyword arguments that
               `BlobServiceClient.list_containers()` takes.
           :type kwargs: object
           """
           return self.connection.list_containers(
               name_starts_with=prefix, include_metadata=include_metadata, **kwargs
           )

       def container_exists(self, container_name, **kwargs):
           """
           Check if a container exists on Azure Blob Storage.

           :param container_name: Name of the container.
           :type container_name: str
           :param kwargs: Optional keyword arguments that
               `ContainerClient.get_container_properties()` takes.
           :type kwargs: object
           :return: False when the properties lookup raises
               ResourceNotFoundError, True otherwise.
           :rtype: bool
           """
           try:
               self.container_client(container_name).get_container_properties(**kwargs)
           except ResourceNotFoundError:
               return False
           return True

       def create_container(self, container_name, metadata=None, **kwargs):
           """
           Creates a new container under the specified account.

           Nothing is created (and None is returned) when the container
           already exists.

           :param container_name: Name of the container
           :type container_name: str
           :param metadata: A dict with name-value pairs to associate with the
               container as metadata.
           :type metadata: dict
           :param kwargs: Optional keyword arguments that
               `BlobServiceClient.create_container()` takes.
           :type kwargs: object
           :return: Result of `BlobServiceClient.create_container()`, or None.
           """
           if self.container_exists(container_name):
               return None
           self.log.info(f"Creating {container_name}")
           return self.connection.create_container(
               name=container_name, metadata=metadata, **kwargs
           )

       def delete_container(self, container_name, **kwargs):
           """
           Marks the container for deletion. The container and any blobs
           contained within it are later deleted during garbage collection.

           A missing container is not an error here: the existence check
           short-circuits and None is returned without a delete request.

           :param container_name: Name of the container
           :type container_name: str
           :param kwargs: Optional keyword arguments that
               `BlobServiceClient.delete_container()` takes.
           :type kwargs: object
           :return: Result of `BlobServiceClient.delete_container()`, or None.
           """
           if not self.container_exists(container_name):
               return None
           self.log.info(f"Deleting {container_name}")
           return self.connection.delete_container(container_name, **kwargs)

       def get_bytes(self, container_name, blob_name, start=None, end=None, **kwargs):
           """
           Downloads a blob to the StorageStreamDownloader.

           The readall() method must be used to read all the content or
           readinto() must be used to download the blob into a stream.

           :param container_name: Name of the container
           :type container_name: str
           :param blob_name: Name of the blob
           :type blob_name: str
           :param start: Start byte position to download blob from
           :type start: int
           :param end: End byte position (exclusive) of the range to download.
               When given without `start`, the range begins at byte 0.
           :type end: int
           :param kwargs: Optional keyword arguments that
               `ContainerClient.download_blob()` takes.
           :type kwargs: object
           :raises ValueError: If `end` is smaller than `start`.
           """
           length = None
           if end is not None:
               # download_blob() wants a byte count, not an end position, so
               # translate [start, end) into offset + length.
               start = 0 if start is None else start
               if end < start:
                   raise ValueError(f"end ({end}) must not be smaller than start ({start})")
               length = end - start
           return self.container_client(container_name).download_blob(
               blob=blob_name, offset=start, length=length, **kwargs
           )

       def get_file(self, container_name, file_path, blob_name, **kwargs):
           """
           Download a file from Azure Blob Storage to the local filesystem.

           Missing parent directories of `file_path` are created.

           :param container_name: Name of the container
           :type container_name: str
           :param file_path: Destination file path
           :type file_path: str or Path
           :param blob_name: Name of the blob
           :type blob_name: str
           :param kwargs: Optional keyword arguments that
               `ContainerClient.download_blob()` takes.
           :type kwargs: object
           """
           file_path = Path(file_path)
           self.log.info(f"Downloading {container_name}/{blob_name} to {str(file_path)}")
           # Start the download before touching the filesystem so that a
           # failing request does not leave an empty destination file behind.
           blob = self.container_client(container_name).download_blob(blob=blob_name, **kwargs)
           file_path.parent.mkdir(parents=True, exist_ok=True)
           with open(file_path, "wb") as fp:
               # Stream into the file rather than buffering the whole blob.
               blob.readinto(fp)

       def load_file(self, container_name, file_path, blob_name, metadata=None, **kwargs):
           """
           Upload a single file to a path inside the container

           :param container_name: Name of the container
           :type container_name: str
           :param file_path: Source file path
           :type file_path: str or Path
           :param blob_name: Name of the blob
           :type blob_name: str
           :param metadata: Blob metadata
           :type metadata: dict
           :param kwargs: Optional keyword arguments that
               `ContainerClient.upload_blob()` takes.
           :type kwargs: object
           """
           file_path = Path(file_path)
           self.log.info(f"Uploading {str(file_path)} to {container_name}/{blob_name}")
           with open(file_path, "rb") as data:
               self.container_client(container_name).upload_blob(
                   name=blob_name, data=data, metadata=metadata, **kwargs
               )

       def load_string(self, container_name, string_data, blob_name, metadata=None, **kwargs):
           """
           Upload a string as a blob inside the container

           :param container_name: Name of the container
           :type container_name: str
           :param string_data: String to load
           :type string_data: str
           :param blob_name: Name of the blob
           :type blob_name: str
           :param metadata: Blob metadata
           :type metadata: dict
           :param kwargs: Optional keyword arguments that
               `ContainerClient.upload_blob()` takes.
           :type kwargs: object
           """
           self.log.info(f"Uploading data to {container_name}/{blob_name}")
           self.container_client(container_name).upload_blob(
               name=blob_name, data=string_data, metadata=metadata, **kwargs
           )

       def load_bytes(self, container_name, buf, blob_name, metadata=None, **kwargs):
           """
           Upload a bytes buffer as a blob inside the container

           :param container_name: Name of the container
           :type container_name: str
           :param buf: Data to upload
           :type buf: bytes
           :param blob_name: Name of the blob
           :type blob_name: str
           :param metadata: Blob metadata
           :type metadata: dict
           :param kwargs: Optional keyword arguments that
               `ContainerClient.upload_blob()` takes.
           :type kwargs: object
           """
           self.log.info(f"Uploading data to {container_name}/{blob_name}")
           self.container_client(container_name).upload_blob(
               name=blob_name, data=buf, metadata=metadata, **kwargs
           )

       def list_blobs(self, container_name, prefix="", include=None, **kwargs):
           """
           Returns a generator to list the blobs under the specified container.
           The generator will lazily follow the continuation tokens returned by
           the service.

           :param container_name: Name of the container
           :type container_name: str
           :param prefix: Blob name prefix
           :type prefix: str
           :param include: Options include: 'snapshots', 'metadata',
               'uncommittedblobs', 'copy', 'deleted', 'tags'.
           :type include: str
           :param kwargs: Optional keyword arguments that
               `ContainerClient.list_blobs()` takes.
           :type kwargs: object
           """
           return self.container_client(container_name).list_blobs(
               name_starts_with=prefix, include=include, **kwargs
           )

       def delete_blob(self, container_name, blob_name, is_prefix=False, ignore_if_missing=False, **kwargs):
           """
           Delete a file from Azure Blob Storage.

           :param container_name: Name of the container.
           :type container_name: str
           :param blob_name: Name of the blob.
           :type blob_name: str
           :param is_prefix: If blob_name is a prefix, delete all matching files
           :type is_prefix: bool
           :param ignore_if_missing: if True, then return success even if the
               blob does not exist.
           :type ignore_if_missing: bool
           :param kwargs: Optional keyword arguments that
               `BlobClient.delete_blob()` takes. In prefix mode the same
               arguments are also forwarded to `ContainerClient.list_blobs()`.
           :type kwargs: object
           :raises AirflowException: If nothing matches and `ignore_if_missing`
               is False.
           """
           if is_prefix:
               blobs_to_delete = [
                   blob.name for blob in self.list_blobs(container_name, prefix=blob_name, **kwargs)
               ]
           elif self.check_for_blob(container_name, blob_name):
               blobs_to_delete = [blob_name]
           else:
               blobs_to_delete = []
           if not blobs_to_delete and not ignore_if_missing:
               raise AirflowException(f"Blob(s) not found: {blob_name}")
           for blob in blobs_to_delete:
               self.log.info(f"Deleting {blob}")
               self.blob_client(container_name, blob_name=blob).delete_blob(
                   delete_snapshots=False, **kwargs
               )

       def get_metadata(self, container_name, blob_name, **kwargs):
           """
           Returns all user-defined metadata, standard HTTP properties, and
           system properties for the blob.
           It does not return the content of the blob.

           :param container_name: Name of the container
           :type container_name: str
           :param blob_name: Name of the blob
           :type blob_name: str
           :param kwargs: Optional keyword arguments that
               `BlobClient.get_blob_properties()` takes.
           :type kwargs: object
           """
           return self.blob_client(container_name, blob_name).get_blob_properties(**kwargs)

       def set_metadata(self, container_name, blob_name, metadata=None, **kwargs):
           """
           Sets user-defined metadata for the blob as one or more name-value
           pairs.

           :param container_name: Name of the container.
           :type container_name: str
           :param blob_name: Blob name
           :type blob_name: str
           :param metadata: Blob metadata
           :type metadata: dict
           :param kwargs: Optional keyword arguments that
               `BlobClient.set_blob_metadata()` takes.
           :type kwargs: object
           """
           return self.blob_client(container_name, blob_name).set_blob_metadata(
               metadata=metadata, **kwargs
           )

       def check_for_blob(self, container_name, blob_name, **kwargs):
           """
           Check if a blob exists on Azure Blob Storage.

           :param container_name: Name of the container.
           :type container_name: str
           :param blob_name: Name of the blob.
           :type blob_name: str
           :param kwargs: Optional keyword arguments that
               `BlobClient.get_blob_properties()` takes.
           :type kwargs: object
           :return: False when the properties lookup raises
               ResourceNotFoundError, True otherwise.
           :rtype: bool
           """
           try:
               self.blob_client(container_name, blob_name).get_blob_properties(**kwargs)
           except ResourceNotFoundError:
               return False
           return True

       def list_modified_blobs(self, container_name, prefix="", include=None, modified_since_seconds=None):
           """
           Returns a list of blobs modified since a specified period of time.

           :param container_name: Name of the container
           :type container_name: str
           :param prefix: Blob name prefix
           :type prefix: str
           :param include: Options include: 'snapshots', 'metadata',
               'uncommittedblobs', 'copy', 'deleted', 'tags'.
           :type include: str
           :param modified_since_seconds: Keep only blobs whose last
               modification is less than this many seconds old. None disables
               the filter and returns every listed blob.
           :type modified_since_seconds: int
           :return: The matching entries yielded by `list_blobs()`.
           :rtype: list
           """
           blobs = self.list_blobs(container_name, prefix, include)
           if modified_since_seconds is None:
               # No cutoff requested; comparing against None would raise.
               return list(blobs)
           # One reference time for the whole listing keeps the cutoff stable.
           now = pendulum.now().int_timestamp
           # Listing results already carry `last_modified`, so no per-blob
           # properties request is needed.
           return [
               blob
               for blob in blobs
               if now - pendulum.parse(str(blob.last_modified)).int_timestamp < modified_since_seconds
           ]

       def create_filesystem(self):
           """Instantiate filesystems for given protocol and arguments"""
           return fsspec.filesystem(
               protocol="abfs",
               account_name=self.conn_info.login,
               account_key=self.conn_info.password,
           )

       def copy_blob(self, container_name, blob_name, source_container_name, source_blob_name, **kwargs):
           """
           Copies a blob to a new location

           The source URL is built from this hook's account URL, so the source
           blob is addressed within the same storage account.

           :param container_name: Name of the (destination) container.
           :type container_name: str
           :param blob_name: (Destination) blob name
           :type blob_name: str
           :param source_container_name: Name of the source container
           :type source_container_name: str
           :param source_blob_name: Name of the source blob
           :type source_blob_name: str
           :param kwargs: Optional keyword arguments that
               `BlobClient.start_copy_from_url()` takes, e.g.
               `incremental_copy` to copy only the differential changes of a
               page blob snapshot.
           :type kwargs: object
           """
           source_url = f"{self.url}/{source_container_name}/{source_blob_name}"
           return self.blob_client(container_name, blob_name).start_copy_from_url(
               source_url, **kwargs
           )

       def copy_blob_list(
           self,
           source_container,
           destination_container,
           source_blobs,
           destination_path,
           delete_first=False,
       ):
           """
           Copy all blobs in a path to another path or container.

           Failures of individual copies are logged as warnings and do not
           abort the remaining copies.

           :param source_container: Source container
           :type source_container: str
           :param destination_container: Destination container
           :type destination_container: str
           :param source_blobs: Blobs to copy; each item must expose `.name`
           :type source_blobs: list
           :param destination_path: Blob path (not including the filename)
           :type destination_path: str
           :param delete_first: Default false. If true, the contents of the
               destination path will be deleted before the copy
           :type delete_first: bool
           :return: None
           """
           # SDK v12 raises azure.core errors, which do not derive from the
           # legacy azure.common.AzureException; catch both families.
           from azure.core.exceptions import AzureError

           sdk_errors = (azure.common.AzureException, AzureError)
           # Materialise once so generators (e.g. from list_blobs) can be both
           # iterated and counted.
           source_blobs = list(source_blobs)
           if not destination_path.endswith("/"):
               destination_path += "/"
           if delete_first:
               self.log.info(f"Deleting existing blobs in destination path: {destination_path}!")
               try:
                   self.delete_blob(
                       container_name=destination_container,
                       blob_name=destination_path,
                       is_prefix=True,
                       ignore_if_missing=True,
                   )
               except sdk_errors as e:
                   self.log.info(e)
           count = 0
           for blob in source_blobs:
               self.log.info(f"{destination_path}{blob.name}")
               try:
                   self.copy_blob(
                       container_name=destination_container,
                       source_container_name=source_container,
                       source_blob_name=blob.name,
                       blob_name=f"{destination_path}{blob.name}",
                   )
                   count += 1
               except sdk_errors as e:
                   self.log.warning(e)
           self.log.info(f"Successfully copied {count} out of {len(source_blobs)} files to {destination_path}!")
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to