pankajastro commented on code in PR #24625:
URL: https://github.com/apache/airflow/pull/24625#discussion_r910148275


##########
airflow/providers/microsoft/azure/operators/asb.py:
##########
@@ -204,3 +204,145 @@ def execute(self, context: "Context") -> None:
 
         # delete queue with name
         hook.delete_queue(self.queue_name)
+
+
+class AzureServiceBusSubscriptionCreateOperator(BaseOperator):
+    """
+    Create an Azure Service Bus Topic Subscription under a Service Bus Namespace
+    by using ServiceBusAdministrationClient
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AzureServiceBusSubscriptionCreateOperator`
+
+    :param topic_name: The topic that will own the to-be-created subscription.
+    :param subscription_name: Name of the subscription that need to be created
+    :param lock_duration: ISO 8601 time span duration of a peek-lock; that is, 
the amount of time that
+        the message is locked for other receivers. The maximum value for 
LockDuration is 5 minutes; the
+        default value is 1 minute. Input value of either type 
~datetime.timedelta or string in ISO 8601
+        duration format like "PT300S" is accepted.
+    :param requires_session: A value that indicates whether the queue supports 
the concept of sessions.
+    :param default_message_time_to_live: ISO 8601 default message time span to 
live value. This is the
+        duration after which the message expires, starting from when the 
message is sent to
+        Service Bus. This is the default value used when TimeToLive is not set 
on a message itself.
+        Input value of either type ~datetime.timedelta or string in ISO 8601 
duration
+        format like "PT300S" is accepted.
+    :param dead_lettering_on_message_expiration: A value that indicates 
whether this subscription has
+        dead letter support when a message expires.
+    :param dead_lettering_on_filter_evaluation_exceptions: A value that 
indicates whether this
+        subscription has dead letter support when a message expires.
+    :param max_delivery_count: The maximum delivery count. A message is 
automatically dead lettered
+        after this number of deliveries. Default value is 10.
+    :param enable_batched_operations: Value that indicates whether server-side 
batched
+        operations are enabled.
+    :param forward_to: The name of the recipient entity to which all the 
messages sent to the
+        subscription are forwarded to.
+    :param user_metadata: Metadata associated with the subscription. Maximum 
number of characters is 1024.
+    :param forward_dead_lettered_messages_to: The name of the recipient entity 
to which all the
+        messages sent to the subscription are forwarded to.
+    :param auto_delete_on_idle: ISO 8601 time Span idle interval after which 
the subscription is
+        automatically deleted. The minimum duration is 5 minutes. Input value 
of either
+        type ~datetime.timedelta or string in ISO 8601 duration format like 
"PT300S" is accepted.
+    :param azure_service_bus_conn_id: Reference to the
+        :ref:`Azure Service Bus connection<howto/connection:azure_service_bus>`.
+    """
+
+    template_fields: Sequence[str] = ("topic_name", "subscription_name")
+    ui_color = "#e4f0e8"
+
+    def __init__(
+        self,
+        *,
+        topic_name: str,
+        subscription_name: str,
+        azure_service_bus_conn_id: str = 'azure_service_bus_default',
+        lock_duration: Optional[Union[datetime.timedelta, str]] = None,
+        requires_session: Optional[bool] = None,
+        default_message_time_to_live: Optional[Union[datetime.timedelta, str]] = None,
+        dead_lettering_on_message_expiration: Optional[bool] = True,
+        dead_lettering_on_filter_evaluation_exceptions: Optional[bool] = None,
+        max_delivery_count: Optional[int] = 10,
+        enable_batched_operations: Optional[bool] = True,
+        forward_to: Optional[str] = None,
+        user_metadata: Optional[str] = None,
+        forward_dead_lettered_messages_to: Optional[str] = None,
+        auto_delete_on_idle: Optional[Union[datetime.timedelta, str]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.topic_name = topic_name
+        self.subscription_name = subscription_name
+        self.lock_duration = lock_duration
+        self.requires_session = requires_session
+        self.default_message_time_to_live = default_message_time_to_live
+        self.dead_lettering_on_message_expiration = dead_lettering_on_message_expiration
+        self.dead_lettering_on_filter_evaluation_exceptions = dead_lettering_on_filter_evaluation_exceptions
+        self.max_delivery_count = max_delivery_count
+        self.enable_batched_operations = enable_batched_operations
+        self.forward_to = forward_to
+        self.user_metadata = user_metadata
+        self.forward_dead_lettered_messages_to = forward_dead_lettered_messages_to
+        self.auto_delete_on_idle = auto_delete_on_idle
+        self.azure_service_bus_conn_id = azure_service_bus_conn_id
+
+    def execute(self, context: "Context") -> None:
+        """Creates Subscription in Service Bus namespace, by connecting to Service Bus Admin client"""
+        # Create the hook
+        hook = AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
+
+        # create subscription with name
+        subscription = hook.create_subscription(
+            self.subscription_name,
+            self.topic_name,
+            self.lock_duration,
+            self.requires_session,
+            self.default_message_time_to_live,
+            self.dead_lettering_on_message_expiration,
+            self.dead_lettering_on_filter_evaluation_exceptions,
+            self.max_delivery_count,
+            self.enable_batched_operations,
+            self.forward_to,
+            self.user_metadata,
+            self.forward_dead_lettered_messages_to,
+            self.auto_delete_on_idle,
+        )
+        self.log.info("Created Subcription %s", subscription.name)
+
+
+class AzureServiceBusSubscriptionDeleteOperator(BaseOperator):
+    """
+    Deletes the topic subscription in the Azure ServiceBus namespace

Review Comment:
   ```suggestion
       Deletes a topic subscription in the Azure ServiceBus namespace
   ```



##########
airflow/providers/microsoft/azure/operators/asb.py:
##########
@@ -204,3 +204,145 @@ def execute(self, context: "Context") -> None:
 
         # delete queue with name
         hook.delete_queue(self.queue_name)
+
+
+class AzureServiceBusSubscriptionCreateOperator(BaseOperator):
+    """
+    Create an Azure Service Bus Topic Subscription under a Service Bus Namespace
+    by using ServiceBusAdministrationClient
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AzureServiceBusSubscriptionCreateOperator`
+
+    :param topic_name: The topic that will own the to-be-created subscription.
+    :param subscription_name: Name of the subscription that need to be created
+    :param lock_duration: ISO 8601 time span duration of a peek-lock; that is, 
the amount of time that
+        the message is locked for other receivers. The maximum value for 
LockDuration is 5 minutes; the
+        default value is 1 minute. Input value of either type 
~datetime.timedelta or string in ISO 8601
+        duration format like "PT300S" is accepted.
+    :param requires_session: A value that indicates whether the queue supports 
the concept of sessions.
+    :param default_message_time_to_live: ISO 8601 default message time span to 
live value. This is the
+        duration after which the message expires, starting from when the 
message is sent to
+        Service Bus. This is the default value used when TimeToLive is not set 
on a message itself.
+        Input value of either type ~datetime.timedelta or string in ISO 8601 
duration
+        format like "PT300S" is accepted.
+    :param dead_lettering_on_message_expiration: A value that indicates 
whether this subscription has
+        dead letter support when a message expires.
+    :param dead_lettering_on_filter_evaluation_exceptions: A value that 
indicates whether this
+        subscription has dead letter support when a message expires.
+    :param max_delivery_count: The maximum delivery count. A message is 
automatically dead lettered
+        after this number of deliveries. Default value is 10.
+    :param enable_batched_operations: Value that indicates whether server-side 
batched
+        operations are enabled.
+    :param forward_to: The name of the recipient entity to which all the 
messages sent to the
+        subscription are forwarded to.
+    :param user_metadata: Metadata associated with the subscription. Maximum 
number of characters is 1024.
+    :param forward_dead_lettered_messages_to: The name of the recipient entity 
to which all the
+        messages sent to the subscription are forwarded to.
+    :param auto_delete_on_idle: ISO 8601 time Span idle interval after which 
the subscription is
+        automatically deleted. The minimum duration is 5 minutes. Input value 
of either
+        type ~datetime.timedelta or string in ISO 8601 duration format like 
"PT300S" is accepted.
+    :param azure_service_bus_conn_id: Reference to the
+        :ref:`Azure Service Bus connection<howto/connection:azure_service_bus>`.
+    """
+
+    template_fields: Sequence[str] = ("topic_name", "subscription_name")
+    ui_color = "#e4f0e8"
+
+    def __init__(
+        self,
+        *,
+        topic_name: str,
+        subscription_name: str,
+        azure_service_bus_conn_id: str = 'azure_service_bus_default',
+        lock_duration: Optional[Union[datetime.timedelta, str]] = None,
+        requires_session: Optional[bool] = None,
+        default_message_time_to_live: Optional[Union[datetime.timedelta, str]] = None,
+        dead_lettering_on_message_expiration: Optional[bool] = True,
+        dead_lettering_on_filter_evaluation_exceptions: Optional[bool] = None,
+        max_delivery_count: Optional[int] = 10,
+        enable_batched_operations: Optional[bool] = True,
+        forward_to: Optional[str] = None,
+        user_metadata: Optional[str] = None,
+        forward_dead_lettered_messages_to: Optional[str] = None,
+        auto_delete_on_idle: Optional[Union[datetime.timedelta, str]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.topic_name = topic_name
+        self.subscription_name = subscription_name
+        self.lock_duration = lock_duration
+        self.requires_session = requires_session
+        self.default_message_time_to_live = default_message_time_to_live
+        self.dead_lettering_on_message_expiration = dead_lettering_on_message_expiration
+        self.dead_lettering_on_filter_evaluation_exceptions = dead_lettering_on_filter_evaluation_exceptions
+        self.max_delivery_count = max_delivery_count
+        self.enable_batched_operations = enable_batched_operations
+        self.forward_to = forward_to
+        self.user_metadata = user_metadata
+        self.forward_dead_lettered_messages_to = forward_dead_lettered_messages_to
+        self.auto_delete_on_idle = auto_delete_on_idle
+        self.azure_service_bus_conn_id = azure_service_bus_conn_id
+
+    def execute(self, context: "Context") -> None:
+        """Creates Subscription in Service Bus namespace, by connecting to Service Bus Admin client"""
+        # Create the hook
+        hook = AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)
+
+        # create subscription with name
+        subscription = hook.create_subscription(
+            self.subscription_name,
+            self.topic_name,
+            self.lock_duration,
+            self.requires_session,
+            self.default_message_time_to_live,
+            self.dead_lettering_on_message_expiration,
+            self.dead_lettering_on_filter_evaluation_exceptions,
+            self.max_delivery_count,
+            self.enable_batched_operations,
+            self.forward_to,
+            self.user_metadata,
+            self.forward_dead_lettered_messages_to,
+            self.auto_delete_on_idle,
+        )
+        self.log.info("Created Subcription %s", subscription.name)
+
+
+class AzureServiceBusSubscriptionDeleteOperator(BaseOperator):
+    """
+    Deletes the topic subscription in the Azure ServiceBus namespace

Review Comment:
   ```suggestion
       Delete a topic subscription in the Azure ServiceBus namespace
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to