This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5c7c518aa0 Implement Azure Service Bus Topic Create, Delete Operators 
(#25436)
5c7c518aa0 is described below

commit 5c7c518aa065bba873bc95d5764658faa9e81b63
Author: Bharanidharan <[email protected]>
AuthorDate: Tue Aug 16 21:41:00 2022 +0530

    Implement Azure Service Bus Topic Create, Delete Operators (#25436)
    
    - Added Create Topic Operator
    - Added Test case
    - Added Example DAG
    - Added Doc for the operator
    
    Implemented Azure service bus Topic Delete operator
    
    - Added Operator for Delete Topic Operator
    - Added example DAG
---
 airflow/providers/microsoft/azure/operators/asb.py | 165 ++++++++++++++++++++-
 .../operators/asb.rst                              |  37 +++++
 .../microsoft/azure/operators/test_asb.py          | 104 +++++++++++++
 .../microsoft/azure/example_azure_service_bus.py   |  17 +++
 4 files changed, 322 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/microsoft/azure/operators/asb.py 
b/airflow/providers/microsoft/azure/operators/asb.py
index 2e7599c469..3a6bc2800e 100644
--- a/airflow/providers/microsoft/azure/operators/asb.py
+++ b/airflow/providers/microsoft/azure/operators/asb.py
@@ -15,12 +15,14 @@
 # specific language governing permissions and limitations
 # under the License.
 import datetime
-from typing import TYPE_CHECKING, List, Optional, Sequence, Union
+from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Union
 
 from airflow.models import BaseOperator
 from airflow.providers.microsoft.azure.hooks.asb import AdminClientHook, 
MessageHook
 
 if TYPE_CHECKING:
+    from azure.servicebus.management._models import AuthorizationRule
+
     from airflow.utils.context import Context
 
 
@@ -206,6 +208,125 @@ class AzureServiceBusDeleteQueueOperator(BaseOperator):
         hook.delete_queue(self.queue_name)
 
 
+class AzureServiceBusTopicCreateOperator(BaseOperator):
+    """
+    Create an Azure Service Bus Topic under a Service Bus Namespace by using 
ServiceBusAdministrationClient
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:AzureServiceBusTopicCreateOperator`
+
+    :param topic_name: Name of the topic.
+    :param default_message_time_to_live: ISO 8601 default message time span to 
live value. This is
+     the duration after which the message expires, starting from when the 
message is sent to Service
+     Bus. This is the default value used when TimeToLive is not set on a 
message itself.
+     Input value of either type ~datetime.timedelta or string in ISO 8601 
duration format
+     like "PT300S" is accepted.
+    :param max_size_in_megabytes: The maximum size of the topic in megabytes, 
which is the size of
+     memory allocated for the topic.
+    :param requires_duplicate_detection: A value indicating if this topic 
requires duplicate
+     detection.
+    :param duplicate_detection_history_time_window: ISO 8601 time span 
structure that defines the
+     duration of the duplicate detection history. The default value is 10 
minutes.
+     Input value of either type ~datetime.timedelta or string in ISO 8601 
duration format
+     like "PT300S" is accepted.
+    :param enable_batched_operations: Value that indicates whether server-side 
batched operations
+     are enabled.
+    :param size_in_bytes: The size of the topic, in bytes.
+    :param filtering_messages_before_publishing: Filter messages before 
publishing.
+    :param authorization_rules: List of Authorization rules for resource.
+    :param support_ordering: A value that indicates whether the topic supports 
ordering.
+    :param auto_delete_on_idle: ISO 8601 time span idle interval after which 
the topic is
+     automatically deleted. The minimum duration is 5 minutes.
+     Input value of either type ~datetime.timedelta or string in ISO 8601 
duration format
+     like "PT300S" is accepted.
+    :param enable_partitioning: A value that indicates whether the topic is to 
be partitioned
+     across multiple message brokers.
+    :param enable_express: A value that indicates whether Express Entities are 
enabled. An express
+     entity holds a message in memory temporarily before writing it to 
persistent storage.
+    :param user_metadata: Metadata associated with the topic.
+    :param max_message_size_in_kilobytes: The maximum size in kilobytes of 
message payload that
+     can be accepted by the topic. This feature is only available when using a 
Premium namespace
+     and Service Bus API version "2021-05" or higher.
+     The minimum allowed value is 1024 while the maximum allowed value is 
102400. Default value is 1024.
+    """
+
+    template_fields: Sequence[str] = ("topic_name",)
+    ui_color = "#e4f0e8"
+
+    def __init__(
+        self,
+        *,
+        topic_name: str,
+        azure_service_bus_conn_id: str = 'azure_service_bus_default',
+        default_message_time_to_live: Optional[Union[datetime.timedelta, str]] 
= None,
+        max_size_in_megabytes: Optional[int] = None,
+        requires_duplicate_detection: Optional[bool] = None,
+        duplicate_detection_history_time_window: 
Optional[Union[datetime.timedelta, str]] = None,
+        enable_batched_operations: Optional[bool] = None,
+        size_in_bytes: Optional[int] = None,
+        filtering_messages_before_publishing: Optional[bool] = None,
+        authorization_rules: Optional[List["AuthorizationRule"]] = None,
+        support_ordering: Optional[bool] = None,
+        auto_delete_on_idle: Optional[Union[datetime.timedelta, str]] = None,
+        enable_partitioning: Optional[bool] = None,
+        enable_express: Optional[bool] = None,
+        user_metadata: Optional[str] = None,
+        max_message_size_in_kilobytes: Optional[int] = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.topic_name = topic_name
+        self.azure_service_bus_conn_id = azure_service_bus_conn_id
+        self.default_message_time_to_live = default_message_time_to_live
+        self.max_size_in_megabytes = max_size_in_megabytes
+        self.requires_duplicate_detection = requires_duplicate_detection
+        self.duplicate_detection_history_time_window = 
duplicate_detection_history_time_window
+        self.enable_batched_operations = enable_batched_operations
+        self.size_in_bytes = size_in_bytes
+        self.filtering_messages_before_publishing = 
filtering_messages_before_publishing
+        self.authorization_rules = authorization_rules
+        self.support_ordering = support_ordering
+        self.auto_delete_on_idle = auto_delete_on_idle
+        self.enable_partitioning = enable_partitioning
+        self.enable_express = enable_express
+        self.user_metadata = user_metadata
+        self.max_message_size_in_kilobytes = max_message_size_in_kilobytes
+
+    def execute(self, context: "Context") -> str:
+        """Creates Topic in Service Bus namespace, by connecting to Service 
Bus Admin client"""
+        if self.topic_name is None:
+            raise TypeError("Topic name cannot be None.")
+
+        # Create the hook
+        hook = 
AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
+
+        with hook.get_conn() as service_mgmt_conn:
+            topic_properties = service_mgmt_conn.get_topic(self.topic_name)
+            if topic_properties and topic_properties.name == self.topic_name:
+                self.log.info("Topic name already exists")
+                return topic_properties.name
+            topic = service_mgmt_conn.create_topic(
+                topic_name=self.topic_name,
+                default_message_time_to_live=self.default_message_time_to_live,
+                max_size_in_megabytes=self.max_size_in_megabytes,
+                requires_duplicate_detection=self.requires_duplicate_detection,
+                
duplicate_detection_history_time_window=self.duplicate_detection_history_time_window,
+                enable_batched_operations=self.enable_batched_operations,
+                size_in_bytes=self.size_in_bytes,
+                
filtering_messages_before_publishing=self.filtering_messages_before_publishing,
+                authorization_rules=self.authorization_rules,
+                support_ordering=self.support_ordering,
+                auto_delete_on_idle=self.auto_delete_on_idle,
+                enable_partitioning=self.enable_partitioning,
+                enable_express=self.enable_express,
+                user_metadata=self.user_metadata,
+                
max_message_size_in_kilobytes=self.max_message_size_in_kilobytes,
+            )
+            self.log.info("Created Topic %s", topic.name)
+            return topic.name
+
+
 class AzureServiceBusSubscriptionCreateOperator(BaseOperator):
     """
     Create an Azure Service Bus Topic Subscription under a Service Bus 
Namespace
@@ -467,3 +588,45 @@ class 
AzureServiceBusSubscriptionDeleteOperator(BaseOperator):
 
         # delete subscription with name
         hook.delete_subscription(self.subscription_name, self.topic_name)
+
+
+class AzureServiceBusTopicDeleteOperator(BaseOperator):
+    """
+    Deletes the topic in the Azure Service Bus namespace
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:AzureServiceBusTopicDeleteOperator`
+
+    :param topic_name: Name of the topic to be deleted.
+    :param azure_service_bus_conn_id: Reference to the
+        :ref:`Azure Service Bus connection 
<howto/connection:azure_service_bus>`.
+    """
+
+    template_fields: Sequence[str] = ("topic_name",)
+    ui_color = "#e4f0e8"
+
+    def __init__(
+        self,
+        *,
+        topic_name: str,
+        azure_service_bus_conn_id: str = 'azure_service_bus_default',
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.topic_name = topic_name
+        self.azure_service_bus_conn_id = azure_service_bus_conn_id
+
+    def execute(self, context: "Context") -> None:
+        """Delete topic in Service Bus namespace, by connecting to Service Bus 
Admin client"""
+        if self.topic_name is None:
+            raise TypeError("Topic name cannot be None.")
+        hook = 
AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
+
+        with hook.get_conn() as service_mgmt_conn:
+            topic_properties = service_mgmt_conn.get_topic(self.topic_name)
+            if topic_properties and topic_properties.name == self.topic_name:
+                service_mgmt_conn.delete_topic(self.topic_name)
+                self.log.info("Topic %s deleted.", self.topic_name)
+            else:
+                self.log.info("Topic %s does not exist.", self.topic_name)
diff --git a/docs/apache-airflow-providers-microsoft-azure/operators/asb.rst 
b/docs/apache-airflow-providers-microsoft-azure/operators/asb.rst
index 0614d13224..5ad6962418 100644
--- a/docs/apache-airflow-providers-microsoft-azure/operators/asb.rst
+++ b/docs/apache-airflow-providers-microsoft-azure/operators/asb.rst
@@ -98,6 +98,43 @@ Below is an example of using this operator to execute an 
Azure Service Bus Delet
     :start-after: [START howto_operator_delete_service_bus_queue]
     :end-before: [END howto_operator_delete_service_bus_queue]
 
+Azure Service Bus Topic Operators
+-----------------------------------------
+Azure Service Bus Topic based Operators help to interact with topics in a 
service bus namespace
+and support the Create and Delete operations for a topic.
+
+.. _howto/operator:AzureServiceBusTopicCreateOperator:
+
+Create Azure Service Bus Topic
+======================================
+
+To create an Azure Service Bus topic with specific parameters you can use
+:class:`~airflow.providers.microsoft.azure.operators.asb.AzureServiceBusTopicCreateOperator`.
+
+Below is an example of using this operator to execute an Azure Service Bus 
Create Topic.
+
+.. exampleinclude:: 
/../../tests/system/providers/microsoft/azure/example_azure_service_bus.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_create_service_bus_topic]
+    :end-before: [END howto_operator_create_service_bus_topic]
+
+.. _howto/operator:AzureServiceBusTopicDeleteOperator:
+
+Delete Azure Service Bus Topic
+======================================
+
+To delete an Azure Service Bus topic you can use
+:class:`~airflow.providers.microsoft.azure.operators.asb.AzureServiceBusTopicDeleteOperator`.
+
+Below is an example of using this operator to execute an Azure Service Bus 
Delete topic.
+
+.. exampleinclude:: 
/../../tests/system/providers/microsoft/azure/example_azure_service_bus.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_delete_service_bus_topic]
+    :end-before: [END howto_operator_delete_service_bus_topic]
+
 Azure Service Bus Subscription Operators
 -----------------------------------------
 Azure Service Bus Subscription based Operators helps to interact topic 
Subscription in service bus namespace
diff --git a/tests/providers/microsoft/azure/operators/test_asb.py 
b/tests/providers/microsoft/azure/operators/test_asb.py
index 25a79cbf12..d2bdcc90eb 100644
--- a/tests/providers/microsoft/azure/operators/test_asb.py
+++ b/tests/providers/microsoft/azure/operators/test_asb.py
@@ -28,6 +28,8 @@ from airflow.providers.microsoft.azure.operators.asb import (
     AzureServiceBusSendMessageOperator,
     AzureServiceBusSubscriptionCreateOperator,
     AzureServiceBusSubscriptionDeleteOperator,
+    AzureServiceBusTopicCreateOperator,
+    AzureServiceBusTopicDeleteOperator,
     AzureServiceBusUpdateSubscriptionOperator,
 )
 
@@ -204,6 +206,51 @@ class TestAzureServiceBusReceiveMessageOperator:
         mock_get_conn.assert_has_calls(expected_calls)
 
 
+class TestABSTopicCreateOperator:
+    def test_init(self):
+        """
+        Test init by creating AzureServiceBusTopicCreateOperator with task id 
and topic name,
+        by asserting the value
+        """
+        asb_create_topic = AzureServiceBusTopicCreateOperator(
+            task_id="asb_create_topic",
+            topic_name=TOPIC_NAME,
+        )
+        assert asb_create_topic.task_id == "asb_create_topic"
+        assert asb_create_topic.topic_name == TOPIC_NAME
+
+    
@mock.patch("airflow.providers.microsoft.azure.hooks.asb.AdminClientHook.get_conn")
+    @mock.patch('azure.servicebus.management.TopicProperties')
+    def test_create_topic(self, mock_topic_properties, mock_get_conn):
+        """
+        Test AzureServiceBusTopicCreateOperator passed with the topic name
+        mocking the connection details, hook create_topic function
+        """
+        asb_create_topic = AzureServiceBusTopicCreateOperator(
+            task_id="asb_create_topic",
+            topic_name=TOPIC_NAME,
+        )
+        mock_topic_properties.name = TOPIC_NAME
+        
mock_get_conn.return_value.__enter__.return_value.create_topic.return_value = 
mock_topic_properties
+
+        with mock.patch.object(asb_create_topic.log, "info") as mock_log_info:
+            asb_create_topic.execute(None)
+        mock_log_info.assert_called_with("Created Topic %s", TOPIC_NAME)
+
+    @mock.patch('airflow.providers.microsoft.azure.hooks.asb.AdminClientHook')
+    def test_create_subscription_exception(self, mock_sb_admin_client):
+        """
+        Test `AzureServiceBusTopicCreateOperator` functionality to raise 
TypeError,
+         by passing topic name as None and asserting that pytest catches the TypeError
+        """
+        asb_create_topic_exception = AzureServiceBusTopicCreateOperator(
+            task_id="create_service_bus_subscription",
+            topic_name=None,
+        )
+        with pytest.raises(TypeError):
+            asb_create_topic_exception.execute(None)
+
+
 class TestASBCreateSubscriptionOperator:
     def test_init(self):
         """
@@ -377,3 +424,60 @@ class TestASBSubscriptionReceiveMessageOperator:
             .__exit__
         ]
         mock_get_conn.assert_has_calls(expected_calls)
+
+
+class TestASBTopicDeleteOperator:
+    def test_init(self):
+        """
+        Test init by creating AzureServiceBusTopicDeleteOperator with task id, 
topic name and asserting
+        with values
+        """
+        asb_delete_topic_operator = AzureServiceBusTopicDeleteOperator(
+            task_id="asb_delete_topic",
+            topic_name=TOPIC_NAME,
+        )
+        assert asb_delete_topic_operator.task_id == "asb_delete_topic"
+        assert asb_delete_topic_operator.topic_name == TOPIC_NAME
+
+    
@mock.patch("airflow.providers.microsoft.azure.hooks.asb.AdminClientHook.get_conn")
+    @mock.patch('azure.servicebus.management.TopicProperties')
+    def test_delete_topic(self, mock_topic_properties, mock_get_conn):
+        """
+        Test AzureServiceBusTopicDeleteOperator by mocking topic name, 
connection
+        """
+        asb_delete_topic = AzureServiceBusTopicDeleteOperator(
+            task_id="asb_delete_topic",
+            topic_name=TOPIC_NAME,
+        )
+        mock_topic_properties.name = TOPIC_NAME
+        
mock_get_conn.return_value.__enter__.return_value.get_topic.return_value = 
mock_topic_properties
+        with mock.patch.object(asb_delete_topic.log, "info") as mock_log_info:
+            asb_delete_topic.execute(None)
+        mock_log_info.assert_called_with("Topic %s deleted.", TOPIC_NAME)
+
+    
@mock.patch("airflow.providers.microsoft.azure.hooks.asb.AdminClientHook.get_conn")
+    def test_delete_topic_not_exists(self, mock_get_conn):
+        """
+        Test AzureServiceBusTopicDeleteOperator logs that the topic does not exist 
when the mocked connection's get_topic returns None
+        """
+        asb_delete_topic_not_exists = AzureServiceBusTopicDeleteOperator(
+            task_id="asb_delete_topic_not_exists",
+            topic_name=TOPIC_NAME,
+        )
+        
mock_get_conn.return_value.__enter__.return_value.get_topic.return_value = None
+        with mock.patch.object(asb_delete_topic_not_exists.log, "info") as 
mock_log_info:
+            asb_delete_topic_not_exists.execute(None)
+        mock_log_info.assert_called_with("Topic %s does not exist.", 
TOPIC_NAME)
+
+    @mock.patch('airflow.providers.microsoft.azure.hooks.asb.AdminClientHook')
+    def test_delete_topic_exception(self, mock_sb_admin_client):
+        """
+        Test `delete_topic` functionality to raise TypeError,
+         by passing topic name as None and asserting that pytest catches the TypeError
+        """
+        asb_delete_topic_exception = AzureServiceBusTopicDeleteOperator(
+            task_id="delete_service_bus_subscription",
+            topic_name=None,
+        )
+        with pytest.raises(TypeError):
+            asb_delete_topic_exception.execute(None)
diff --git 
a/tests/system/providers/microsoft/azure/example_azure_service_bus.py 
b/tests/system/providers/microsoft/azure/example_azure_service_bus.py
index 6295d28b72..e4e736d8eb 100644
--- a/tests/system/providers/microsoft/azure/example_azure_service_bus.py
+++ b/tests/system/providers/microsoft/azure/example_azure_service_bus.py
@@ -28,6 +28,8 @@ from airflow.providers.microsoft.azure.operators.asb import (
     AzureServiceBusSendMessageOperator,
     AzureServiceBusSubscriptionCreateOperator,
     AzureServiceBusSubscriptionDeleteOperator,
+    AzureServiceBusTopicCreateOperator,
+    AzureServiceBusTopicDeleteOperator,
     AzureServiceBusUpdateSubscriptionOperator,
 )
 
@@ -94,6 +96,12 @@ with DAG(
     )
     # [END howto_operator_receive_message_service_bus_queue]
 
+    # [START howto_operator_create_service_bus_topic]
+    create_service_bus_topic = AzureServiceBusTopicCreateOperator(
+        task_id="create_service_bus_topic", topic_name=TOPIC_NAME
+    )
+    # [END howto_operator_create_service_bus_topic]
+
     # [START howto_operator_create_service_bus_subscription]
     create_service_bus_subscription = 
AzureServiceBusSubscriptionCreateOperator(
         task_id="create_service_bus_subscription",
@@ -129,6 +137,13 @@ with DAG(
     )
     # [END howto_operator_delete_service_bus_subscription]
 
+    # [START howto_operator_delete_service_bus_topic]
+    delete_asb_topic = AzureServiceBusTopicDeleteOperator(
+        task_id="delete_asb_topic",
+        topic_name=TOPIC_NAME,
+    )
+    # [END howto_operator_delete_service_bus_topic]
+
     # [START howto_operator_delete_service_bus_queue]
     delete_service_bus_queue = AzureServiceBusDeleteQueueOperator(
         task_id="delete_service_bus_queue", queue_name=QUEUE_NAME, 
trigger_rule="all_done"
@@ -137,6 +152,7 @@ with DAG(
 
     chain(
         create_service_bus_queue,
+        create_service_bus_topic,
         create_service_bus_subscription,
         send_message_to_service_bus_queue,
         send_list_message_to_service_bus_queue,
@@ -145,6 +161,7 @@ with DAG(
         update_service_bus_subscription,
         receive_message_service_bus_subscription,
         delete_service_bus_subscription,
+        delete_asb_topic,
         delete_service_bus_queue,
     )
 

Reply via email to