This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 09f38ad3f6 Implement Azure Service Bus Queue Operators (#24038)
09f38ad3f6 is described below

commit 09f38ad3f6872bae5059a1de226362eb358c4a7a
Author: Bharanidharan <[email protected]>
AuthorDate: Thu Jun 23 21:22:47 2022 +0530

    Implement Azure Service Bus Queue Operators (#24038)
    
    Implemented Azure Service Bus Queue based Operator's to create queue, send 
message to the queue and receive message(list of message or batch message) and 
delete queue in azure service
    - Added `AzureServiceBusCreateQueueOperator`
    - Added `AzureServiceBusSendMessageOperator`
    - Added  `AzureServiceBusReceiveMessageOperator`
    - Added `AzureServiceBusDeleteQueueOperator`
    - Added Example DAG
    - Added Documentation
    - Added hooks and connection type in - provider yaml file
    - Added unit Test case, doc strings
---
 airflow/providers/microsoft/azure/hooks/asb.py     | 197 ++++++++++++++++++++
 airflow/providers/microsoft/azure/operators/asb.py | 206 +++++++++++++++++++++
 airflow/providers/microsoft/azure/provider.yaml    |  15 ++
 .../connections/asb.rst                            |  50 +++++
 .../operators/asb.rst                              | 107 +++++++++++
 docs/integration-logos/azure/Service-Bus.svg       |   1 +
 setup.py                                           |   1 +
 tests/providers/microsoft/azure/hooks/test_asb.py  | 203 ++++++++++++++++++++
 .../microsoft/azure/operators/test_asb.py          | 195 +++++++++++++++++++
 .../microsoft/azure/example_azure_service_bus.py   | 115 ++++++++++++
 10 files changed, 1090 insertions(+)

diff --git a/airflow/providers/microsoft/azure/hooks/asb.py 
b/airflow/providers/microsoft/azure/hooks/asb.py
new file mode 100644
index 0000000000..e99296b602
--- /dev/null
+++ b/airflow/providers/microsoft/azure/hooks/asb.py
@@ -0,0 +1,197 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Any, Dict, List, Optional, Union
+
+from azure.servicebus import ServiceBusClient, ServiceBusMessage, 
ServiceBusSender
+from azure.servicebus.management import QueueProperties, 
ServiceBusAdministrationClient
+
+from airflow.hooks.base import BaseHook
+
+
+class BaseAzureServiceBusHook(BaseHook):
+    """
+    BaseAzureServiceBusHook class to create session and create connection 
using connection string
+
+    :param azure_service_bus_conn_id: Reference to the
+        :ref:`Azure Service Bus 
connection<howto/connection:azure_service_bus>`.
+    """
+
+    conn_name_attr = 'azure_service_bus_conn_id'
+    default_conn_name = 'azure_service_bus_default'
+    conn_type = 'azure_service_bus'
+    hook_name = 'Azure Service Bus'
+
+    @staticmethod
+    def get_ui_field_behaviour() -> Dict[str, Any]:
+        """Returns custom field behaviour"""
+        return {
+            "hidden_fields": ['port', 'host', 'extra', 'login', 'password'],
+            "relabeling": {'schema': 'Connection String'},
+            "placeholders": {
+                'schema': 'Endpoint=sb://<Resource 
group>.servicebus.windows.net/;SharedAccessKeyName=<AccessKeyName>;SharedAccessKey=<SharedAccessKey>',
  # noqa
+            },
+        }
+
+    def __init__(self, azure_service_bus_conn_id: str = default_conn_name) -> 
None:
+        super().__init__()
+        self.conn_id = azure_service_bus_conn_id
+
+    def get_conn(self):
+        raise NotImplementedError
+
+
+class AdminClientHook(BaseAzureServiceBusHook):
+    """
+    Interacts with ServiceBusAdministrationClient client
+    to create, update, list, and delete resources of a
+    Service Bus namespace.  This hook uses the same Azure Service Bus client 
connection inherited
+    from the base class
+    """
+
+    def get_conn(self) -> ServiceBusAdministrationClient:
+        """
+        Create and returns ServiceBusAdministrationClient by using the 
connection
+        string in connection details
+        """
+        conn = self.get_connection(self.conn_id)
+
+        connection_string: str = str(conn.schema)
+        return 
ServiceBusAdministrationClient.from_connection_string(connection_string)
+
+    def create_queue(
+        self,
+        queue_name: str,
+        max_delivery_count: int = 10,
+        dead_lettering_on_message_expiration: bool = True,
+        enable_batched_operations: bool = True,
+    ) -> QueueProperties:
+        """
+        Create Queue by connecting to service Bus Admin client return the 
QueueProperties
+
+        :param queue_name: The name of the queue or a QueueProperties with 
name.
+        :param max_delivery_count: The maximum delivery count. A message is 
automatically
+            dead lettered after this number of deliveries. Default value is 
10..
+        :param dead_lettering_on_message_expiration: A value that indicates 
whether this subscription has
+            dead letter support when a message expires.
+        :param enable_batched_operations: Value that indicates whether 
server-side batched
+            operations are enabled.
+        """
+        if queue_name is None:
+            raise TypeError("Queue name cannot be None.")
+
+        with self.get_conn() as service_mgmt_conn:
+            queue = service_mgmt_conn.create_queue(
+                queue_name,
+                max_delivery_count=max_delivery_count,
+                
dead_lettering_on_message_expiration=dead_lettering_on_message_expiration,
+                enable_batched_operations=enable_batched_operations,
+            )
+            return queue
+
+    def delete_queue(self, queue_name: str) -> None:
+        """
+        Delete the queue by queue_name in service bus namespace
+
+        :param queue_name: The name of the queue or a QueueProperties with 
name.
+        """
+        if queue_name is None:
+            raise TypeError("Queue name cannot be None.")
+
+        with self.get_conn() as service_mgmt_conn:
+            service_mgmt_conn.delete_queue(queue_name)
+
+
+class MessageHook(BaseAzureServiceBusHook):
+    """
+    Interacts with ServiceBusClient and acts as a high level interface
+    for getting ServiceBusSender and ServiceBusReceiver.
+    """
+
+    def get_conn(self) -> ServiceBusClient:
+        """Create and returns ServiceBusClient by using the connection string 
in connection details"""
+        conn = self.get_connection(self.conn_id)
+        connection_string: str = str(conn.schema)
+
+        self.log.info("Create and returns ServiceBusClient")
+        return 
ServiceBusClient.from_connection_string(conn_str=connection_string, 
logging_enable=True)
+
+    def send_message(
+        self, queue_name: str, messages: Union[str, List[str]], 
batch_message_flag: bool = False
+    ):
+        """
+        By using ServiceBusClient Send message(s) to a Service Bus Queue. By 
using
+        batch_message_flag it enables and send message as batch message
+
+        :param queue_name: The name of the queue or a QueueProperties with 
name.
+        :param messages: Message which needs to be sent to the queue. It can 
be string or list of string.
+        :param batch_message_flag: bool flag, can be set to True if message 
needs to be sent as batch message.
+        """
+        if queue_name is None:
+            raise TypeError("Queue name cannot be None.")
+        if not messages:
+            raise ValueError("Messages list cannot be empty.")
+        with self.get_conn() as service_bus_client, 
service_bus_client.get_queue_sender(
+            queue_name=queue_name
+        ) as sender:
+            with sender:
+                if isinstance(messages, str):
+                    if not batch_message_flag:
+                        msg = ServiceBusMessage(messages)
+                        sender.send_messages(msg)
+                    else:
+                        self.send_batch_message(sender, [messages])
+                else:
+                    if not batch_message_flag:
+                        self.send_list_messages(sender, messages)
+                    else:
+                        self.send_batch_message(sender, messages)
+
+    @staticmethod
+    def send_list_messages(sender: ServiceBusSender, messages: List[str]):
+        list_messages = [ServiceBusMessage(message) for message in messages]
+        sender.send_messages(list_messages)  # type: ignore[arg-type]
+
+    @staticmethod
+    def send_batch_message(sender: ServiceBusSender, messages: List[str]):
+        batch_message = sender.create_message_batch()
+        for message in messages:
+            batch_message.add_message(ServiceBusMessage(message))
+        sender.send_messages(batch_message)
+
+    def receive_message(
+        self, queue_name, max_message_count: Optional[int] = 1, max_wait_time: 
Optional[float] = None
+    ):
+        """
+        Receive a batch of messages at once in a specified Queue name
+
+        :param queue_name: The name of the queue name or a QueueProperties 
with name.
+        :param max_message_count: Maximum number of messages in the batch.
+        :param max_wait_time: Maximum time to wait in seconds for the first 
message to arrive.
+        """
+        if queue_name is None:
+            raise TypeError("Queue name cannot be None.")
+
+        with self.get_conn() as service_bus_client, 
service_bus_client.get_queue_receiver(
+            queue_name=queue_name
+        ) as receiver:
+            with receiver:
+                received_msgs = receiver.receive_messages(
+                    max_message_count=max_message_count, 
max_wait_time=max_wait_time
+                )
+                for msg in received_msgs:
+                    self.log.info(msg)
+                    receiver.complete_message(msg)
diff --git a/airflow/providers/microsoft/azure/operators/asb.py 
b/airflow/providers/microsoft/azure/operators/asb.py
new file mode 100644
index 0000000000..f8c363c678
--- /dev/null
+++ b/airflow/providers/microsoft/azure/operators/asb.py
@@ -0,0 +1,206 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import TYPE_CHECKING, List, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.microsoft.azure.hooks.asb import AdminClientHook, 
MessageHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class AzureServiceBusCreateQueueOperator(BaseOperator):
+    """
+    Creates a Azure Service Bus queue under a Service Bus Namespace by using 
ServiceBusAdministrationClient
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:AzureServiceBusCreateQueueOperator`
+
+    :param queue_name: The name of the queue. should be unique.
+    :param max_delivery_count: The maximum delivery count. A message is 
automatically
+            dead lettered after this number of deliveries. Default value is 
10..
+    :param dead_lettering_on_message_expiration: A value that indicates 
whether this subscription has
+        dead letter support when a message expires.
+    :param enable_batched_operations: Value that indicates whether server-side 
batched
+        operations are enabled.
+    :param azure_service_bus_conn_id: Reference to the
+        :ref:`Azure Service Bus 
connection<howto/connection:azure_service_bus>`.
+    """
+
+    template_fields: Sequence[str] = ("queue_name",)
+    ui_color = "#e4f0e8"
+
+    def __init__(
+        self,
+        *,
+        queue_name: str,
+        max_delivery_count: int = 10,
+        dead_lettering_on_message_expiration: bool = True,
+        enable_batched_operations: bool = True,
+        azure_service_bus_conn_id: str = 'azure_service_bus_default',
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.queue_name = queue_name
+        self.max_delivery_count = max_delivery_count
+        self.dead_lettering_on_message_expiration = 
dead_lettering_on_message_expiration
+        self.enable_batched_operations = enable_batched_operations
+        self.azure_service_bus_conn_id = azure_service_bus_conn_id
+
+    def execute(self, context: "Context") -> None:
+        """Creates Queue in Azure Service Bus namespace, by connecting to 
Service Bus Admin client in hook"""
+        hook = 
AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
+
+        # create queue with name
+        queue = hook.create_queue(
+            self.queue_name,
+            self.max_delivery_count,
+            self.dead_lettering_on_message_expiration,
+            self.enable_batched_operations,
+        )
+        self.log.info("Created Queue %s", queue.name)
+
+
+class AzureServiceBusSendMessageOperator(BaseOperator):
+    """
+    Send Message or batch message to the Service Bus queue
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:AzureServiceBusSendMessageOperator`
+
+    :param queue_name: The name of the queue. should be unique.
+    :param message: Message which needs to be sent to the queue. It can be 
string or list of string.
+    :param batch: Its boolean flag by default it is set to False, if the 
message needs to be sent
+        as batch message it can be set to True.
+    :param azure_service_bus_conn_id: Reference to the
+        :ref: `Azure Service Bus 
connection<howto/connection:azure_service_bus>`.
+    """
+
+    template_fields: Sequence[str] = ("queue_name",)
+    ui_color = "#e4f0e8"
+
+    def __init__(
+        self,
+        *,
+        queue_name: str,
+        message: Union[str, List[str]],
+        batch: bool = False,
+        azure_service_bus_conn_id: str = 'azure_service_bus_default',
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.queue_name = queue_name
+        self.batch = batch
+        self.message = message
+        self.azure_service_bus_conn_id = azure_service_bus_conn_id
+
+    def execute(self, context: "Context") -> None:
+        """
+        Sends Message to the specific queue in Service Bus namespace, by
+        connecting to Service Bus  client
+        """
+        # Create the hook
+        hook = 
MessageHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
+
+        # send message
+        hook.send_message(self.queue_name, self.message, self.batch)
+
+
+class AzureServiceBusReceiveMessageOperator(BaseOperator):
+    """
+    Receive a batch of messages at once in a specified Queue name
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:AzureServiceBusReceiveMessageOperator`
+
+    :param queue_name: The name of the queue name or a QueueProperties with 
name.
+    :param max_message_count: Maximum number of messages in the batch.
+    :param max_wait_time: Maximum time to wait in seconds for the first 
message to arrive.
+    :param azure_service_bus_conn_id: Reference to the
+        :ref: `Azure Service Bus connection 
<howto/connection:azure_service_bus>`.
+    """
+
+    template_fields: Sequence[str] = ("queue_name",)
+    ui_color = "#e4f0e8"
+
+    def __init__(
+        self,
+        *,
+        queue_name: str,
+        azure_service_bus_conn_id: str = 'azure_service_bus_default',
+        max_message_count: int = 10,
+        max_wait_time: float = 5,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.queue_name = queue_name
+        self.azure_service_bus_conn_id = azure_service_bus_conn_id
+        self.max_message_count = max_message_count
+        self.max_wait_time = max_wait_time
+
+    def execute(self, context: "Context") -> None:
+        """
+        Receive Message in specific queue in Service Bus namespace,
+        by connecting to Service Bus client
+        """
+        # Create the hook
+        hook = 
MessageHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
+
+        # Receive message
+        hook.receive_message(
+            self.queue_name, max_message_count=self.max_message_count, 
max_wait_time=self.max_wait_time
+        )
+
+
+class AzureServiceBusDeleteQueueOperator(BaseOperator):
+    """
+    Deletes the Queue in the Azure Service Bus namespace
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:AzureServiceBusDeleteQueueOperator`
+
+    :param queue_name: The name of the queue in Service Bus namespace.
+    :param azure_service_bus_conn_id: Reference to the
+        :ref: `Azure Service Bus connection 
<howto/connection:azure_service_bus>`.
+    """
+
+    template_fields: Sequence[str] = ("queue_name",)
+    ui_color = "#e4f0e8"
+
+    def __init__(
+        self,
+        *,
+        queue_name: str,
+        azure_service_bus_conn_id: str = 'azure_service_bus_default',
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.queue_name = queue_name
+        self.azure_service_bus_conn_id = azure_service_bus_conn_id
+
+    def execute(self, context: "Context") -> None:
+        """Delete Queue in Service Bus namespace, by connecting to Service Bus 
Admin client"""
+        # Create the hook
+        hook = 
AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
+
+        # delete queue with name
+        hook.delete_queue(self.queue_name)
diff --git a/airflow/providers/microsoft/azure/provider.yaml 
b/airflow/providers/microsoft/azure/provider.yaml
index e4ca164b23..e4373d9f09 100644
--- a/airflow/providers/microsoft/azure/provider.yaml
+++ b/airflow/providers/microsoft/azure/provider.yaml
@@ -89,6 +89,12 @@ integrations:
     external-doc-url: https://azure.microsoft.com/
     logo: /integration-logos/azure/Microsoft-Azure.png
     tags: [azure]
+  - integration-name: Microsoft Azure Service Bus
+    external-doc-url: https://azure.microsoft.com/en-us/services/service-bus/
+    logo: /integration-logos/azure/Service-Bus.svg
+    how-to-guide:
+      - /docs/apache-airflow-providers-microsoft-azure/operators/asb.rst
+    tags: [azure]
 
 operators:
   - integration-name: Microsoft Azure Data Lake Storage
@@ -117,6 +123,9 @@ operators:
   - integration-name: Microsoft Azure Data Factory
     python-modules:
       - airflow.providers.microsoft.azure.operators.data_factory
+  - integration-name: Microsoft Azure Service Bus
+    python-modules:
+      - airflow.providers.microsoft.azure.operators.asb
 
 sensors:
   - integration-name: Microsoft Azure Cosmos DB
@@ -168,6 +177,9 @@ hooks:
     python-modules:
       - airflow.providers.microsoft.azure.hooks.data_factory
       - airflow.providers.microsoft.azure.hooks.azure_data_factory
+  - integration-name: Microsoft Azure Service Bus
+    python-modules:
+      - airflow.providers.microsoft.azure.hooks.asb
 
 transfers:
   - source-integration-name: Local
@@ -204,6 +216,7 @@ hook-class-names:  # deprecated - to be removed after 
providers add dependency o
   - airflow.providers.microsoft.azure.hooks.wasb.WasbHook
   - airflow.providers.microsoft.azure.hooks.data_factory.AzureDataFactoryHook
   - 
airflow.providers.microsoft.azure.hooks.container_registry.AzureContainerRegistryHook
+  - airflow.providers.microsoft.azure.hooks.asb.BaseAzureServiceBusHook
 
 connection-types:
   - hook-class-name: 
airflow.providers.microsoft.azure.hooks.base_azure.AzureBaseHook
@@ -230,6 +243,8 @@ connection-types:
   - hook-class-name: >-
       
airflow.providers.microsoft.azure.hooks.container_registry.AzureContainerRegistryHook
     connection-type: azure_container_registry
+  - hook-class-name: 
airflow.providers.microsoft.azure.hooks.asb.BaseAzureServiceBusHook
+    connection-type: azure_service_bus
 
 secrets-backends:
   - airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend
diff --git a/docs/apache-airflow-providers-microsoft-azure/connections/asb.rst 
b/docs/apache-airflow-providers-microsoft-azure/connections/asb.rst
new file mode 100644
index 0000000000..daf50d6017
--- /dev/null
+++ b/docs/apache-airflow-providers-microsoft-azure/connections/asb.rst
@@ -0,0 +1,50 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+.. _howto/connection:azure_service_bus:
+
+Microsoft Azure Service Bus
+=======================================
+
+The Microsoft Azure Service Bus connection type enables the Azure Service Bus 
Integration.
+
+Authenticating to Azure Service Bus
+------------------------------------
+
+There are multiple ways to authenticate and authorize access to Azure Service 
Bus resources:
+Currently Supports Shared Access Signatures (SAS).
+
+1. Use a `Connection String
+   
<https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-quickstart-portal#get-the-connection-string>`_
+   i.e. Use connection string Field to add ``Connection String`` in the 
Airflow connection.
+
+Default Connection IDs
+----------------------
+
+All hooks and operators related to Microsoft Azure Service Bus use 
``azure_service_bus_default`` by default.
+
+Configuring the Connection
+--------------------------
+
+Connection String
+    Specify the Azure Service bus connection string ID used for the initial 
connection.
+    Please find the documentation on how to generate connection string in 
azure service bus
+    `Get connection string
+    
<https://docs.microsoft.com/en-gb/azure/service-bus-messaging/service-bus-create-namespace-portal#get-the-connection-string.>`_
+    Use the key ``connection_string`` to pass in the Connection ID .
diff --git a/docs/apache-airflow-providers-microsoft-azure/operators/asb.rst 
b/docs/apache-airflow-providers-microsoft-azure/operators/asb.rst
new file mode 100644
index 0000000000..96b27a7d80
--- /dev/null
+++ b/docs/apache-airflow-providers-microsoft-azure/operators/asb.rst
@@ -0,0 +1,107 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Azure Service Bus Operators
+============================
+Azure Service Bus is a fully managed enterprise message broker with message 
queues and
+publish-subscribe topics (in a namespace). Service Bus is used to decouple 
applications
+and services from each other. Service Bus that perform operations on
+entities, such as namespaces, queues, and topics.
+
+The Service Bus REST API provides operations for working with the following 
resources:
+  - Azure Resource Manager
+  - Service Bus service
+
+Azure Service Bus Queue Operators
+---------------------------------
+Azure Service Bus Operators helps to interact with Azure Bus Queue based 
operation like Create, Delete,
+Send and Receive message in Queue.
+
+.. _howto/operator:AzureServiceBusCreateQueueOperator:
+
+Create Azure Service Bus Queue
+===============================
+
+To create Azure service bus queue with specific Parameter you can use
+:class:`~airflow.providers.microsoft.azure.operators.asb.AzureServiceBusCreateQueueOperator`.
+
+Below is an example of using this operator to execute an Azure Service Bus 
Create Queue.
+
+.. exampleinclude:: 
/../../tests/system/providers/microsoft/azure/example_azure_service_bus.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_create_service_bus_queue]
+    :end-before: [END howto_operator_create_service_bus_queue]
+
+
+.. _howto/operator:AzureServiceBusSendMessageOperator:
+
+Send Message to Azure Service Bus Queue
+=======================================
+
+To Send message or list of message or batch Message to the Azure Service Bus 
Queue. You can use
+:class:`~airflow.providers.microsoft.azure.operators.asb.AzureServiceBusSendMessageOperator`.
+
+Below is an example of using this operator to execute an Azure Service Bus 
Send Message to Queue.
+
+.. exampleinclude:: 
/../../tests/system/providers/microsoft/azure/example_azure_service_bus.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_send_message_to_service_bus_queue]
+    :end-before: [END howto_operator_send_message_to_service_bus_queue]
+
+
+.. _howto/operator:AzureServiceBusReceiveMessageOperator:
+
+Receive Message Azure Service Bus Queue
+========================================
+
+To Receive Message or list of message or Batch message message in a Queue you 
can use
+:class:`~airflow.providers.microsoft.azure.operators.asb.AzureServiceBusReceiveMessageOperator`.
+
+Below is an example of using this operator to execute an Azure Service Bus 
Create Queue.
+
+.. exampleinclude:: 
/../../tests/system/providers/microsoft/azure/example_azure_service_bus.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_receive_message_service_bus_queue]
+    :end-before: [END howto_operator_receive_message_service_bus_queue]
+
+
+.. _howto/operator:AzureServiceBusDeleteQueueOperator:
+
+Delete Azure Service Bus Queue
+===============================
+
+To Delete the Azure service bus queue you can use
+:class:`~airflow.providers.microsoft.azure.operators.asb.AzureServiceBusDeleteQueueOperator`.
+
+Below is an example of using this operator to execute an Azure Service Bus 
Delete Queue.
+
+.. exampleinclude:: 
/../../tests/system/providers/microsoft/azure/example_azure_service_bus.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_delete_service_bus_queue]
+    :end-before: [END howto_operator_delete_service_bus_queue]
+
+
+Reference
+---------
+
+For further information, please refer to the Microsoft documentation:
+
+  * `Azure Service Bus Documentation 
<https://azure.microsoft.com/en-us/services/service-bus/>`__
diff --git a/docs/integration-logos/azure/Service-Bus.svg 
b/docs/integration-logos/azure/Service-Bus.svg
new file mode 100644
index 0000000000..1604e04232
--- /dev/null
+++ b/docs/integration-logos/azure/Service-Bus.svg
@@ -0,0 +1 @@
+<svg xmlns="http://www.w3.org/2000/svg"; viewBox="0 0 18 
18"><defs><linearGradient id="a" x1="9" y1="14.071" x2="9" y2="1.313" 
gradientUnits="userSpaceOnUse"><stop offset="0" stop-color="#0078d4"/><stop 
offset=".156" stop-color="#1380da"/><stop offset=".528" 
stop-color="#3c91e5"/><stop offset=".822" stop-color="#559cec"/><stop 
offset="1" stop-color="#5ea0ef"/></linearGradient></defs><path d="M18 
10.075a4.044 4.044 0 00-3.51-3.887 5.1 5.1 0 00-5.25-4.875 5.229 5.229 0 00-5 
3.408A4.825 4.82 [...]
diff --git a/setup.py b/setup.py
index c52a682f66..c997322b12 100644
--- a/setup.py
+++ b/setup.py
@@ -232,6 +232,7 @@ azure = [
     'azure-storage-blob>=12.7.0,<12.9.0',
     'azure-storage-common>=2.1.0',
     'azure-storage-file>=2.1.0',
+    'azure-servicebus>=7.6.1',
 ]
 cassandra = [
     'cassandra-driver>=3.13.0',
diff --git a/tests/providers/microsoft/azure/hooks/test_asb.py 
b/tests/providers/microsoft/azure/hooks/test_asb.py
new file mode 100644
index 0000000000..315a31802a
--- /dev/null
+++ b/tests/providers/microsoft/azure/hooks/test_asb.py
@@ -0,0 +1,203 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import mock
+
+import pytest
+from azure.servicebus import ServiceBusClient, ServiceBusMessage, 
ServiceBusMessageBatch
+from azure.servicebus.management import ServiceBusAdministrationClient
+
+from airflow.models import Connection
+from airflow.providers.microsoft.azure.hooks.asb import AdminClientHook, 
MessageHook
+
+MESSAGE = "Test Message"
+MESSAGE_LIST = [MESSAGE + " " + str(n) for n in range(0, 10)]
+
+
+class TestAdminClientHook:
+    def setup_class(self) -> None:
+        self.queue_name: str = "test_queue"
+        self.conn_id: str = 'azure_service_bus_default'
+        self.connection_string = (
+            "Endpoint=sb://test-service-bus-provider.servicebus.windows.net/;"
+            "SharedAccessKeyName=Test;SharedAccessKey=1234566acbc"
+        )
+        self.mock_conn = Connection(
+            conn_id='azure_service_bus_default',
+            conn_type='azure_service_bus',
+            schema=self.connection_string,
+        )
+
+    
@mock.patch("airflow.providers.microsoft.azure.hooks.asb.AdminClientHook.get_connection")
+    def test_get_conn(self, mock_connection):
+        mock_connection.return_value = self.mock_conn
+        hook = AdminClientHook(azure_service_bus_conn_id=self.conn_id)
+        assert isinstance(hook.get_conn(), ServiceBusAdministrationClient)
+
+    @mock.patch('azure.servicebus.management.QueueProperties')
+    
@mock.patch('airflow.providers.microsoft.azure.hooks.asb.AdminClientHook.get_conn')
+    def test_create_queue(self, mock_sb_admin_client, mock_queue_properties):
+        """
+        Test `create_queue` hook function with mocking connection, queue 
properties value and
+        the azure service bus `create_queue` function
+        """
+        mock_queue_properties.name = self.queue_name
+        
mock_sb_admin_client.return_value.__enter__.return_value.create_queue.return_value
 = (
+            mock_queue_properties
+        )
+        hook = AdminClientHook(azure_service_bus_conn_id=self.conn_id)
+        response = hook.create_queue(self.queue_name)
+        assert response == mock_queue_properties
+
+    
@mock.patch('airflow.providers.microsoft.azure.hooks.asb.ServiceBusAdministrationClient')
+    def test_create_queue_exception(self, mock_sb_admin_client):
+        """Test `create_queue` functionality to raise ValueError by passing 
queue name as None"""
+        hook = AdminClientHook(azure_service_bus_conn_id=self.conn_id)
+        with pytest.raises(TypeError):
+            hook.create_queue(None)
+
+    
@mock.patch('airflow.providers.microsoft.azure.hooks.asb.AdminClientHook.get_conn')
+    def test_delete_queue(self, mock_sb_admin_client):
+        """
+        Test Delete queue functionality by passing queue name, assert the 
function with values,
+        mock the azure service bus function  `delete_queue`
+        """
+        hook = AdminClientHook(azure_service_bus_conn_id=self.conn_id)
+        hook.delete_queue(self.queue_name)
+        expected_calls = 
[mock.call().__enter__().delete_queue(self.queue_name)]
+        mock_sb_admin_client.assert_has_calls(expected_calls)
+
+    
@mock.patch('airflow.providers.microsoft.azure.hooks.asb.ServiceBusAdministrationClient')
+    def test_delete_queue_exception(self, mock_sb_admin_client):
+        """Test `delete_queue` functionality to raise ValueError, by passing 
queue name as None"""
+        hook = AdminClientHook(azure_service_bus_conn_id=self.conn_id)
+        with pytest.raises(TypeError):
+            hook.delete_queue(None)
+
+
+class TestMessageHook:
+    def setup_class(self) -> None:
+        self.queue_name: str = "test_queue"
+        self.conn_id: str = 'azure_service_bus_default'
+        self.connection_string = (
+            "Endpoint=sb://test-service-bus-provider.servicebus.windows.net/;"
+            "SharedAccessKeyName=Test;SharedAccessKey=1234566acbc"
+        )
+        self.conn = Connection(
+            conn_id='azure_service_bus_default',
+            conn_type='azure_service_bus',
+            schema=self.connection_string,
+        )
+
+    
@mock.patch("airflow.providers.microsoft.azure.hooks.asb.MessageHook.get_connection")
+    def test_get_service_bus_message_conn(self, mock_connection):
+        """
+        Test get_conn() function and check whether the get_conn() function 
returns value
+        is instance of ServiceBusClient
+        """
+        mock_connection.return_value = self.conn
+        hook = MessageHook(azure_service_bus_conn_id=self.conn_id)
+        assert isinstance(hook.get_conn(), ServiceBusClient)
+
+    @pytest.mark.parametrize(
+        "mock_message, mock_batch_flag",
+        [
+            (MESSAGE, True),
+            (MESSAGE, False),
+            (MESSAGE_LIST, True),
+            (MESSAGE_LIST, False),
+        ],
+    )
+    
@mock.patch('airflow.providers.microsoft.azure.hooks.asb.MessageHook.send_list_messages')
+    
@mock.patch('airflow.providers.microsoft.azure.hooks.asb.MessageHook.send_batch_message')
+    
@mock.patch('airflow.providers.microsoft.azure.hooks.asb.MessageHook.get_conn')
+    def test_send_message(
+        self, mock_sb_client, mock_batch_message, mock_list_message, 
mock_message, mock_batch_flag
+    ):
+        """
+        Test `send_message` hook function with batch flag and message passed 
as mocked params,
+        which can be string or list of string, mock the azure service bus 
`send_messages` function
+        """
+        hook = 
MessageHook(azure_service_bus_conn_id="azure_service_bus_default")
+        hook.send_message(
+            queue_name=self.queue_name, messages=mock_message, 
batch_message_flag=mock_batch_flag
+        )
+        if isinstance(mock_message, list):
+            if mock_batch_flag:
+                message = ServiceBusMessageBatch(mock_message)
+            else:
+                message = [ServiceBusMessage(msg) for msg in mock_message]
+        elif isinstance(mock_message, str):
+            if mock_batch_flag:
+                message = ServiceBusMessageBatch(mock_message)
+            else:
+                message = ServiceBusMessage(mock_message)
+
+        expected_calls = [
+            mock.call()
+            .__enter__()
+            .get_queue_sender(self.queue_name)
+            .__enter__()
+            .send_messages(message)
+            .__exit__()
+        ]
+        mock_sb_client.assert_has_calls(expected_calls, any_order=False)
+
+    
@mock.patch('airflow.providers.microsoft.azure.hooks.asb.MessageHook.get_conn')
+    def test_send_message_exception(self, mock_sb_client):
+        """
+        Test `send_message` functionality to raise AirflowException in Azure 
MessageHook
+        by passing queue name as None
+        """
+        hook = MessageHook(azure_service_bus_conn_id=self.conn_id)
+        with pytest.raises(TypeError):
+            hook.send_message(queue_name=None, messages="", 
batch_message_flag=False)
+
+    @mock.patch('azure.servicebus.ServiceBusMessage')
+    
@mock.patch('airflow.providers.microsoft.azure.hooks.asb.MessageHook.get_conn')
+    def test_receive_message(self, mock_sb_client, mock_service_bus_message):
+        """
+        Test `receive_message` hook function and assert the function with mock 
value,
+        mock the azure service bus `receive_messages` function
+        """
+        hook = MessageHook(azure_service_bus_conn_id=self.conn_id)
+        
mock_sb_client.return_value.get_queue_receiver.return_value.receive_messages.return_value
 = [
+            mock_service_bus_message
+        ]
+        hook.receive_message(self.queue_name)
+        expected_calls = [
+            mock.call()
+            .__enter__()
+            .get_queue_receiver(self.queue_name)
+            .__enter__()
+            .receive_messages(max_message_count=30, max_wait_time=5)
+            .get_queue_receiver(self.queue_name)
+            .__exit__()
+            .mock_call()
+            .__exit__
+        ]
+        mock_sb_client.assert_has_calls(expected_calls)
+
+    
@mock.patch('airflow.providers.microsoft.azure.hooks.asb.MessageHook.get_conn')
+    def test_receive_message_exception(self, mock_sb_client):
+        """
+        Test `receive_message` functionality to raise AirflowException in 
Azure MessageHook
+        by passing queue name as None
+        """
+        hook = MessageHook(azure_service_bus_conn_id=self.conn_id)
+        with pytest.raises(TypeError):
+            hook.receive_message(None)
diff --git a/tests/providers/microsoft/azure/operators/test_asb.py 
b/tests/providers/microsoft/azure/operators/test_asb.py
new file mode 100644
index 0000000000..44fb630434
--- /dev/null
+++ b/tests/providers/microsoft/azure/operators/test_asb.py
@@ -0,0 +1,195 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import mock
+
+import pytest
+from azure.servicebus import ServiceBusMessage
+
+from airflow.providers.microsoft.azure.operators.asb import (
+    AzureServiceBusCreateQueueOperator,
+    AzureServiceBusDeleteQueueOperator,
+    AzureServiceBusReceiveMessageOperator,
+    AzureServiceBusSendMessageOperator,
+)
+
+QUEUE_NAME = "test_queue"
+MESSAGE = "Test Message"
+MESSAGE_LIST = [MESSAGE + " " + str(n) for n in range(0, 10)]
+
+
+class TestAzureServiceBusCreateQueueOperator:
+    @pytest.mark.parametrize(
+        "mock_dl_msg_expiration, mock_batched_operation",
+        [
+            (True, True),
+            (True, False),
+            (False, True),
+            (False, False),
+        ],
+    )
+    def test_init(self, mock_dl_msg_expiration, mock_batched_operation):
+        """
+        Test init by creating AzureServiceBusCreateQueueOperator with task id,
+        queue_name and asserting with value
+        """
+        asb_create_queue_operator = AzureServiceBusCreateQueueOperator(
+            task_id="asb_create_queue",
+            queue_name=QUEUE_NAME,
+            max_delivery_count=10,
+            dead_lettering_on_message_expiration=mock_dl_msg_expiration,
+            enable_batched_operations=mock_batched_operation,
+        )
+        assert asb_create_queue_operator.task_id == "asb_create_queue"
+        assert asb_create_queue_operator.queue_name == QUEUE_NAME
+        assert asb_create_queue_operator.max_delivery_count == 10
+        assert asb_create_queue_operator.dead_lettering_on_message_expiration 
is mock_dl_msg_expiration
+        assert asb_create_queue_operator.enable_batched_operations is 
mock_batched_operation
+
+    
@mock.patch("airflow.providers.microsoft.azure.hooks.asb.AdminClientHook.get_conn")
+    def test_create_queue(self, mock_get_conn):
+        """
+        Test AzureServiceBusCreateQueueOperator passed with the queue name,
+        mocking the connection details, hook create_queue function
+        """
+        asb_create_queue_operator = AzureServiceBusCreateQueueOperator(
+            task_id="asb_create_queue_operator",
+            queue_name=QUEUE_NAME,
+            max_delivery_count=10,
+            dead_lettering_on_message_expiration=True,
+            enable_batched_operations=True,
+        )
+        asb_create_queue_operator.execute(None)
+        
mock_get_conn.return_value.__enter__.return_value.create_queue.assert_called_once_with(
+            QUEUE_NAME,
+            max_delivery_count=10,
+            dead_lettering_on_message_expiration=True,
+            enable_batched_operations=True,
+        )
+
+
+class TestAzureServiceBusDeleteQueueOperator:
+    def test_init(self):
+        """
+        Test init by creating AzureServiceBusDeleteQueueOperator with task id, 
queue_name and asserting
+        with values
+        """
+        asb_delete_queue_operator = AzureServiceBusDeleteQueueOperator(
+            task_id="asb_delete_queue",
+            queue_name=QUEUE_NAME,
+        )
+        assert asb_delete_queue_operator.task_id == "asb_delete_queue"
+        assert asb_delete_queue_operator.queue_name == QUEUE_NAME
+
+    
@mock.patch("airflow.providers.microsoft.azure.hooks.asb.AdminClientHook.get_conn")
+    def test_delete_queue(self, mock_get_conn):
+        """Test AzureServiceBusDeleteQueueOperator by mocking queue name, 
connection and hook delete_queue"""
+        asb_delete_queue_operator = AzureServiceBusDeleteQueueOperator(
+            task_id="asb_delete_queue",
+            queue_name=QUEUE_NAME,
+        )
+        asb_delete_queue_operator.execute(None)
+        
mock_get_conn.return_value.__enter__.return_value.delete_queue.assert_called_once_with(QUEUE_NAME)
+
+
+class TestAzureServiceBusSendMessageOperator:
+    @pytest.mark.parametrize(
+        "mock_message, mock_batch_flag",
+        [
+            (MESSAGE, True),
+            (MESSAGE, False),
+            (MESSAGE_LIST, True),
+            (MESSAGE_LIST, False),
+        ],
+    )
+    def test_init(self, mock_message, mock_batch_flag):
+        """
+        Test init by creating AzureServiceBusSendMessageOperator with task id, 
queue_name, message,
+        batch and asserting with values
+        """
+        asb_send_message_queue_operator = AzureServiceBusSendMessageOperator(
+            task_id="asb_send_message_queue_without_batch",
+            queue_name=QUEUE_NAME,
+            message=mock_message,
+            batch=mock_batch_flag,
+        )
+        assert asb_send_message_queue_operator.task_id == 
"asb_send_message_queue_without_batch"
+        assert asb_send_message_queue_operator.queue_name == QUEUE_NAME
+        assert asb_send_message_queue_operator.message == mock_message
+        assert asb_send_message_queue_operator.batch is mock_batch_flag
+
+    
@mock.patch("airflow.providers.microsoft.azure.hooks.asb.MessageHook.get_conn")
+    def test_send_message_queue(self, mock_get_conn):
+        """
+        Test AzureServiceBusSendMessageOperator with queue name, batch boolean 
flag, mock
+        the send_messages of azure service bus function
+        """
+        asb_send_message_queue_operator = AzureServiceBusSendMessageOperator(
+            task_id="asb_send_message_queue",
+            queue_name=QUEUE_NAME,
+            message="Test message",
+            batch=False,
+        )
+        asb_send_message_queue_operator.execute(None)
+        expected_calls = [
+            mock.call()
+            .__enter__()
+            .get_queue_sender(QUEUE_NAME)
+            .__enter__()
+            .send_messages(ServiceBusMessage("Test message"))
+            .__exit__()
+        ]
+        mock_get_conn.assert_has_calls(expected_calls, any_order=False)
+
+
+class TestAzureServiceBusReceiveMessageOperator:
+    def test_init(self):
+        """
+        Test init by creating AzureServiceBusReceiveMessageOperator with task 
id, queue_name, message,
+        batch and asserting with values
+        """
+
+        asb_receive_queue_operator = AzureServiceBusReceiveMessageOperator(
+            task_id="asb_receive_message_queue",
+            queue_name=QUEUE_NAME,
+        )
+        assert asb_receive_queue_operator.task_id == 
"asb_receive_message_queue"
+        assert asb_receive_queue_operator.queue_name == QUEUE_NAME
+
+    
@mock.patch("airflow.providers.microsoft.azure.hooks.asb.MessageHook.get_conn")
+    def test_receive_message_queue(self, mock_get_conn):
+        """
+        Test AzureServiceBusReceiveMessageOperator by mock connection, values
+        and the service bus receive message
+        """
+        asb_receive_queue_operator = AzureServiceBusReceiveMessageOperator(
+            task_id="asb_receive_message_queue",
+            queue_name=QUEUE_NAME,
+        )
+        asb_receive_queue_operator.execute(None)
+        expected_calls = [
+            mock.call()
+            .__enter__()
+            .get_queue_receiver(QUEUE_NAME)
+            .__enter__()
+            .receive_messages(max_message_count=10, max_wait_time=5)
+            .get_queue_receiver(QUEUE_NAME)
+            .__exit__()
+            .mock_call()
+            .__exit__
+        ]
+        mock_get_conn.assert_has_calls(expected_calls)
diff --git 
a/tests/system/providers/microsoft/azure/example_azure_service_bus.py 
b/tests/system/providers/microsoft/azure/example_azure_service_bus.py
new file mode 100644
index 0000000000..d754991690
--- /dev/null
+++ b/tests/system/providers/microsoft/azure/example_azure_service_bus.py
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+from datetime import datetime, timedelta
+
+from airflow import DAG
+from airflow.models.baseoperator import chain
+from airflow.providers.microsoft.azure.operators.asb import (
+    AzureServiceBusCreateQueueOperator,
+    AzureServiceBusDeleteQueueOperator,
+    AzureServiceBusReceiveMessageOperator,
+    AzureServiceBusSendMessageOperator,
+)
+
+EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
+
+CLIENT_ID = os.getenv("CLIENT_ID", "")
+QUEUE_NAME = "sb_mgmt_queue_test"
+MESSAGE = "Test Message"
+MESSAGE_LIST = [MESSAGE + " " + str(n) for n in range(0, 10)]
+
+with DAG(
+    dag_id="example_azure_service_bus_queue",
+    start_date=datetime(2021, 8, 13),
+    schedule_interval=None,
+    catchup=False,
+    default_args={
+        "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
+        "azure_service_bus_conn_id": "azure_service_bus_default",
+    },
+    tags=["example", "Azure service bus Queue"],
+) as dag:
+    # [START howto_operator_create_service_bus_queue]
+    create_service_bus_queue = AzureServiceBusCreateQueueOperator(
+        task_id="create_service_bus_queue",
+        queue_name=QUEUE_NAME,
+    )
+    # [END howto_operator_create_service_bus_queue]
+
+    # [START howto_operator_send_message_to_service_bus_queue]
+    send_message_to_service_bus_queue = AzureServiceBusSendMessageOperator(
+        task_id="send_message_to_service_bus_queue",
+        message=MESSAGE,
+        queue_name=QUEUE_NAME,
+        batch=False,
+    )
+    # [END howto_operator_send_message_to_service_bus_queue]
+
+    # [START howto_operator_send_list_message_to_service_bus_queue]
+    send_list_message_to_service_bus_queue = 
AzureServiceBusSendMessageOperator(
+        task_id="send_list_message_to_service_bus_queue",
+        message=MESSAGE_LIST,
+        queue_name=QUEUE_NAME,
+        batch=False,
+    )
+    # [END howto_operator_send_list_message_to_service_bus_queue]
+
+    # [START howto_operator_send_batch_message_to_service_bus_queue]
+    send_batch_message_to_service_bus_queue = 
AzureServiceBusSendMessageOperator(
+        task_id="send_batch_message_to_service_bus_queue",
+        message=MESSAGE_LIST,
+        queue_name=QUEUE_NAME,
+        batch=True,
+    )
+    # [END howto_operator_send_batch_message_to_service_bus_queue]
+
+    # [START howto_operator_receive_message_service_bus_queue]
+    receive_message_service_bus_queue = AzureServiceBusReceiveMessageOperator(
+        task_id="receive_message_service_bus_queue",
+        queue_name=QUEUE_NAME,
+        max_message_count=20,
+        max_wait_time=5,
+    )
+    # [END howto_operator_receive_message_service_bus_queue]
+
+    # [START howto_operator_delete_service_bus_queue]
+    delete_service_bus_queue = AzureServiceBusDeleteQueueOperator(
+        task_id="delete_service_bus_queue", queue_name=QUEUE_NAME, 
trigger_rule="all_done"
+    )
+    # [END howto_operator_delete_service_bus_queue]
+
+    chain(
+        create_service_bus_queue,
+        send_message_to_service_bus_queue,
+        send_list_message_to_service_bus_queue,
+        send_batch_message_to_service_bus_queue,
+        receive_message_service_bus_queue,
+        delete_service_bus_queue,
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: 
tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)

Reply via email to