ldacey edited a comment on issue #11968:
URL: https://github.com/apache/airflow/issues/11968#issuecomment-723535347
I installed Airflow 1.10.12, upgraded the Azure Blob SDK to v12, and was
able to put together a rough wasb_hook. I still need to add more methods to
upload data, copy blobs, delete blobs, etc. I also need to add the async
support, since that is one of the major benefits of v12. In terms of best
practices, do you think I should just add flags (e.g. `use_async=True` —
note that `async` itself is a reserved keyword) with if statements in my
current methods where applicable?
```
import logging
from pathlib import Path
import azure.common
import fsspec
import pendulum
import requests
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from azure.core.exceptions import ResourceNotFoundError
from azure.storage.blob import BlobServiceClient
# The Azure storage SDK logs every HTTP request at INFO; raise the level so
# only errors surface in task logs. Bound at module level because get_conn()
# passes this logger to BlobServiceClient.
logger = logging.getLogger("azure.storage")
logger.setLevel(logging.ERROR)
class AzureBlobHook(BaseHook):
    """
    Interacts with Azure Blob Storage through the wasb:// protocol.

    Additional options passed in the 'extra' field of the connection are
    forwarded to the ``BlobServiceClient()`` constructor. Authenticates with
    the connection password, which may be either the account key or a SAS
    token.

    :param wasb_conn_id: Reference to the wasb connection.
    :type wasb_conn_id: str
    :param request_session: Optional pre-built ``requests.Session`` to mount
        a pooled HTTPS adapter on and hand to the service client.
    :type request_session: requests.Session
    :param session_pool_size: Connection-pool size used for the HTTPS
        adapter when ``request_session`` is given.
    :type session_pool_size: int
    """

    def __init__(self, wasb_conn_id="wasb_default", request_session=None,
                 session_pool_size=10):
        self.conn_id = wasb_conn_id
        self.request_session = request_session
        self.session_pool_size = session_pool_size
        self.conn_info = self.get_connection(self.conn_id)
        self.account_name = self.conn_info.login
        self.url = f"https://{self.account_name}.blob.core.windows.net"
        self.connection = self.get_conn()

    def get_conn(self):
        """Return the BlobServiceClient object."""
        service_options = self.conn_info.extra_dejson
        if self.request_session is not None:
            # BUGFIX: use the session the caller supplied instead of
            # silently replacing it with a brand-new requests.Session().
            session = self.request_session
            adapter = requests.adapters.HTTPAdapter(
                pool_connections=self.session_pool_size,
                pool_maxsize=self.session_pool_size,
            )
            session.mount("https://", adapter)
        else:
            session = None
        return BlobServiceClient(
            account_url=self.url,
            credential=self.conn_info.password,
            request_session=session,
            logger=logger,
            **service_options,
        )

    def container_client(self, container_name):
        """
        Get a client to interact with the specified container.

        :param container_name: Name of the container
        :type container_name: str
        """
        return self.connection.get_container_client(container=container_name)

    def blob_client(self, container_name, blob_name):
        """
        Get a client to interact with the specified blob.

        :param container_name: Name of the container
        :type container_name: str
        :param blob_name: Name of the blob
        :type blob_name: str
        """
        return self.container_client(container_name).get_blob_client(blob=blob_name)

    def list_containers(self, prefix, include_metadata=True, **kwargs):
        """
        Returns a generator to list the containers under the specified account.

        :param prefix: Prefix of the container name.
        :type prefix: str
        :param include_metadata: Whether to include container metadata
        :type include_metadata: bool
        :param kwargs: Optional keyword arguments that
            `BlobServiceClient.list_containers()` takes.
        :type kwargs: object
        """
        return self.connection.list_containers(
            name_starts_with=prefix, include_metadata=include_metadata, **kwargs
        )

    def container_exists(self, container_name, **kwargs):
        """
        Check if a container exists on Azure Blob Storage.

        :param container_name: Name of the container.
        :type container_name: str
        :param kwargs: Optional keyword arguments that
            `ContainerClient.get_container_properties()` takes.
        :type kwargs: object
        """
        try:
            self.container_client(container_name).get_container_properties(**kwargs)
        except ResourceNotFoundError:
            return False
        return True

    def create_container(self, container_name, metadata=None, **kwargs):
        """
        Creates a new container under the specified account.

        Does nothing (returns None) if the container already exists.

        :param container_name: Name of the container
        :type container_name: str
        :param metadata: A dict with name-value pairs to associate with the
            container as metadata.
        :type metadata: dict
        :param kwargs: Optional keyword arguments that
            `BlobServiceClient.create_container()` takes.
        :type kwargs: object
        """
        if not self.container_exists(container_name):
            self.log.info(f"Creating {container_name}")
            return self.connection.create_container(
                name=container_name, metadata=metadata, **kwargs
            )

    def delete_container(self, container_name, **kwargs):
        """
        Mark a container for deletion.

        The container and any blobs contained within it are later deleted
        during garbage collection. Does nothing (returns None) if the
        container does not exist.

        :param container_name: Name of the container
        :type container_name: str
        :param kwargs: Optional keyword arguments that
            `BlobServiceClient.delete_container()` takes.
        :type kwargs: object
        """
        if self.container_exists(container_name):
            self.log.info(f"Deleting {container_name}")
            return self.connection.delete_container(container_name, **kwargs)

    def get_bytes(self, container_name, blob_name, start=None, end=None,
                  **kwargs):
        """
        Downloads a blob to the StorageStreamDownloader.

        The readall() method must be used to read all the content or
        readinto() must be used to download the blob into a stream.

        :param container_name: Name of the container
        :type container_name: str
        :param blob_name: Name of the blob
        :type blob_name: str
        :param start: Start byte position to download blob from
        :type start: int
        :param end: End byte position to download blob to
        :type end: int
        :param kwargs: Optional keyword arguments that
            `ContainerClient.download_blob()` takes.
        :type kwargs: object
        """
        # BUGFIX: download_blob() expects a byte *count* in `length`, not an
        # end position, so translate the (start, end) range documented above.
        length = None
        if end is not None:
            length = end - (start or 0)
        return self.container_client(container_name).download_blob(
            blob=blob_name, offset=start, length=length, **kwargs
        )

    def get_file(self, container_name, file_path, blob_name, **kwargs):
        """
        Download a file from Azure Blob Storage to the local filesystem.

        Parent directories of ``file_path`` are created if missing.

        :param container_name: Name of the container
        :type container_name: str
        :param file_path: Destination file path
        :type file_path: str or Path
        :param blob_name: Name of the blob
        :type blob_name: str
        :param kwargs: Optional keyword arguments that
            `ContainerClient.download_blob()` takes.
        :type kwargs: object
        """
        file_path = Path(file_path)
        self.log.info(f"Downloading {container_name}/{blob_name} to {str(file_path)}")
        file_path.parent.mkdir(parents=True, exist_ok=True)
        with open(file_path, "wb") as fp:
            blob = self.container_client(container_name).download_blob(
                blob=blob_name, **kwargs
            )
            fp.write(blob.readall())

    def load_file(self, container_name, file_path, blob_name, metadata=None,
                  **kwargs):
        """
        Upload a single file to a path inside the container.

        :param container_name: Name of the container
        :type container_name: str
        :param file_path: Source file path
        :type file_path: str or Path
        :param blob_name: Name of the blob
        :type blob_name: str
        :param metadata: Blob metadata
        :type metadata: dict
        :param kwargs: Optional keyword arguments that
            `ContainerClient.upload_blob()` takes.
        :type kwargs: object
        """
        file_path = Path(file_path)
        self.log.info(f"Uploading {str(file_path)} to {container_name}/{blob_name}")
        with open(file_path, "rb") as data:
            self.container_client(container_name).upload_blob(
                name=blob_name, data=data, metadata=metadata, **kwargs
            )

    def load_string(self, container_name, string_data, blob_name,
                    metadata=None, **kwargs):
        """
        Upload a string to a blob inside the container.

        :param container_name: Name of the container
        :type container_name: str
        :param string_data: String to load
        :type string_data: str
        :param blob_name: Name of the blob
        :type blob_name: str
        :param metadata: Blob metadata
        :type metadata: dict
        :param kwargs: Optional keyword arguments that
            `ContainerClient.upload_blob()` takes.
        :type kwargs: object
        """
        self.log.info(f"Uploading data to {container_name}/{blob_name}")
        self.container_client(container_name).upload_blob(
            name=blob_name, data=string_data, metadata=metadata, **kwargs
        )

    def load_bytes(self, container_name, buf, blob_name, metadata=None,
                   **kwargs):
        """
        Upload raw bytes to a blob inside the container.

        :param container_name: Name of the container
        :type container_name: str
        :param buf: Buffer output stream
        :type buf: bytes
        :param blob_name: Name of the blob
        :type blob_name: str
        :param metadata: Blob metadata
        :type metadata: dict
        :param kwargs: Optional keyword arguments that
            `ContainerClient.upload_blob()` takes.
        :type kwargs: object
        """
        self.log.info(f"Uploading data to {container_name}/{blob_name}")
        self.container_client(container_name).upload_blob(
            name=blob_name, data=buf, metadata=metadata, **kwargs
        )

    def list_blobs(self, container_name, prefix="", include=None, **kwargs):
        """
        Returns a generator to list the blobs under the specified container.

        The generator will lazily follow the continuation tokens returned by
        the service.

        :param container_name: Name of the container
        :type container_name: str
        :param prefix: Blob name prefix
        :type prefix: str
        :param include: Options include: 'snapshots', 'metadata',
            'uncommittedblobs', 'copy', 'deleted', 'tags'.
        :type include: str
        :param kwargs: Optional keyword arguments that
            `ContainerClient.list_blobs()` takes.
        :type kwargs: object
        """
        return self.container_client(container_name).list_blobs(
            name_starts_with=prefix, include=include, **kwargs
        )

    def delete_blob(self, container_name, blob_name, is_prefix=False,
                    ignore_if_missing=False, **kwargs):
        """
        Delete a file (or all files matching a prefix) from Azure Blob
        Storage.

        :param container_name: Name of the container.
        :type container_name: str
        :param blob_name: Name of the blob.
        :type blob_name: str
        :param is_prefix: If blob_name is a prefix, delete all matching files
        :type is_prefix: bool
        :param ignore_if_missing: if True, then return success even if the
            blob does not exist.
        :type ignore_if_missing: bool
        :param kwargs: Optional keyword arguments that
            `BlobClient.delete_blob()` takes.
        :type kwargs: object
        :raises AirflowException: when no matching blob exists and
            ``ignore_if_missing`` is False.
        """
        if is_prefix:
            # BUGFIX: kwargs are meant for delete_blob(); forwarding them to
            # list_blobs() (as the original did) raises TypeError for
            # delete-only options such as `lease`.
            blobs_to_delete = [
                blob.name
                for blob in self.list_blobs(container_name, prefix=blob_name)
            ]
        elif self.check_for_blob(container_name, blob_name):
            blobs_to_delete = [blob_name]
        else:
            blobs_to_delete = []
        if not ignore_if_missing and len(blobs_to_delete) == 0:
            raise AirflowException(f"Blob(s) not found: {blob_name}")
        for blob in blobs_to_delete:
            self.log.info(f"Deleting {blob}")
            self.blob_client(container_name, blob_name=blob).delete_blob(
                delete_snapshots=False, **kwargs
            )

    def get_metadata(self, container_name, blob_name, **kwargs):
        """
        Returns all user-defined metadata, standard HTTP properties, and
        system properties for the blob.

        It does not return the content of the blob.

        :param container_name: Name of the container
        :type container_name: str
        :param blob_name: Name of the blob
        :type blob_name: str
        :param kwargs: Optional keyword arguments that
            `BlobClient.get_blob_properties()` takes.
        :type kwargs: object
        """
        return self.blob_client(container_name, blob_name).get_blob_properties(
            **kwargs
        )

    def set_metadata(self, container_name, blob_name, metadata=None,
                     **kwargs):
        """
        Sets user-defined metadata for the blob as one or more name-value
        pairs.

        :param container_name: Name of the container.
        :type container_name: str
        :param blob_name: Blob name
        :type blob_name: str
        :param metadata: Blob metadata
        :type metadata: dict
        :param kwargs: Optional keyword arguments that
            `BlobClient.set_blob_metadata()` takes.
        :type kwargs: object
        """
        return self.blob_client(container_name, blob_name).set_blob_metadata(
            metadata=metadata, **kwargs
        )

    def check_for_blob(self, container_name, blob_name, **kwargs):
        """
        Check if a blob exists on Azure Blob Storage.

        :param container_name: Name of the container.
        :type container_name: str
        :param blob_name: Name of the blob.
        :type blob_name: str
        :param kwargs: Optional keyword arguments that
            `BlobClient.get_blob_properties()` takes.
        :type kwargs: object
        """
        try:
            self.blob_client(container_name, blob_name).get_blob_properties(**kwargs)
        except ResourceNotFoundError:
            return False
        return True

    def list_modified_blobs(self, container_name, prefix="", include=None,
                            modified_since_seconds=None):
        """
        Returns a list of blobs modified within the last
        ``modified_since_seconds`` seconds.

        :param container_name: Name of the container
        :type container_name: str
        :param prefix: Blob name prefix
        :type prefix: str
        :param include: Options include: 'snapshots', 'metadata',
            'uncommittedblobs', 'copy', 'deleted', 'tags'.
        :type include: str
        :param modified_since_seconds: Age cutoff in seconds; blobs modified
            longer ago than this are excluded. If None, all blobs are
            returned.
        :type modified_since_seconds: int
        """
        blobs = self.list_blobs(container_name, prefix=prefix, include=include)
        # BUGFIX: the original compared against None (TypeError) when no
        # cutoff was given; treat None as "no filter".
        if modified_since_seconds is None:
            return list(blobs)
        # Hoist the reference timestamp out of the loop so the cutoff is
        # consistent across blobs (and not recomputed per iteration).
        now = pendulum.now().int_timestamp
        files = []
        for blob in blobs:
            properties = self.get_metadata(container_name, blob_name=blob.name)
            modified = pendulum.parse(str(properties.last_modified)).int_timestamp
            if now - modified < modified_since_seconds:
                files.append(blob)
        return files

    def create_filesystem(self):
        """Instantiate an fsspec filesystem for the abfs protocol using this
        hook's connection credentials."""
        return fsspec.filesystem(
            protocol="abfs",
            account_name=self.conn_info.login,
            account_key=self.conn_info.password,
        )

    def copy_blob(self, container_name, blob_name, source_container_name,
                  source_blob_name, **kwargs):
        """
        Copies a blob to a new location within the same storage account.

        :param container_name: Name of the destination container.
        :type container_name: str
        :param blob_name: Destination blob name
        :type blob_name: str
        :param source_container_name: Name of the source container
        :type source_container_name: str
        :param source_blob_name: Name of the source blob
        :type source_blob_name: str
        :param kwargs: Optional keyword arguments that
            `BlobClient.start_copy_from_url()` takes (e.g.
            ``incremental_copy`` to copy only differential page-blob
            snapshot changes).
        :type kwargs: object
        """
        source_url = f"{self.url}/{source_container_name}/{source_blob_name}"
        return self.blob_client(container_name, blob_name).start_copy_from_url(
            source_url, **kwargs
        )

    def copy_blob_list(
        self,
        source_container,
        destination_container,
        source_blobs,
        destination_path,
        delete_first=False,
    ):
        """
        Copy all blobs in a path to another path or container.

        :param source_container: Source container
        :type source_container: str
        :param destination_container: Destination container
        :type destination_container: str
        :param source_blobs: List of blobs to copy
        :type source_blobs: list
        :param destination_path: Blob path (not including the filename)
        :type destination_path: str
        :param delete_first: Default false. If true, the contents of the
            destination path will be deleted before the copy
        :type delete_first: bool
        :return: None
        """
        if not destination_path.endswith("/"):
            destination_path += "/"
        if delete_first:
            self.log.info(
                f"Deleting existing blobs in destination path: {destination_path}!"
            )
            try:
                self.delete_blob(
                    container_name=destination_container,
                    blob_name=destination_path,
                    is_prefix=True,
                    ignore_if_missing=True,
                )
            except azure.common.AzureException as e:
                self.log.info(e)
        count = 0
        for blob in source_blobs:
            # BUGFIX: use the hook logger instead of a bare print() so the
            # per-blob progress shows up in Airflow task logs.
            self.log.info(f"{destination_path}{blob.name}")
            try:
                self.copy_blob(
                    container_name=destination_container,
                    source_container_name=source_container,
                    source_blob_name=blob.name,
                    blob_name=f"{destination_path}{blob.name}",
                )
                count += 1
            except azure.common.AzureException as e:
                self.log.warning(e)
        self.log.info(
            f"Successfully copied {count} out of {len(source_blobs)} files "
            f"to {destination_path}!"
        )
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]