This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5c7c518aa0 Implement Azure Service Bus Topic Create, Delete Operators
(#25436)
5c7c518aa0 is described below
commit 5c7c518aa065bba873bc95d5764658faa9e81b63
Author: Bharanidharan <[email protected]>
AuthorDate: Tue Aug 16 21:41:00 2022 +0530
Implement Azure Service Bus Topic Create, Delete Operators (#25436)
- Added Create Topic Operator
- Added Test case
- Added Example DAG
- Added Doc for the operator
Implemented Azure service bus Topic Delete operator
- Added Operator for Delete Topic Operator
- Added example DAG
---
airflow/providers/microsoft/azure/operators/asb.py | 165 ++++++++++++++++++++-
.../operators/asb.rst | 37 +++++
.../microsoft/azure/operators/test_asb.py | 104 +++++++++++++
.../microsoft/azure/example_azure_service_bus.py | 17 +++
4 files changed, 322 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/microsoft/azure/operators/asb.py
b/airflow/providers/microsoft/azure/operators/asb.py
index 2e7599c469..3a6bc2800e 100644
--- a/airflow/providers/microsoft/azure/operators/asb.py
+++ b/airflow/providers/microsoft/azure/operators/asb.py
@@ -15,12 +15,14 @@
# specific language governing permissions and limitations
# under the License.
import datetime
-from typing import TYPE_CHECKING, List, Optional, Sequence, Union
+from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Union
from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.hooks.asb import AdminClientHook,
MessageHook
if TYPE_CHECKING:
+ from azure.servicebus.management._models import AuthorizationRule
+
from airflow.utils.context import Context
@@ -206,6 +208,125 @@ class AzureServiceBusDeleteQueueOperator(BaseOperator):
hook.delete_queue(self.queue_name)
+class AzureServiceBusTopicCreateOperator(BaseOperator):
+ """
+ Create an Azure Service Bus Topic under a Service Bus Namespace by using
ServiceBusAdministrationClient
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:AzureServiceBusTopicCreateOperator`
+
+ :param topic_name: Name of the topic.
+ :param default_message_time_to_live: ISO 8601 default message time span to
live value. This is
+ the duration after which the message expires, starting from when the
message is sent to Service
+ Bus. This is the default value used when TimeToLive is not set on a
message itself.
+ Input value of either type ~datetime.timedelta or string in ISO 8601
duration format
+ like "PT300S" is accepted.
+ :param max_size_in_megabytes: The maximum size of the topic in megabytes,
which is the size of
+ memory allocated for the topic.
+ :param requires_duplicate_detection: A value indicating if this topic
requires duplicate
+ detection.
+ :param duplicate_detection_history_time_window: ISO 8601 time span
structure that defines the
+ duration of the duplicate detection history. The default value is 10
minutes.
+ Input value of either type ~datetime.timedelta or string in ISO 8601
duration format
+ like "PT300S" is accepted.
+ :param enable_batched_operations: Value that indicates whether server-side
batched operations
+ are enabled.
+ :param size_in_bytes: The size of the topic, in bytes.
+ :param filtering_messages_before_publishing: Filter messages before
publishing.
+ :param authorization_rules: List of Authorization rules for resource.
+ :param support_ordering: A value that indicates whether the topic supports
ordering.
+ :param auto_delete_on_idle: ISO 8601 time span idle interval after which
the topic is
+ automatically deleted. The minimum duration is 5 minutes.
+ Input value of either type ~datetime.timedelta or string in ISO 8601
duration format
+ like "PT300S" is accepted.
+ :param enable_partitioning: A value that indicates whether the topic is to
be partitioned
+ across multiple message brokers.
+ :param enable_express: A value that indicates whether Express Entities are
enabled. An express
+ queue holds a message in memory temporarily before writing it to
persistent storage.
+ :param user_metadata: Metadata associated with the topic.
+ :param max_message_size_in_kilobytes: The maximum size in kilobytes of
message payload that
+ can be accepted by the queue. This feature is only available when using a
Premium namespace
+ and Service Bus API version "2021-05" or higher.
+ The minimum allowed value is 1024 while the maximum allowed value is
102400. Default value is 1024.
+ """
+
+ template_fields: Sequence[str] = ("topic_name",)
+ ui_color = "#e4f0e8"
+
+ def __init__(
+ self,
+ *,
+ topic_name: str,
+ azure_service_bus_conn_id: str = 'azure_service_bus_default',
+ default_message_time_to_live: Optional[Union[datetime.timedelta, str]]
= None,
+ max_size_in_megabytes: Optional[int] = None,
+ requires_duplicate_detection: Optional[bool] = None,
+ duplicate_detection_history_time_window:
Optional[Union[datetime.timedelta, str]] = None,
+ enable_batched_operations: Optional[bool] = None,
+ size_in_bytes: Optional[int] = None,
+ filtering_messages_before_publishing: Optional[bool] = None,
+ authorization_rules: Optional[List["AuthorizationRule"]] = None,
+ support_ordering: Optional[bool] = None,
+ auto_delete_on_idle: Optional[Union[datetime.timedelta, str]] = None,
+ enable_partitioning: Optional[bool] = None,
+ enable_express: Optional[bool] = None,
+ user_metadata: Optional[str] = None,
+ max_message_size_in_kilobytes: Optional[int] = None,
+ **kwargs: Any,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.topic_name = topic_name
+ self.azure_service_bus_conn_id = azure_service_bus_conn_id
+ self.default_message_time_to_live = default_message_time_to_live
+ self.max_size_in_megabytes = max_size_in_megabytes
+ self.requires_duplicate_detection = requires_duplicate_detection
+ self.duplicate_detection_history_time_window =
duplicate_detection_history_time_window
+ self.enable_batched_operations = enable_batched_operations
+ self.size_in_bytes = size_in_bytes
+ self.filtering_messages_before_publishing =
filtering_messages_before_publishing
+ self.authorization_rules = authorization_rules
+ self.support_ordering = support_ordering
+ self.auto_delete_on_idle = auto_delete_on_idle
+ self.enable_partitioning = enable_partitioning
+ self.enable_express = enable_express
+ self.user_metadata = user_metadata
+ self.max_message_size_in_kilobytes = max_message_size_in_kilobytes
+
+ def execute(self, context: "Context") -> str:
+ """Creates Topic in Service Bus namespace, by connecting to Service
Bus Admin client"""
+ if self.topic_name is None:
+ raise TypeError("Topic name cannot be None.")
+
+ # Create the hook
+ hook =
AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
+
+ with hook.get_conn() as service_mgmt_conn:
+ topic_properties = service_mgmt_conn.get_topic(self.topic_name)
+ if topic_properties and topic_properties.name == self.topic_name:
+ self.log.info("Topic name already exists")
+ return topic_properties.name
+ topic = service_mgmt_conn.create_topic(
+ topic_name=self.topic_name,
+ default_message_time_to_live=self.default_message_time_to_live,
+ max_size_in_megabytes=self.max_size_in_megabytes,
+ requires_duplicate_detection=self.requires_duplicate_detection,
+
duplicate_detection_history_time_window=self.duplicate_detection_history_time_window,
+ enable_batched_operations=self.enable_batched_operations,
+ size_in_bytes=self.size_in_bytes,
+
filtering_messages_before_publishing=self.filtering_messages_before_publishing,
+ authorization_rules=self.authorization_rules,
+ support_ordering=self.support_ordering,
+ auto_delete_on_idle=self.auto_delete_on_idle,
+ enable_partitioning=self.enable_partitioning,
+ enable_express=self.enable_express,
+ user_metadata=self.user_metadata,
+
max_message_size_in_kilobytes=self.max_message_size_in_kilobytes,
+ )
+ self.log.info("Created Topic %s", topic.name)
+ return topic.name
+
+
class AzureServiceBusSubscriptionCreateOperator(BaseOperator):
"""
Create an Azure Service Bus Topic Subscription under a Service Bus
Namespace
@@ -467,3 +588,45 @@ class
AzureServiceBusSubscriptionDeleteOperator(BaseOperator):
# delete subscription with name
hook.delete_subscription(self.subscription_name, self.topic_name)
+
+
+class AzureServiceBusTopicDeleteOperator(BaseOperator):
+ """
+ Deletes the topic in the Azure Service Bus namespace
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:AzureServiceBusTopicDeleteOperator`
+
+ :param topic_name: Name of the topic to be deleted.
+ :param azure_service_bus_conn_id: Reference to the
+ :ref:`Azure Service Bus connection
<howto/connection:azure_service_bus>`.
+ """
+
+ template_fields: Sequence[str] = ("topic_name",)
+ ui_color = "#e4f0e8"
+
+ def __init__(
+ self,
+ *,
+ topic_name: str,
+ azure_service_bus_conn_id: str = 'azure_service_bus_default',
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.topic_name = topic_name
+ self.azure_service_bus_conn_id = azure_service_bus_conn_id
+
+ def execute(self, context: "Context") -> None:
+ """Delete topic in Service Bus namespace, by connecting to Service Bus
Admin client"""
+ if self.topic_name is None:
+ raise TypeError("Topic name cannot be None.")
+ hook =
AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
+
+ with hook.get_conn() as service_mgmt_conn:
+ topic_properties = service_mgmt_conn.get_topic(self.topic_name)
+ if topic_properties and topic_properties.name == self.topic_name:
+ service_mgmt_conn.delete_topic(self.topic_name)
+ self.log.info("Topic %s deleted.", self.topic_name)
+ else:
+ self.log.info("Topic %s does not exist.", self.topic_name)
diff --git a/docs/apache-airflow-providers-microsoft-azure/operators/asb.rst
b/docs/apache-airflow-providers-microsoft-azure/operators/asb.rst
index 0614d13224..5ad6962418 100644
--- a/docs/apache-airflow-providers-microsoft-azure/operators/asb.rst
+++ b/docs/apache-airflow-providers-microsoft-azure/operators/asb.rst
@@ -98,6 +98,43 @@ Below is an example of using this operator to execute an
Azure Service Bus Delet
:start-after: [START howto_operator_delete_service_bus_queue]
:end-before: [END howto_operator_delete_service_bus_queue]
+Azure Service Bus Topic Operators
+-----------------------------------------
+Azure Service Bus Topic based Operators help to interact with topics in a
service bus namespace
+and to perform Create and Delete operations on topics.
+
+.. _howto/operator:AzureServiceBusTopicCreateOperator:
+
+Create Azure Service Bus Topic
+======================================
+
+To create an Azure Service Bus topic with specific parameters you can use
+:class:`~airflow.providers.microsoft.azure.operators.asb.AzureServiceBusTopicCreateOperator`.
+
+Below is an example of using this operator to execute an Azure Service Bus
Create Topic.
+
+.. exampleinclude::
/../../tests/system/providers/microsoft/azure/example_azure_service_bus.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_create_service_bus_topic]
+ :end-before: [END howto_operator_create_service_bus_topic]
+
+.. _howto/operator:AzureServiceBusTopicDeleteOperator:
+
+Delete Azure Service Bus Topic
+======================================
+
+To delete an Azure Service Bus topic you can use
+:class:`~airflow.providers.microsoft.azure.operators.asb.AzureServiceBusTopicDeleteOperator`.
+
+Below is an example of using this operator to execute an Azure Service Bus
Delete topic.
+
+.. exampleinclude::
/../../tests/system/providers/microsoft/azure/example_azure_service_bus.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_delete_service_bus_topic]
+ :end-before: [END howto_operator_delete_service_bus_topic]
+
Azure Service Bus Subscription Operators
-----------------------------------------
Azure Service Bus Subscription based Operators helps to interact topic
Subscription in service bus namespace
diff --git a/tests/providers/microsoft/azure/operators/test_asb.py
b/tests/providers/microsoft/azure/operators/test_asb.py
index 25a79cbf12..d2bdcc90eb 100644
--- a/tests/providers/microsoft/azure/operators/test_asb.py
+++ b/tests/providers/microsoft/azure/operators/test_asb.py
@@ -28,6 +28,8 @@ from airflow.providers.microsoft.azure.operators.asb import (
AzureServiceBusSendMessageOperator,
AzureServiceBusSubscriptionCreateOperator,
AzureServiceBusSubscriptionDeleteOperator,
+ AzureServiceBusTopicCreateOperator,
+ AzureServiceBusTopicDeleteOperator,
AzureServiceBusUpdateSubscriptionOperator,
)
@@ -204,6 +206,51 @@ class TestAzureServiceBusReceiveMessageOperator:
mock_get_conn.assert_has_calls(expected_calls)
+class TestABSTopicCreateOperator:
+ def test_init(self):
+ """
+ Test init by creating AzureServiceBusTopicCreateOperator with task id
and topic name,
+ by asserting the value
+ """
+ asb_create_topic = AzureServiceBusTopicCreateOperator(
+ task_id="asb_create_topic",
+ topic_name=TOPIC_NAME,
+ )
+ assert asb_create_topic.task_id == "asb_create_topic"
+ assert asb_create_topic.topic_name == TOPIC_NAME
+
+
@mock.patch("airflow.providers.microsoft.azure.hooks.asb.AdminClientHook.get_conn")
+ @mock.patch('azure.servicebus.management.TopicProperties')
+ def test_create_topic(self, mock_topic_properties, mock_get_conn):
+ """
+ Test AzureServiceBusTopicCreateOperator passed with the topic name
+ mocking the connection details, hook create_topic function
+ """
+ asb_create_topic = AzureServiceBusTopicCreateOperator(
+ task_id="asb_create_topic",
+ topic_name=TOPIC_NAME,
+ )
+ mock_topic_properties.name = TOPIC_NAME
+
mock_get_conn.return_value.__enter__.return_value.create_topic.return_value =
mock_topic_properties
+
+ with mock.patch.object(asb_create_topic.log, "info") as mock_log_info:
+ asb_create_topic.execute(None)
+ mock_log_info.assert_called_with("Created Topic %s", TOPIC_NAME)
+
+ @mock.patch('airflow.providers.microsoft.azure.hooks.asb.AdminClientHook')
+ def test_create_subscription_exception(self, mock_sb_admin_client):
+ """
+ Test that `AzureServiceBusTopicCreateOperator` raises a TypeError
+ when the topic name is passed as None
+ """
+ asb_create_topic_exception = AzureServiceBusTopicCreateOperator(
+ task_id="create_service_bus_subscription",
+ topic_name=None,
+ )
+ with pytest.raises(TypeError):
+ asb_create_topic_exception.execute(None)
+
+
class TestASBCreateSubscriptionOperator:
def test_init(self):
"""
@@ -377,3 +424,60 @@ class TestASBSubscriptionReceiveMessageOperator:
.__exit__
]
mock_get_conn.assert_has_calls(expected_calls)
+
+
+class TestASBTopicDeleteOperator:
+ def test_init(self):
+ """
+ Test init by creating AzureServiceBusTopicDeleteOperator with task id,
topic name and asserting
+ with values
+ """
+ asb_delete_topic_operator = AzureServiceBusTopicDeleteOperator(
+ task_id="asb_delete_topic",
+ topic_name=TOPIC_NAME,
+ )
+ assert asb_delete_topic_operator.task_id == "asb_delete_topic"
+ assert asb_delete_topic_operator.topic_name == TOPIC_NAME
+
+
@mock.patch("airflow.providers.microsoft.azure.hooks.asb.AdminClientHook.get_conn")
+ @mock.patch('azure.servicebus.management.TopicProperties')
+ def test_delete_topic(self, mock_topic_properties, mock_get_conn):
+ """
+ Test AzureServiceBusTopicDeleteOperator by mocking topic name,
connection
+ """
+ asb_delete_topic = AzureServiceBusTopicDeleteOperator(
+ task_id="asb_delete_topic",
+ topic_name=TOPIC_NAME,
+ )
+ mock_topic_properties.name = TOPIC_NAME
+
mock_get_conn.return_value.__enter__.return_value.get_topic.return_value =
mock_topic_properties
+ with mock.patch.object(asb_delete_topic.log, "info") as mock_log_info:
+ asb_delete_topic.execute(None)
+ mock_log_info.assert_called_with("Topic %s deleted.", TOPIC_NAME)
+
+
@mock.patch("airflow.providers.microsoft.azure.hooks.asb.AdminClientHook.get_conn")
+ def test_delete_topic_not_exists(self, mock_get_conn):
+ """
+ Test AzureServiceBusTopicDeleteOperator logs "does not exist" when the
mocked get_topic returns None
+ """
+ asb_delete_topic_not_exists = AzureServiceBusTopicDeleteOperator(
+ task_id="asb_delete_topic_not_exists",
+ topic_name=TOPIC_NAME,
+ )
+
mock_get_conn.return_value.__enter__.return_value.get_topic.return_value = None
+ with mock.patch.object(asb_delete_topic_not_exists.log, "info") as
mock_log_info:
+ asb_delete_topic_not_exists.execute(None)
+ mock_log_info.assert_called_with("Topic %s does not exist.",
TOPIC_NAME)
+
+ @mock.patch('airflow.providers.microsoft.azure.hooks.asb.AdminClientHook')
+ def test_delete_topic_exception(self, mock_sb_admin_client):
+ """
+ Test that `AzureServiceBusTopicDeleteOperator` raises a TypeError
+ when the topic name is passed as None
+ """
+ asb_delete_topic_exception = AzureServiceBusTopicDeleteOperator(
+ task_id="delete_service_bus_subscription",
+ topic_name=None,
+ )
+ with pytest.raises(TypeError):
+ asb_delete_topic_exception.execute(None)
diff --git
a/tests/system/providers/microsoft/azure/example_azure_service_bus.py
b/tests/system/providers/microsoft/azure/example_azure_service_bus.py
index 6295d28b72..e4e736d8eb 100644
--- a/tests/system/providers/microsoft/azure/example_azure_service_bus.py
+++ b/tests/system/providers/microsoft/azure/example_azure_service_bus.py
@@ -28,6 +28,8 @@ from airflow.providers.microsoft.azure.operators.asb import (
AzureServiceBusSendMessageOperator,
AzureServiceBusSubscriptionCreateOperator,
AzureServiceBusSubscriptionDeleteOperator,
+ AzureServiceBusTopicCreateOperator,
+ AzureServiceBusTopicDeleteOperator,
AzureServiceBusUpdateSubscriptionOperator,
)
@@ -94,6 +96,12 @@ with DAG(
)
# [END howto_operator_receive_message_service_bus_queue]
+ # [START howto_operator_create_service_bus_topic]
+ create_service_bus_topic = AzureServiceBusTopicCreateOperator(
+ task_id="create_service_bus_topic", topic_name=TOPIC_NAME
+ )
+ # [END howto_operator_create_service_bus_topic]
+
# [START howto_operator_create_service_bus_subscription]
create_service_bus_subscription =
AzureServiceBusSubscriptionCreateOperator(
task_id="create_service_bus_subscription",
@@ -129,6 +137,13 @@ with DAG(
)
# [END howto_operator_delete_service_bus_subscription]
+ # [START howto_operator_delete_service_bus_topic]
+ delete_asb_topic = AzureServiceBusTopicDeleteOperator(
+ task_id="delete_asb_topic",
+ topic_name=TOPIC_NAME,
+ )
+ # [END howto_operator_delete_service_bus_topic]
+
# [START howto_operator_delete_service_bus_queue]
delete_service_bus_queue = AzureServiceBusDeleteQueueOperator(
task_id="delete_service_bus_queue", queue_name=QUEUE_NAME,
trigger_rule="all_done"
@@ -137,6 +152,7 @@ with DAG(
chain(
create_service_bus_queue,
+ create_service_bus_topic,
create_service_bus_subscription,
send_message_to_service_bus_queue,
send_list_message_to_service_bus_queue,
@@ -145,6 +161,7 @@ with DAG(
update_service_bus_subscription,
receive_message_service_bus_subscription,
delete_service_bus_subscription,
+ delete_asb_topic,
delete_service_bus_queue,
)