This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cda0e9e9bee Move create topic from
`AzureServiceBusTopicCreateOperator` to `AdminClientHook` (#45297)
cda0e9e9bee is described below
commit cda0e9e9bee23a1d04cc18060bb496c7fffc43da
Author: perry2of5 <[email protected]>
AuthorDate: Tue Dec 31 23:08:27 2024 -0800
Move create topic from `AzureServiceBusTopicCreateOperator` to
`AdminClientHook` (#45297)
* Move create topic to hook in Azure Service Bus provider
* Fix indenting of documentation
---
.../airflow/providers/microsoft/azure/hooks/asb.py | 88 ++++++++++++++++++++++
.../providers/microsoft/azure/operators/asb.py | 46 +++++------
providers/tests/microsoft/azure/hooks/test_asb.py | 20 +++++
.../tests/microsoft/azure/operators/test_asb.py | 32 ++++++--
4 files changed, 151 insertions(+), 35 deletions(-)
diff --git a/providers/src/airflow/providers/microsoft/azure/hooks/asb.py
b/providers/src/airflow/providers/microsoft/azure/hooks/asb.py
index 504dbe16871..4a153f0c826 100644
--- a/providers/src/airflow/providers/microsoft/azure/hooks/asb.py
+++ b/providers/src/airflow/providers/microsoft/azure/hooks/asb.py
@@ -28,6 +28,7 @@ from azure.servicebus import (
ServiceBusSender,
)
from azure.servicebus.management import (
+ AuthorizationRule,
CorrelationRuleFilter,
QueueProperties,
ServiceBusAdministrationClient,
@@ -194,6 +195,93 @@ class AdminClientHook(BaseAzureServiceBusHook):
with self.get_conn() as service_mgmt_conn:
service_mgmt_conn.delete_queue(queue_name)
+ def create_topic(
+ self,
+ topic_name: str,
+ azure_service_bus_conn_id: str = "azure_service_bus_default",
+ default_message_time_to_live: datetime.timedelta | str | None = None,
+ max_size_in_megabytes: int | None = None,
+ requires_duplicate_detection: bool | None = None,
+ duplicate_detection_history_time_window: datetime.timedelta | str |
None = None,
+ enable_batched_operations: bool | None = None,
+ size_in_bytes: int | None = None,
+ filtering_messages_before_publishing: bool | None = None,
+ authorization_rules: list[AuthorizationRule] | None = None,
+ support_ordering: bool | None = None,
+ auto_delete_on_idle: datetime.timedelta | str | None = None,
+ enable_partitioning: bool | None = None,
+ enable_express: bool | None = None,
+ user_metadata: str | None = None,
+ max_message_size_in_kilobytes: int | None = None,
+ ) -> str:
+ """
+ Create a topic by connecting to service Bus Admin client.
+
+ :param topic_name: Name of the topic.
+ :param default_message_time_to_live: ISO 8601 default message time
span to live value. This is
+ the duration after which the message expires, starting from when
the message is sent to Service
+ Bus. This is the default value used when TimeToLive is not set on
a message itself.
+ Input value of either type ~datetime.timedelta or string in ISO
8601 duration format
+ like "PT300S" is accepted.
+ :param max_size_in_megabytes: The maximum size of the topic in
megabytes, which is the size of
+ memory allocated for the topic.
+ :param requires_duplicate_detection: A value indicating if this topic
requires duplicate
+ detection.
+ :param duplicate_detection_history_time_window: ISO 8601 time span
structure that defines the
+ duration of the duplicate detection history. The default value is
10 minutes.
+ Input value of either type ~datetime.timedelta or string in ISO
8601 duration format
+ like "PT300S" is accepted.
+ :param enable_batched_operations: Value that indicates whether
server-side batched operations
+ are enabled.
+ :param size_in_bytes: The size of the topic, in bytes.
+ :param filtering_messages_before_publishing: Filter messages before
publishing.
+ :param authorization_rules: List of Authorization rules for resource.
+ :param support_ordering: A value that indicates whether the topic
supports ordering.
+ :param auto_delete_on_idle: ISO 8601 time span idle interval after
which the topic is
+ automatically deleted. The minimum duration is 5 minutes.
+ Input value of either type ~datetime.timedelta or string in ISO
8601 duration format
+ like "PT300S" is accepted.
+ :param enable_partitioning: A value that indicates whether the topic
is to be partitioned
+ across multiple message brokers.
+ :param enable_express: A value that indicates whether Express Entities
are enabled. An express
+ queue holds a message in memory temporarily before writing it to
persistent storage.
+ :param user_metadata: Metadata associated with the topic.
+ :param max_message_size_in_kilobytes: The maximum size in kilobytes of
message payload that
+ can be accepted by the queue. This feature is only available when
using a Premium namespace
+ and Service Bus API version "2021-05" or higher.
+ The minimum allowed value is 1024 while the maximum allowed value
is 102400. Default value is 1024.
+ """
+ if topic_name is None:
+ raise TypeError("Topic name cannot be None.")
+
+ with self.get_conn() as service_mgmt_conn:
+ try:
+ topic_properties = service_mgmt_conn.get_topic(topic_name)
+ except ResourceNotFoundError:
+ topic_properties = None
+ if topic_properties and topic_properties.name == topic_name:
+ self.log.info("Topic name already exists")
+ return topic_properties.name
+ topic = service_mgmt_conn.create_topic(
+ topic_name=topic_name,
+ default_message_time_to_live=default_message_time_to_live,
+ max_size_in_megabytes=max_size_in_megabytes,
+ requires_duplicate_detection=requires_duplicate_detection,
+
duplicate_detection_history_time_window=duplicate_detection_history_time_window,
+ enable_batched_operations=enable_batched_operations,
+ size_in_bytes=size_in_bytes,
+
filtering_messages_before_publishing=filtering_messages_before_publishing,
+ authorization_rules=authorization_rules,
+ support_ordering=support_ordering,
+ auto_delete_on_idle=auto_delete_on_idle,
+ enable_partitioning=enable_partitioning,
+ enable_express=enable_express,
+ user_metadata=user_metadata,
+ max_message_size_in_kilobytes=max_message_size_in_kilobytes,
+ )
+ self.log.info("Created Topic %s", topic.name)
+ return topic.name
+
def create_subscription(
self,
topic_name: str,
diff --git a/providers/src/airflow/providers/microsoft/azure/operators/asb.py
b/providers/src/airflow/providers/microsoft/azure/operators/asb.py
index 1c16a8115d7..ba3a3257b94 100644
--- a/providers/src/airflow/providers/microsoft/azure/operators/asb.py
+++ b/providers/src/airflow/providers/microsoft/azure/operators/asb.py
@@ -19,8 +19,6 @@ from __future__ import annotations
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any, Callable
-from azure.core.exceptions import ResourceNotFoundError
-
from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.hooks.asb import AdminClientHook,
MessageHook
@@ -313,33 +311,23 @@ class AzureServiceBusTopicCreateOperator(BaseOperator):
# Create the hook
hook =
AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
- with hook.get_conn() as service_mgmt_conn:
- try:
- topic_properties = service_mgmt_conn.get_topic(self.topic_name)
- except ResourceNotFoundError:
- topic_properties = None
- if topic_properties and topic_properties.name == self.topic_name:
- self.log.info("Topic name already exists")
- return topic_properties.name
- topic = service_mgmt_conn.create_topic(
- topic_name=self.topic_name,
- default_message_time_to_live=self.default_message_time_to_live,
- max_size_in_megabytes=self.max_size_in_megabytes,
- requires_duplicate_detection=self.requires_duplicate_detection,
-
duplicate_detection_history_time_window=self.duplicate_detection_history_time_window,
- enable_batched_operations=self.enable_batched_operations,
- size_in_bytes=self.size_in_bytes,
-
filtering_messages_before_publishing=self.filtering_messages_before_publishing,
- authorization_rules=self.authorization_rules,
- support_ordering=self.support_ordering,
- auto_delete_on_idle=self.auto_delete_on_idle,
- enable_partitioning=self.enable_partitioning,
- enable_express=self.enable_express,
- user_metadata=self.user_metadata,
-
max_message_size_in_kilobytes=self.max_message_size_in_kilobytes,
- )
- self.log.info("Created Topic %s", topic.name)
- return topic.name
+ return hook.create_topic(
+ topic_name=self.topic_name,
+ default_message_time_to_live=self.default_message_time_to_live,
+ max_size_in_megabytes=self.max_size_in_megabytes,
+ requires_duplicate_detection=self.requires_duplicate_detection,
+
duplicate_detection_history_time_window=self.duplicate_detection_history_time_window,
+ enable_batched_operations=self.enable_batched_operations,
+ size_in_bytes=self.size_in_bytes,
+
filtering_messages_before_publishing=self.filtering_messages_before_publishing,
+ authorization_rules=self.authorization_rules,
+ support_ordering=self.support_ordering,
+ auto_delete_on_idle=self.auto_delete_on_idle,
+ enable_partitioning=self.enable_partitioning,
+ enable_express=self.enable_express,
+ user_metadata=self.user_metadata,
+ max_message_size_in_kilobytes=self.max_message_size_in_kilobytes,
+ )
class AzureServiceBusSubscriptionCreateOperator(BaseOperator):
diff --git a/providers/tests/microsoft/azure/hooks/test_asb.py
b/providers/tests/microsoft/azure/hooks/test_asb.py
index fc5e433058c..067e79bf570 100644
--- a/providers/tests/microsoft/azure/hooks/test_asb.py
+++ b/providers/tests/microsoft/azure/hooks/test_asb.py
@@ -119,6 +119,26 @@ class TestAdminClientHook:
with pytest.raises(TypeError):
hook.delete_queue(None)
+ # Test creating a topic using hook method `create_topic`
+ @mock.patch("azure.servicebus.management.TopicProperties")
+ @mock.patch(f"{MODULE}.AdminClientHook.get_conn")
+ def test_create_topic(self, mock_sb_admin_client, mock_topic_properties):
+ """
+ Test `create_topic` hook function with mocking connection, topic
properties value and
+ the azure service bus `create_topic` function
+ """
+ topic_name = "test_topic_name"
+ mock_topic_properties.name = topic_name
+
mock_sb_admin_client.return_value.__enter__.return_value.create_topic.return_value
= (
+ mock_topic_properties
+ )
+ hook = AdminClientHook(azure_service_bus_conn_id=self.conn_id)
+ with mock.patch.object(hook.log, "info") as mock_log_info:
+ hook.create_topic(topic_name)
+ assert mock_topic_properties.name == topic_name
+
+ mock_log_info.assert_called_with("Created Topic %s", topic_name)
+
# Test creating subscription with topic name and subscription name using
hook method `create_subscription`
@mock.patch("azure.servicebus.management.SubscriptionProperties")
@mock.patch(f"{MODULE}.AdminClientHook.get_conn")
diff --git a/providers/tests/microsoft/azure/operators/test_asb.py
b/providers/tests/microsoft/azure/operators/test_asb.py
index 7e0c953890c..145d2e72940 100644
--- a/providers/tests/microsoft/azure/operators/test_asb.py
+++ b/providers/tests/microsoft/azure/operators/test_asb.py
@@ -255,19 +255,39 @@ class TestABSTopicCreateOperator:
@mock.patch("azure.servicebus.management.TopicProperties")
def test_create_topic(self, mock_topic_properties, mock_get_conn):
"""
- Test AzureServiceBusTopicCreateOperator passed with the topic name
- mocking the connection details, hook create_topic function
+ Test AzureServiceBusSubscriptionCreateOperator passed with the
subscription name, topic name
+ mocking the connection details, hook create_subscription function
"""
+ print("Wazzup doc")
asb_create_topic = AzureServiceBusTopicCreateOperator(
task_id="asb_create_topic",
topic_name=TOPIC_NAME,
)
mock_topic_properties.name = TOPIC_NAME
mock_get_conn.return_value.__enter__.return_value.create_topic.return_value =
mock_topic_properties
-
- with mock.patch.object(asb_create_topic.log, "info") as mock_log_info:
- asb_create_topic.execute(None)
- mock_log_info.assert_called_with("Created Topic %s", TOPIC_NAME)
+ # create the topic
+ created_topic_name = asb_create_topic.execute(None)
+ # ensure the topic name is returned
+ assert created_topic_name == TOPIC_NAME
+ # ensure create_subscription is called with the correct arguments on
the connection
+
mock_get_conn.return_value.__enter__.return_value.create_topic.assert_called_once_with(
+ topic_name=TOPIC_NAME,
+ default_message_time_to_live=None,
+ max_size_in_megabytes=None,
+ requires_duplicate_detection=None,
+ duplicate_detection_history_time_window=None,
+ enable_batched_operations=None,
+ size_in_bytes=None,
+ filtering_messages_before_publishing=None,
+ authorization_rules=None,
+ support_ordering=None,
+ auto_delete_on_idle=None,
+ enable_partitioning=None,
+ enable_express=None,
+ user_metadata=None,
+ max_message_size_in_kilobytes=None,
+ )
+ print("Later Gator")
@mock.patch("airflow.providers.microsoft.azure.hooks.asb.AdminClientHook")
def test_create_subscription_exception(self, mock_sb_admin_client):