This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new cda0e9e9bee Move create topic from 
`AzureServiceBusTopicCreateOperator` to `AdminClientHook` (#45297)
cda0e9e9bee is described below

commit cda0e9e9bee23a1d04cc18060bb496c7fffc43da
Author: perry2of5 <[email protected]>
AuthorDate: Tue Dec 31 23:08:27 2024 -0800

    Move create topic from `AzureServiceBusTopicCreateOperator` to 
`AdminClientHook` (#45297)
    
    * Move create topic to hook in Azure Service Bus provider
    
    * Fix indenting of documentation
---
 .../airflow/providers/microsoft/azure/hooks/asb.py | 88 ++++++++++++++++++++++
 .../providers/microsoft/azure/operators/asb.py     | 46 +++++------
 providers/tests/microsoft/azure/hooks/test_asb.py  | 20 +++++
 .../tests/microsoft/azure/operators/test_asb.py    | 32 ++++++--
 4 files changed, 151 insertions(+), 35 deletions(-)

diff --git a/providers/src/airflow/providers/microsoft/azure/hooks/asb.py 
b/providers/src/airflow/providers/microsoft/azure/hooks/asb.py
index 504dbe16871..4a153f0c826 100644
--- a/providers/src/airflow/providers/microsoft/azure/hooks/asb.py
+++ b/providers/src/airflow/providers/microsoft/azure/hooks/asb.py
@@ -28,6 +28,7 @@ from azure.servicebus import (
     ServiceBusSender,
 )
 from azure.servicebus.management import (
+    AuthorizationRule,
     CorrelationRuleFilter,
     QueueProperties,
     ServiceBusAdministrationClient,
@@ -194,6 +195,93 @@ class AdminClientHook(BaseAzureServiceBusHook):
         with self.get_conn() as service_mgmt_conn:
             service_mgmt_conn.delete_queue(queue_name)
 
+    def create_topic(
+        self,
+        topic_name: str,
+        azure_service_bus_conn_id: str = "azure_service_bus_default",
+        default_message_time_to_live: datetime.timedelta | str | None = None,
+        max_size_in_megabytes: int | None = None,
+        requires_duplicate_detection: bool | None = None,
+        duplicate_detection_history_time_window: datetime.timedelta | str | 
None = None,
+        enable_batched_operations: bool | None = None,
+        size_in_bytes: int | None = None,
+        filtering_messages_before_publishing: bool | None = None,
+        authorization_rules: list[AuthorizationRule] | None = None,
+        support_ordering: bool | None = None,
+        auto_delete_on_idle: datetime.timedelta | str | None = None,
+        enable_partitioning: bool | None = None,
+        enable_express: bool | None = None,
+        user_metadata: str | None = None,
+        max_message_size_in_kilobytes: int | None = None,
+    ) -> str:
+        """
+        Create a topic by connecting to service Bus Admin client.
+
+        :param topic_name: Name of the topic.
+        :param default_message_time_to_live: ISO 8601 default message time 
span to live value. This is
+            the duration after which the message expires, starting from when 
the message is sent to Service
+            Bus. This is the default value used when TimeToLive is not set on 
a message itself.
+            Input value of either type ~datetime.timedelta or string in ISO 
8601 duration format
+            like "PT300S" is accepted.
+        :param max_size_in_megabytes: The maximum size of the topic in 
megabytes, which is the size of
+            memory allocated for the topic.
+        :param requires_duplicate_detection: A value indicating if this topic 
requires duplicate
+            detection.
+        :param duplicate_detection_history_time_window: ISO 8601 time span 
structure that defines the
+            duration of the duplicate detection history. The default value is 
10 minutes.
+            Input value of either type ~datetime.timedelta or string in ISO 
8601 duration format
+            like "PT300S" is accepted.
+        :param enable_batched_operations: Value that indicates whether 
server-side batched operations
+            are enabled.
+        :param size_in_bytes: The size of the topic, in bytes.
+        :param filtering_messages_before_publishing: Filter messages before 
publishing.
+        :param authorization_rules: List of Authorization rules for resource.
+        :param support_ordering: A value that indicates whether the topic 
supports ordering.
+        :param auto_delete_on_idle: ISO 8601 time span idle interval after 
which the topic is
+            automatically deleted. The minimum duration is 5 minutes.
+            Input value of either type ~datetime.timedelta or string in ISO 
8601 duration format
+            like "PT300S" is accepted.
+        :param enable_partitioning: A value that indicates whether the topic 
is to be partitioned
+            across multiple message brokers.
+        :param enable_express: A value that indicates whether Express Entities 
are enabled. An express
+            queue holds a message in memory temporarily before writing it to 
persistent storage.
+        :param user_metadata: Metadata associated with the topic.
+        :param max_message_size_in_kilobytes: The maximum size in kilobytes of 
message payload that
+            can be accepted by the queue. This feature is only available when 
using a Premium namespace
+            and Service Bus API version "2021-05" or higher.
+            The minimum allowed value is 1024 while the maximum allowed value 
is 102400. Default value is 1024.
+        """
+        if topic_name is None:
+            raise TypeError("Topic name cannot be None.")
+
+        with self.get_conn() as service_mgmt_conn:
+            try:
+                topic_properties = service_mgmt_conn.get_topic(topic_name)
+            except ResourceNotFoundError:
+                topic_properties = None
+            if topic_properties and topic_properties.name == topic_name:
+                self.log.info("Topic name already exists")
+                return topic_properties.name
+            topic = service_mgmt_conn.create_topic(
+                topic_name=topic_name,
+                default_message_time_to_live=default_message_time_to_live,
+                max_size_in_megabytes=max_size_in_megabytes,
+                requires_duplicate_detection=requires_duplicate_detection,
+                
duplicate_detection_history_time_window=duplicate_detection_history_time_window,
+                enable_batched_operations=enable_batched_operations,
+                size_in_bytes=size_in_bytes,
+                
filtering_messages_before_publishing=filtering_messages_before_publishing,
+                authorization_rules=authorization_rules,
+                support_ordering=support_ordering,
+                auto_delete_on_idle=auto_delete_on_idle,
+                enable_partitioning=enable_partitioning,
+                enable_express=enable_express,
+                user_metadata=user_metadata,
+                max_message_size_in_kilobytes=max_message_size_in_kilobytes,
+            )
+            self.log.info("Created Topic %s", topic.name)
+            return topic.name
+
     def create_subscription(
         self,
         topic_name: str,
diff --git a/providers/src/airflow/providers/microsoft/azure/operators/asb.py 
b/providers/src/airflow/providers/microsoft/azure/operators/asb.py
index 1c16a8115d7..ba3a3257b94 100644
--- a/providers/src/airflow/providers/microsoft/azure/operators/asb.py
+++ b/providers/src/airflow/providers/microsoft/azure/operators/asb.py
@@ -19,8 +19,6 @@ from __future__ import annotations
 from collections.abc import Sequence
 from typing import TYPE_CHECKING, Any, Callable
 
-from azure.core.exceptions import ResourceNotFoundError
-
 from airflow.models import BaseOperator
 from airflow.providers.microsoft.azure.hooks.asb import AdminClientHook, 
MessageHook
 
@@ -313,33 +311,23 @@ class AzureServiceBusTopicCreateOperator(BaseOperator):
         # Create the hook
         hook = 
AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
 
-        with hook.get_conn() as service_mgmt_conn:
-            try:
-                topic_properties = service_mgmt_conn.get_topic(self.topic_name)
-            except ResourceNotFoundError:
-                topic_properties = None
-            if topic_properties and topic_properties.name == self.topic_name:
-                self.log.info("Topic name already exists")
-                return topic_properties.name
-            topic = service_mgmt_conn.create_topic(
-                topic_name=self.topic_name,
-                default_message_time_to_live=self.default_message_time_to_live,
-                max_size_in_megabytes=self.max_size_in_megabytes,
-                requires_duplicate_detection=self.requires_duplicate_detection,
-                
duplicate_detection_history_time_window=self.duplicate_detection_history_time_window,
-                enable_batched_operations=self.enable_batched_operations,
-                size_in_bytes=self.size_in_bytes,
-                
filtering_messages_before_publishing=self.filtering_messages_before_publishing,
-                authorization_rules=self.authorization_rules,
-                support_ordering=self.support_ordering,
-                auto_delete_on_idle=self.auto_delete_on_idle,
-                enable_partitioning=self.enable_partitioning,
-                enable_express=self.enable_express,
-                user_metadata=self.user_metadata,
-                
max_message_size_in_kilobytes=self.max_message_size_in_kilobytes,
-            )
-            self.log.info("Created Topic %s", topic.name)
-            return topic.name
+        return hook.create_topic(
+            topic_name=self.topic_name,
+            default_message_time_to_live=self.default_message_time_to_live,
+            max_size_in_megabytes=self.max_size_in_megabytes,
+            requires_duplicate_detection=self.requires_duplicate_detection,
+            
duplicate_detection_history_time_window=self.duplicate_detection_history_time_window,
+            enable_batched_operations=self.enable_batched_operations,
+            size_in_bytes=self.size_in_bytes,
+            
filtering_messages_before_publishing=self.filtering_messages_before_publishing,
+            authorization_rules=self.authorization_rules,
+            support_ordering=self.support_ordering,
+            auto_delete_on_idle=self.auto_delete_on_idle,
+            enable_partitioning=self.enable_partitioning,
+            enable_express=self.enable_express,
+            user_metadata=self.user_metadata,
+            max_message_size_in_kilobytes=self.max_message_size_in_kilobytes,
+        )
 
 
 class AzureServiceBusSubscriptionCreateOperator(BaseOperator):
diff --git a/providers/tests/microsoft/azure/hooks/test_asb.py 
b/providers/tests/microsoft/azure/hooks/test_asb.py
index fc5e433058c..067e79bf570 100644
--- a/providers/tests/microsoft/azure/hooks/test_asb.py
+++ b/providers/tests/microsoft/azure/hooks/test_asb.py
@@ -119,6 +119,26 @@ class TestAdminClientHook:
         with pytest.raises(TypeError):
             hook.delete_queue(None)
 
+    # Test creating a topic using hook method `create_topic`
+    @mock.patch("azure.servicebus.management.TopicProperties")
+    @mock.patch(f"{MODULE}.AdminClientHook.get_conn")
+    def test_create_topic(self, mock_sb_admin_client, mock_topic_properties):
+        """
+        Test `create_topic` hook function with mocking connection, topic 
properties value and
+        the azure service bus `create_topic` function
+        """
+        topic_name = "test_topic_name"
+        mock_topic_properties.name = topic_name
+        
mock_sb_admin_client.return_value.__enter__.return_value.create_topic.return_value
 = (
+            mock_topic_properties
+        )
+        hook = AdminClientHook(azure_service_bus_conn_id=self.conn_id)
+        with mock.patch.object(hook.log, "info") as mock_log_info:
+            hook.create_topic(topic_name)
+        assert mock_topic_properties.name == topic_name
+
+        mock_log_info.assert_called_with("Created Topic %s", topic_name)
+
     # Test creating subscription with topic name and subscription name using 
hook method `create_subscription`
     @mock.patch("azure.servicebus.management.SubscriptionProperties")
     @mock.patch(f"{MODULE}.AdminClientHook.get_conn")
diff --git a/providers/tests/microsoft/azure/operators/test_asb.py 
b/providers/tests/microsoft/azure/operators/test_asb.py
index 7e0c953890c..145d2e72940 100644
--- a/providers/tests/microsoft/azure/operators/test_asb.py
+++ b/providers/tests/microsoft/azure/operators/test_asb.py
@@ -255,19 +255,39 @@ class TestABSTopicCreateOperator:
     @mock.patch("azure.servicebus.management.TopicProperties")
     def test_create_topic(self, mock_topic_properties, mock_get_conn):
         """
-        Test AzureServiceBusTopicCreateOperator passed with the topic name
-        mocking the connection details, hook create_topic function
+        Test AzureServiceBusSubscriptionCreateOperator passed with the 
subscription name, topic name
+        mocking the connection details, hook create_subscription function
         """
+        print("Wazzup doc")
         asb_create_topic = AzureServiceBusTopicCreateOperator(
             task_id="asb_create_topic",
             topic_name=TOPIC_NAME,
         )
         mock_topic_properties.name = TOPIC_NAME
         
mock_get_conn.return_value.__enter__.return_value.create_topic.return_value = 
mock_topic_properties
-
-        with mock.patch.object(asb_create_topic.log, "info") as mock_log_info:
-            asb_create_topic.execute(None)
-        mock_log_info.assert_called_with("Created Topic %s", TOPIC_NAME)
+        # create the topic
+        created_topic_name = asb_create_topic.execute(None)
+        # ensure the topic name is returned
+        assert created_topic_name == TOPIC_NAME
+        # ensure create_subscription is called with the correct arguments on 
the connection
+        
mock_get_conn.return_value.__enter__.return_value.create_topic.assert_called_once_with(
+            topic_name=TOPIC_NAME,
+            default_message_time_to_live=None,
+            max_size_in_megabytes=None,
+            requires_duplicate_detection=None,
+            duplicate_detection_history_time_window=None,
+            enable_batched_operations=None,
+            size_in_bytes=None,
+            filtering_messages_before_publishing=None,
+            authorization_rules=None,
+            support_ordering=None,
+            auto_delete_on_idle=None,
+            enable_partitioning=None,
+            enable_express=None,
+            user_metadata=None,
+            max_message_size_in_kilobytes=None,
+        )
+        print("Later Gator")
 
     @mock.patch("airflow.providers.microsoft.azure.hooks.asb.AdminClientHook")
     def test_create_subscription_exception(self, mock_sb_admin_client):

Reply via email to