josh-fell commented on code in PR #24038: URL: https://github.com/apache/airflow/pull/24038#discussion_r889293142
########## airflow/providers/microsoft/azure/example_dags/example_service_bus_queue.py: ########## @@ -0,0 +1,105 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import os +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.providers.microsoft.azure.operators.azure_service_bus_queue import ( + AzureServiceBusCreateQueueOperator, + AzureServiceBusDeleteQueueOperator, + AzureServiceBusReceiveMessageOperator, + AzureServiceBusSendMessageOperator, +) + +EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6)) + +default_args = { + "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT), + "azure_service_bus_conn_id": "azure_service_bus_default", +} + +CLIENT_ID = os.getenv("CLIENT_ID", "") +QUEUE_NAME = "sb_mgmt_queue_test" +MESSAGE = "Test Message" +MESSAGE_LIST = [MESSAGE + " " + str(n) for n in range(0, 10)] + +with DAG( + dag_id="example_azure_service_bus_queue", + start_date=datetime(2021, 8, 13), + schedule_interval=None, + catchup=False, + default_args=default_args, Review Comment: ```suggestion default_args={ "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT), "azure_service_bus_conn_id": "azure_service_bus_default", }, ``` There was some work a while back to cleanup the example DAGs 
and improve readability. A small part of that involved moving `default_args` directly to the `DAG()` so you don't have to look back up to reference its value. Not required by any means, but it could be nice to have the consistency. ########## airflow/providers/microsoft/azure/example_dags/example_service_bus_queue.py: ########## @@ -0,0 +1,105 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import os +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.providers.microsoft.azure.operators.azure_service_bus_queue import ( + AzureServiceBusCreateQueueOperator, + AzureServiceBusDeleteQueueOperator, + AzureServiceBusReceiveMessageOperator, + AzureServiceBusSendMessageOperator, +) + +EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6)) + +default_args = { + "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT), + "azure_service_bus_conn_id": "azure_service_bus_default", +} + +CLIENT_ID = os.getenv("CLIENT_ID", "") +QUEUE_NAME = "sb_mgmt_queue_test" +MESSAGE = "Test Message" +MESSAGE_LIST = [MESSAGE + " " + str(n) for n in range(0, 10)] + +with DAG( + dag_id="example_azure_service_bus_queue", + start_date=datetime(2021, 8, 13), + schedule_interval=None, + catchup=False, + default_args=default_args, + tags=["example", "Azure service bus"], +) as dag: + # [START howto_operator_create_service_bus_queue] + create_service_bus_queue = AzureServiceBusCreateQueueOperator( + task_id="create_service_bus_queue", + queue_name=QUEUE_NAME, Review Comment: I assume not moving `queue_name` into `default_args` was to preserve this portion of the operator's (and others') full instantiation for the docs? ########## airflow/providers/microsoft/azure/example_dags/example_service_bus_queue.py: ########## @@ -0,0 +1,105 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import os +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.providers.microsoft.azure.operators.azure_service_bus_queue import ( + AzureServiceBusCreateQueueOperator, + AzureServiceBusDeleteQueueOperator, + AzureServiceBusReceiveMessageOperator, + AzureServiceBusSendMessageOperator, +) + +EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6)) + +default_args = { + "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT), + "azure_service_bus_conn_id": "azure_service_bus_default", +} + +CLIENT_ID = os.getenv("CLIENT_ID", "") +QUEUE_NAME = "sb_mgmt_queue_test" +MESSAGE = "Test Message" +MESSAGE_LIST = [MESSAGE + " " + str(n) for n in range(0, 10)] + +with DAG( + dag_id="example_azure_service_bus_queue", + start_date=datetime(2021, 8, 13), + schedule_interval=None, + catchup=False, + default_args=default_args, + tags=["example", "Azure service bus"], +) as dag: + # [START howto_operator_create_service_bus_queue] + create_service_bus_queue = AzureServiceBusCreateQueueOperator( + task_id="create_service_bus_queue", + queue_name=QUEUE_NAME, + ) + # [END howto_operator_create_service_bus_queue] + + # [START howto_operator_send_message_to_service_bus_queue] + send_message_to_service_bus_queue = AzureServiceBusSendMessageOperator( + task_id="send_message_to_service_bus_queue", + message=MESSAGE, + queue_name=QUEUE_NAME, + batch=False, + ) + # [END howto_operator_send_message_to_service_bus_queue] + + # [START howto_operator_send_list_message_to_service_bus_queue] + send_list_message_to_service_bus_queue = 
AzureServiceBusSendMessageOperator( + task_id="send_list_message_to_service_bus_queue", + message=MESSAGE_LIST, + queue_name=QUEUE_NAME, + batch=False, + ) + # [END howto_operator_send_list_message_to_service_bus_queue] + + # [START howto_operator_send_batch_message_to_service_bus_queue] + send_batch_message_to_service_bus_queue = AzureServiceBusSendMessageOperator( + task_id="send_batch_message_to_service_bus_queue", + message=MESSAGE_LIST, + queue_name=QUEUE_NAME, + batch=True, + ) + # [END howto_operator_send_batch_message_to_service_bus_queue] + + # [START howto_operator_receive_message_service_bus_queue] + receive_message_service_bus_queue = AzureServiceBusReceiveMessageOperator( + task_id="receive_message_service_bus_queue", + queue_name=QUEUE_NAME, + max_message_count=20, + max_wait_time=5, + ) + # [END howto_operator_receive_message_service_bus_queue] + + # [START howto_operator_delete_service_bus_queue] + delete_service_bus_queue = AzureServiceBusDeleteQueueOperator( + task_id="delete_service_bus_queue", queue_name=QUEUE_NAME, trigger_rule="all_done" + ) + # [END howto_operator_delete_service_bus_queue] + + ( + create_service_bus_queue + >> send_message_to_service_bus_queue + >> send_list_message_to_service_bus_queue + >> send_batch_message_to_service_bus_queue + >> receive_message_service_bus_queue + >> delete_service_bus_queue + ) Review Comment: Using `airflow.models.baseoperator.chain` seems more readable here: ```python chain( create_service_bus_queue, send_message_to_service_bus_queue, send_list_message_to_service_bus_queue, send_batch_message_to_service_bus_queue, receive_message_service_bus_queue, delete_service_bus_queue, ) ``` But no strong opinion though. ########## airflow/providers/microsoft/azure/hooks/base_asb.py: ########## @@ -0,0 +1,73 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import Any, Dict, Optional + +from airflow.hooks.base import BaseHook + + +class BaseAzureServiceBusHook(BaseHook): + """ + BaseAzureServiceBusHook class to create session and create connection. Client ID and + Secrete IDs are optional. Review Comment: ```suggestion Secret ID are optional. ``` ########## airflow/providers/microsoft/azure/hooks/asb_admin_client.py: ########## @@ -0,0 +1,83 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from azure.servicebus.management import QueueProperties, ServiceBusAdministrationClient + +from airflow.providers.microsoft.azure.hooks.base_asb import BaseAzureServiceBusHook + + +class AzureServiceBusAdminClientHook(BaseAzureServiceBusHook): + """ + Interacts with Azure ServiceBus management client + and Use this client to create, update, list, and delete resources of a ServiceBus namespace. + it uses the same azure service bus client connection inherits from the base class + """ + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + + def get_conn(self) -> ServiceBusAdministrationClient: + """Create and returns ServiceBusAdministration by using the connection string in connection details""" Review Comment: ```suggestion """Create and returns ServiceBusAdministrationClient by using the connection string in connection details""" ``` Update for consistency with the return annotation. ########## airflow/providers/microsoft/azure/operators/azure_service_bus_queue.py: ########## @@ -0,0 +1,182 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from typing import TYPE_CHECKING, List, Optional, Sequence, Union + +from airflow.models import BaseOperator +from airflow.providers.microsoft.azure.hooks.asb_admin_client import AzureServiceBusAdminClientHook +from airflow.providers.microsoft.azure.hooks.asb_message import ServiceBusMessageHook + +if TYPE_CHECKING: + from airflow.utils.context import Context + + +class AzureServiceBusCreateQueueOperator(BaseOperator): + """ + Creates a Azure ServiceBus queue under a ServiceBus Namespace by using ServiceBusAdministrationClient + + :param queue_name: The name of the queue. should be unique. + :param azure_service_bus_conn_id: Reference to the + :ref:`Azure Service Bus connection<howto/connection:azure_service_bus>`. Review Comment: Missing a few parameters from the docstring. ########## airflow/providers/microsoft/azure/operators/azure_service_bus_queue.py: ########## @@ -0,0 +1,182 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from typing import TYPE_CHECKING, List, Optional, Sequence, Union + +from airflow.models import BaseOperator +from airflow.providers.microsoft.azure.hooks.asb_admin_client import AzureServiceBusAdminClientHook +from airflow.providers.microsoft.azure.hooks.asb_message import ServiceBusMessageHook + +if TYPE_CHECKING: + from airflow.utils.context import Context + + +class AzureServiceBusCreateQueueOperator(BaseOperator): + """ + Creates a Azure ServiceBus queue under a ServiceBus Namespace by using ServiceBusAdministrationClient + + :param queue_name: The name of the queue. should be unique. + :param azure_service_bus_conn_id: Reference to the + :ref:`Azure Service Bus connection<howto/connection:azure_service_bus>`. + """ + + template_fields: Sequence[str] = ("queue_name",) + ui_color = "#e4f0e8" + + def __init__( + self, + *, + queue_name: str, + max_delivery_count: int = 10, + dead_lettering_on_message_expiration: bool = True, + enable_batched_operations: bool = True, + azure_service_bus_conn_id: str = 'azure_service_bus_default', + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.queue_name = queue_name + self.max_delivery_count = max_delivery_count + self.dead_lettering_on_message_expiration = dead_lettering_on_message_expiration + self.enable_batched_operations = enable_batched_operations + self.azure_service_bus_conn_id = azure_service_bus_conn_id + + def execute(self, context: "Context") -> None: + """Creates Queue in Azure Service Bus namespace, by connecting to Service Bus Admin client in hook""" + hook = AzureServiceBusAdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id) + + # create queue with name + queue = hook.create_queue( + self.queue_name, + self.max_delivery_count, + self.dead_lettering_on_message_expiration, + self.enable_batched_operations, + ) + self.log.info("Created Queue ", queue) + + +class AzureServiceBusSendMessageOperator(BaseOperator): + """ + Send Message or batch message to the service bus queue + + 
:param message: Message which needs to be sent to the queue. It can be string or list of string. + :param batch: Its boolean flag by default it is set to False, if the message needs to be sent + as batch message it can be set to True. + :param azure_service_bus_conn_id: Reference to the + :ref: `Azure Service Bus connection<howto/connection:azure_service_bus>`. + """ + + template_fields: Sequence[str] = ("queue_name",) + ui_color = "#e4f0e8" + + def __init__( + self, + *, + queue_name: str, + message: Union[str, List[str]], + batch: bool = False, + azure_service_bus_conn_id: str = 'azure_service_bus_default', + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.queue_name = queue_name + self.batch = batch + self.message = message + self.azure_service_bus_conn_id = azure_service_bus_conn_id + + def execute(self, context: "Context") -> None: + """ + Sends Message to the specific queue in Service Bus namespace, by + connecting to Service Bus client + """ + # Create the hook + hook = ServiceBusMessageHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id) + + # send message + hook.send_message(self.queue_name, self.message, self.batch) + + +class AzureServiceBusReceiveMessageOperator(BaseOperator): + """ + Receive a batch of messages at once in a specified Queue name + + :param queue_name: The name of the queue in Service Bus namespace. + :param azure_service_bus_conn_id: Reference to the + :ref: `Azure Service Bus connection <howto/connection:azure_service_bus>`. + """ + + template_fields: Sequence[str] = ("queue_name",) + ui_color = "#e4f0e8" + + def __init__( + self, + *, + queue_name: str, + azure_service_bus_conn_id: str = 'azure_service_bus_default', + max_message_count: Optional[int] = 10, + max_wait_time: Optional[float] = 5, Review Comment: ```suggestion max_message_count: int = 10, max_wait_time: float = 5, ``` Can you add these parameters to the operator's docstring too? 
########## docs/apache-airflow-providers-microsoft-azure/connections/azure_service_bus.rst: ########## @@ -0,0 +1,64 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + + +.. _howto/connection:azure_service_bus: + +Microsoft Azure Service Bus +======================================= + +The Microsoft Azure Service Bus connection type enables the Azure Service Bus Integration. + +Authenticating to Azure Service Bus +------------------------------------ + +There are two ways to authenticate and authorize access to Azure Service Bus resources: +Azure Active Directory (Azure AD) and Shared Access Signatures (SAS). + +1. Use `Azure Active Directory + <https://docs.microsoft.com/en-gb/azure/service-bus-messaging/service-bus-authentication-and-authorization#azure-active-directory>`_ + i.e. add specific credentials (client_id, secret, tenant) and subscription id to the Airflow connection. +2. Use `Azure Shared access signature + <https://docs.microsoft.com/en-gb/azure/service-bus-messaging/service-bus-authentication-and-authorization#azure-active-directory>`_ +3. Fallback on `DefaultAzureCredential + <https://docs.microsoft.com/en-us/python/api/overview/azure/identity-readme?view=azure-python#defaultazurecredential>`_. 
+ This includes a mechanism to try different options to authenticate: Managed System Identity, environment variables, authentication through Azure CLI... + +Default Connection IDs +---------------------- + +All hooks and operators related to Microsoft Azure Service Bus use ``azure_service_bus_default`` by default. + +Configuring the Connection +-------------------------- + +Client ID (optional) + Specify the ``client_id`` used for the initial connection. + This is needed for *token credentials* authentication mechanism. + It can be left out to fall back on ``DefaultAzureCredential``. + +Secret (optional) + Specify the ``secret`` used for the initial connection. + This is needed for *token credentials* authentication mechanism. + It can be left out to fall back on ``DefaultAzureCredential``. + +Connection ID Review Comment: ```suggestion Connection String ``` ########## airflow/providers/microsoft/azure/hooks/base_asb.py: ########## @@ -0,0 +1,73 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import Any, Dict, Optional + +from airflow.hooks.base import BaseHook + + +class BaseAzureServiceBusHook(BaseHook): + """ + BaseAzureServiceBusHook class to create session and create connection. 
Client ID and + Secrete IDs are optional. + + :param azure_service_bus_conn_id: Reference to the + :ref:`Azure Service Bus connection<howto/connection:azure_service_bus>`. + """ + + conn_name_attr = 'azure_service_bus_conn_id' + default_conn_name = 'azure_service_bus_default' + conn_type = 'azure_service_bus' + hook_name = 'Azure ServiceBus' + + @staticmethod + def get_connection_form_widgets() -> Dict[str, Any]: + """Returns connection widgets to add to connection form""" + from flask_appbuilder.fieldwidgets import BS3TextFieldWidget + from flask_babel import lazy_gettext + from wtforms import StringField + + return { + "extra__azure_service_bus__connection_string": StringField( Review Comment: WDYT about repurposing `schema` instead of creating a new field? ########## docs/apache-airflow-providers-microsoft-azure/connections/azure_service_bus.rst: ########## @@ -0,0 +1,64 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + + +.. _howto/connection:azure_service_bus: + +Microsoft Azure Service Bus +======================================= + +The Microsoft Azure Service Bus connection type enables the Azure Service Bus Integration. 
+ +Authenticating to Azure Service Bus +------------------------------------ + +There are two ways to authenticate and authorize access to Azure Service Bus resources: +Azure Active Directory (Azure AD) and Shared Access Signatures (SAS). + +1. Use `Azure Active Directory + <https://docs.microsoft.com/en-gb/azure/service-bus-messaging/service-bus-authentication-and-authorization#azure-active-directory>`_ + i.e. add specific credentials (client_id, secret, tenant) and subscription id to the Airflow connection. +2. Use `Azure Shared access signature + <https://docs.microsoft.com/en-gb/azure/service-bus-messaging/service-bus-authentication-and-authorization#azure-active-directory>`_ Review Comment: Where do these types of auth come into the picture? I can only find Connection String use in the hooks. ########## docs/apache-airflow-providers-microsoft-azure/connections/azure_service_bus.rst: ########## @@ -0,0 +1,64 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + + +.. _howto/connection:azure_service_bus: + +Microsoft Azure Service Bus +======================================= + +The Microsoft Azure Service Bus connection type enables the Azure Service Bus Integration. 
+ +Authenticating to Azure Service Bus +------------------------------------ + +There are two ways to authenticate and authorize access to Azure Service Bus resources: +Azure Active Directory (Azure AD) and Shared Access Signatures (SAS). + +1. Use `Azure Active Directory + <https://docs.microsoft.com/en-gb/azure/service-bus-messaging/service-bus-authentication-and-authorization#azure-active-directory>`_ + i.e. add specific credentials (client_id, secret, tenant) and subscription id to the Airflow connection. +2. Use `Azure Shared access signature + <https://docs.microsoft.com/en-gb/azure/service-bus-messaging/service-bus-authentication-and-authorization#azure-active-directory>`_ +3. Fallback on `DefaultAzureCredential + <https://docs.microsoft.com/en-us/python/api/overview/azure/identity-readme?view=azure-python#defaultazurecredential>`_. + This includes a mechanism to try different options to authenticate: Managed System Identity, environment variables, authentication through Azure CLI... + +Default Connection IDs +---------------------- + +All hooks and operators related to Microsoft Azure Service Bus use ``azure_service_bus_default`` by default. + +Configuring the Connection +-------------------------- + +Client ID (optional) + Specify the ``client_id`` used for the initial connection. + This is needed for *token credentials* authentication mechanism. + It can be left out to fall back on ``DefaultAzureCredential``. + +Secret (optional) + Specify the ``secret`` used for the initial connection. + This is needed for *token credentials* authentication mechanism. + It can be left out to fall back on ``DefaultAzureCredential``. + +Connection ID + Specify the Azure Service bus connection string ID used for the initial connection. 
+ `Get connection string + <https://docs.microsoft.com/en-gb/azure/service-bus-messaging/service-bus-create-namespace-portal#get-the-connection-string.>`_ + Use the key ``extra__azure_service_bus__connection_string`` to pass in the Connection ID . Review Comment: This could also be `connection_string` provided in classic `extra` right? Just thinking about folks setting up a connection outside of the UI. ########## airflow/providers/microsoft/azure/hooks/asb_admin_client.py: ########## @@ -0,0 +1,83 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from azure.servicebus.management import QueueProperties, ServiceBusAdministrationClient + +from airflow.providers.microsoft.azure.hooks.base_asb import BaseAzureServiceBusHook + + +class AzureServiceBusAdminClientHook(BaseAzureServiceBusHook): + """ + Interacts with Azure ServiceBus management client + and Use this client to create, update, list, and delete resources of a ServiceBus namespace. + it uses the same azure service bus client connection inherits from the base class Review Comment: ```suggestion Interacts with Azure ServiceBus management client to create, update, list, and delete resources of a ServiceBus namespace. 
This hook uses the same Azure ServiceBus client connection inherited from the base class. ``` Small nits. ########## airflow/providers/microsoft/azure/hooks/base_asb.py: ########## @@ -0,0 +1,73 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import Any, Dict, Optional + +from airflow.hooks.base import BaseHook + + +class BaseAzureServiceBusHook(BaseHook): + """ + BaseAzureServiceBusHook class to create session and create connection. Client ID and + Secrete IDs are optional. + + :param azure_service_bus_conn_id: Reference to the + :ref:`Azure Service Bus connection<howto/connection:azure_service_bus>`. 
+ """ + + conn_name_attr = 'azure_service_bus_conn_id' + default_conn_name = 'azure_service_bus_default' + conn_type = 'azure_service_bus' + hook_name = 'Azure ServiceBus' + + @staticmethod + def get_connection_form_widgets() -> Dict[str, Any]: + """Returns connection widgets to add to connection form""" + from flask_appbuilder.fieldwidgets import BS3TextFieldWidget + from flask_babel import lazy_gettext + from wtforms import StringField + + return { + "extra__azure_service_bus__connection_string": StringField( + lazy_gettext('Azure Service Bus Connection String'), widget=BS3TextFieldWidget() + ), + } + + @staticmethod + def get_ui_field_behaviour() -> Dict[str, Any]: + """Returns custom field behaviour""" + return { + "hidden_fields": ['schema', 'port', 'host', 'extra'], + "relabeling": { + 'login': 'Client ID', + 'password': 'Secret', + }, + "placeholders": { + 'login': 'Client ID (Optional)', + 'password': 'Client Secret (Optional)', + 'extra__azure_service_bus__connection_string': 'Azure Service Bus Connection String', + }, + } + + def __init__(self, azure_service_bus_conn_id: str = default_conn_name) -> None: + super().__init__() + self.conn_id = azure_service_bus_conn_id + self._conn = None + self.connection_string: Optional[str] = None + + def get_conn(self): + return None Review Comment: Should this raise a `NotImplementedError()`? Seems like other hooks that subclass this one do overwrite this method and _should_ I assume. WDYT? ########## airflow/providers/microsoft/azure/hooks/base_asb.py: ########## @@ -0,0 +1,73 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import Any, Dict, Optional + +from airflow.hooks.base import BaseHook + + +class BaseAzureServiceBusHook(BaseHook): + """ + BaseAzureServiceBusHook class to create session and create connection. Client ID and + Secrete IDs are optional. + + :param azure_service_bus_conn_id: Reference to the + :ref:`Azure Service Bus connection<howto/connection:azure_service_bus>`. + """ + + conn_name_attr = 'azure_service_bus_conn_id' + default_conn_name = 'azure_service_bus_default' + conn_type = 'azure_service_bus' + hook_name = 'Azure ServiceBus' + + @staticmethod + def get_connection_form_widgets() -> Dict[str, Any]: + """Returns connection widgets to add to connection form""" + from flask_appbuilder.fieldwidgets import BS3TextFieldWidget + from flask_babel import lazy_gettext + from wtforms import StringField + + return { + "extra__azure_service_bus__connection_string": StringField( + lazy_gettext('Azure Service Bus Connection String'), widget=BS3TextFieldWidget() + ), + } + + @staticmethod + def get_ui_field_behaviour() -> Dict[str, Any]: + """Returns custom field behaviour""" + return { + "hidden_fields": ['schema', 'port', 'host', 'extra'], + "relabeling": { + 'login': 'Client ID', + 'password': 'Secret', + }, + "placeholders": { + 'login': 'Client ID (Optional)', + 'password': 'Client Secret (Optional)', + 'extra__azure_service_bus__connection_string': 'Azure Service Bus Connection String', Review Comment: WDYT about having an example connection string or even not having a placeholder at all for `extra__azure_service_bus__connection_string`? 
Repeating the form field name might not help users looking at the form. Also, I could definitely be missing it, but are login/Client ID and password/Secret used in the other hooks? I can only find connection string use. Would future additional hooks use Client ID/Secret? ########## airflow/providers/microsoft/azure/operators/azure_service_bus_queue.py: ########## @@ -0,0 +1,182 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import TYPE_CHECKING, List, Optional, Sequence, Union + +from airflow.models import BaseOperator +from airflow.providers.microsoft.azure.hooks.asb_admin_client import AzureServiceBusAdminClientHook +from airflow.providers.microsoft.azure.hooks.asb_message import ServiceBusMessageHook + +if TYPE_CHECKING: + from airflow.utils.context import Context + + +class AzureServiceBusCreateQueueOperator(BaseOperator): + """ + Creates a Azure ServiceBus queue under a ServiceBus Namespace by using ServiceBusAdministrationClient + + :param queue_name: The name of the queue. should be unique. + :param azure_service_bus_conn_id: Reference to the + :ref:`Azure Service Bus connection<howto/connection:azure_service_bus>`. 
+ """ + + template_fields: Sequence[str] = ("queue_name",) + ui_color = "#e4f0e8" + + def __init__( + self, + *, + queue_name: str, + max_delivery_count: int = 10, + dead_lettering_on_message_expiration: bool = True, + enable_batched_operations: bool = True, + azure_service_bus_conn_id: str = 'azure_service_bus_default', + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.queue_name = queue_name + self.max_delivery_count = max_delivery_count + self.dead_lettering_on_message_expiration = dead_lettering_on_message_expiration + self.enable_batched_operations = enable_batched_operations + self.azure_service_bus_conn_id = azure_service_bus_conn_id + + def execute(self, context: "Context") -> None: + """Creates Queue in Azure Service Bus namespace, by connecting to Service Bus Admin client in hook""" + hook = AzureServiceBusAdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id) + + # create queue with name + queue = hook.create_queue( + self.queue_name, + self.max_delivery_count, + self.dead_lettering_on_message_expiration, + self.enable_batched_operations, + ) + self.log.info("Created Queue ", queue) + + +class AzureServiceBusSendMessageOperator(BaseOperator): + """ + Send Message or batch message to the service bus queue + + :param message: Message which needs to be sent to the queue. It can be string or list of string. + :param batch: Its boolean flag by default it is set to False, if the message needs to be sent + as batch message it can be set to True. + :param azure_service_bus_conn_id: Reference to the + :ref: `Azure Service Bus connection<howto/connection:azure_service_bus>`. + """ + + template_fields: Sequence[str] = ("queue_name",) + ui_color = "#e4f0e8" + + def __init__( + self, + *, + queue_name: str, Review Comment: This parameter is missing from the docstring. 
########## airflow/providers/microsoft/azure/operators/azure_service_bus_queue.py: ########## @@ -0,0 +1,182 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import TYPE_CHECKING, List, Optional, Sequence, Union + +from airflow.models import BaseOperator +from airflow.providers.microsoft.azure.hooks.asb_admin_client import AzureServiceBusAdminClientHook +from airflow.providers.microsoft.azure.hooks.asb_message import ServiceBusMessageHook + +if TYPE_CHECKING: + from airflow.utils.context import Context + + +class AzureServiceBusCreateQueueOperator(BaseOperator): + """ + Creates a Azure ServiceBus queue under a ServiceBus Namespace by using ServiceBusAdministrationClient + + :param queue_name: The name of the queue. should be unique. + :param azure_service_bus_conn_id: Reference to the + :ref:`Azure Service Bus connection<howto/connection:azure_service_bus>`. 
+ """ + + template_fields: Sequence[str] = ("queue_name",) + ui_color = "#e4f0e8" + + def __init__( + self, + *, + queue_name: str, + max_delivery_count: int = 10, + dead_lettering_on_message_expiration: bool = True, + enable_batched_operations: bool = True, + azure_service_bus_conn_id: str = 'azure_service_bus_default', + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.queue_name = queue_name + self.max_delivery_count = max_delivery_count + self.dead_lettering_on_message_expiration = dead_lettering_on_message_expiration + self.enable_batched_operations = enable_batched_operations + self.azure_service_bus_conn_id = azure_service_bus_conn_id + + def execute(self, context: "Context") -> None: + """Creates Queue in Azure Service Bus namespace, by connecting to Service Bus Admin client in hook""" + hook = AzureServiceBusAdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id) + + # create queue with name + queue = hook.create_queue( + self.queue_name, + self.max_delivery_count, + self.dead_lettering_on_message_expiration, + self.enable_batched_operations, + ) + self.log.info("Created Queue ", queue) Review Comment: Should this log the queue name rather than the `QueueProperties` object result from `create_queue()`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
