This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 196d336585 Add Amazon EventBridge PutRule hook and operator (#32869)
196d336585 is described below

commit 196d3365852452f1651e0091aad1451564da453b
Author: Maham Ali <[email protected]>
AuthorDate: Mon Jul 31 11:55:28 2023 -0700

    Add Amazon EventBridge PutRule hook and operator (#32869)
---
 airflow/providers/amazon/aws/hooks/eventbridge.py  | 65 ++++++++++++++++-
 .../providers/amazon/aws/operators/eventbridge.py  | 85 ++++++++++++++++++++++
 .../operators/eventbridge.rst                      | 24 +++++-
 .../providers/amazon/aws/hooks/test_eventbridge.py | 11 +++
 .../amazon/aws/operators/test_eventbridge.py       | 44 +++++++++--
 .../providers/amazon/aws/example_eventbridge.py    | 25 ++++++-
 6 files changed, 241 insertions(+), 13 deletions(-)

diff --git a/airflow/providers/amazon/aws/hooks/eventbridge.py 
b/airflow/providers/amazon/aws/hooks/eventbridge.py
index d9450ae7e6..41c3de871f 100644
--- a/airflow/providers/amazon/aws/hooks/eventbridge.py
+++ b/airflow/providers/amazon/aws/hooks/eventbridge.py
@@ -16,12 +16,75 @@
 # under the License.
 from __future__ import annotations
 
+import json
+
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+from airflow.providers.amazon.aws.utils import trim_none_values
+
+
+def _validate_json(pattern: str) -> None:
+    try:
+        json.loads(pattern)
+    except ValueError:
+        raise ValueError("`event_pattern` must be a valid JSON string.")
 
 
 class EventBridgeHook(AwsBaseHook):
     """Amazon EventBridge Hook."""
 
     def __init__(self, *args, **kwargs):
-        """Creating object."""
         super().__init__(client_type="events", *args, **kwargs)
+
+    def put_rule(
+        self,
+        name: str,
+        description: str | None = None,
+        event_bus_name: str | None = None,
+        event_pattern: str | None = None,
+        role_arn: str | None = None,
+        schedule_expression: str | None = None,
+        state: str | None = None,
+        tags: list[dict] | None = None,
+        **kwargs,
+    ):
+        """
+        Create or update an EventBridge rule.
+
+        :param name: name of the rule to create or update (required)
+        :param description: description of the rule
+        :param event_bus_name: name or ARN of the event bus to associate with 
this rule
+        :param event_pattern: pattern of events to be matched to this rule
+        :param role_arn: the Amazon Resource Name of the IAM role associated 
with the rule
+        :param schedule_expression: the scheduling expression (for example, a 
cron or rate expression)
+        :param state: indicates whether rule is set to be "ENABLED" or 
"DISABLED"
+        :param tags: list of key-value pairs to associate with the rule
+
+        """
+        if not (event_pattern or schedule_expression):
+            raise ValueError(
+                "One of `event_pattern` or `schedule_expression` are required 
in order to "
+                "put or update your rule."
+            )
+
+        if state and state not in ["ENABLED", "DISABLED"]:
+            raise ValueError("`state` must be specified as ENABLED or 
DISABLED.")
+
+        if event_pattern:
+            _validate_json(event_pattern)
+
+        put_rule_kwargs: dict[str, str | list] = {
+            **trim_none_values(
+                {
+                    "Name": name,
+                    "Description": description,
+                    "EventBusName": event_bus_name,
+                    "EventPattern": event_pattern,
+                    "RoleArn": role_arn,
+                    "ScheduleExpression": schedule_expression,
+                    "State": state,
+                    "Tags": tags,
+                }
+            )
+        }
+
+        return self.conn.put_rule(**put_rule_kwargs)
diff --git a/airflow/providers/amazon/aws/operators/eventbridge.py 
b/airflow/providers/amazon/aws/operators/eventbridge.py
index ceeab782a7..5e5a8f15ce 100644
--- a/airflow/providers/amazon/aws/operators/eventbridge.py
+++ b/airflow/providers/amazon/aws/operators/eventbridge.py
@@ -32,6 +32,10 @@ class EventBridgePutEventsOperator(BaseOperator):
     """
     Put Events onto Amazon EventBridge.
 
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EventBridgePutEventsOperator`
+
     :param entries: the list of events to be put onto EventBridge, each event 
is a dict (required)
     :param endpoint_id: the URL subdomain of the endpoint
     :param aws_conn_id: the AWS connection to use
@@ -85,3 +89,84 @@ class EventBridgePutEventsOperator(BaseOperator):
 
         if self.do_xcom_push:
             return [e["EventId"] for e in response["Entries"]]
+
+
+class EventBridgePutRuleOperator(BaseOperator):
+    """
+    Create or update a specified EventBridge rule.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:EventBridgePutRuleOperator`
+
+    :param name: name of the rule to create or update (required)
+    :param description: description of the rule
+    :param event_bus_name: name or ARN of the event bus to associate with this 
rule
+    :param event_pattern: pattern of events to be matched to this rule
+    :param role_arn: the Amazon Resource Name of the IAM role associated with 
the rule
+    :param schedule_expression: the scheduling expression (for example, a cron 
or rate expression)
+    :param state: indicates whether rule is set to be "ENABLED" or "DISABLED"
+    :param tags: list of key-value pairs to associate with the rule
+    :param region: the region where rule is to be created or updated
+
+    """
+
+    template_fields: Sequence[str] = (
+        "aws_conn_id",
+        "name",
+        "description",
+        "event_bus_name",
+        "event_pattern",
+        "role_arn",
+        "schedule_expression",
+        "state",
+        "tags",
+        "region_name",
+    )
+
+    def __init__(
+        self,
+        *,
+        name: str,
+        description: str | None = None,
+        event_bus_name: str | None = None,
+        event_pattern: str | None = None,
+        role_arn: str | None = None,
+        schedule_expression: str | None = None,
+        state: str | None = None,
+        tags: list | None = None,
+        region_name: str | None = None,
+        aws_conn_id: str = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.name = name
+        self.description = description
+        self.event_bus_name = event_bus_name
+        self.event_pattern = event_pattern
+        self.role_arn = role_arn
+        self.region_name = region_name
+        self.schedule_expression = schedule_expression
+        self.state = state
+        self.tags = tags
+        self.aws_conn_id = aws_conn_id
+
+    @cached_property
+    def hook(self) -> EventBridgeHook:
+        """Create and return an EventBridgeHook."""
+        return EventBridgeHook(aws_conn_id=self.aws_conn_id, 
region_name=self.region_name)
+
+    def execute(self, context: Context):
+
+        self.log.info('Sending rule "%s" to EventBridge.', self.name)
+
+        return self.hook.put_rule(
+            name=self.name,
+            description=self.description,
+            event_bus_name=self.event_bus_name,
+            event_pattern=self.event_pattern,
+            role_arn=self.role_arn,
+            schedule_expression=self.schedule_expression,
+            state=self.state,
+            tags=self.tags,
+        )
diff --git a/docs/apache-airflow-providers-amazon/operators/eventbridge.rst 
b/docs/apache-airflow-providers-amazon/operators/eventbridge.rst
index ce0c449f05..0e6a04d49e 100644
--- a/docs/apache-airflow-providers-amazon/operators/eventbridge.rst
+++ b/docs/apache-airflow-providers-amazon/operators/eventbridge.rst
@@ -15,9 +15,9 @@
     specific language governing permissions and limitations
     under the License.
 
-========================================
+==================
 Amazon EventBridge
-========================================
+==================
 
 `Amazon Eventbridge <https://docs.aws.amazon.com/eventbridge/>`__ is a 
serverless event bus service that makes it easy
 to connect your applications with data from a variety of sources. EventBridge 
delivers a stream of real-time data from
@@ -34,8 +34,11 @@ Prerequisite Tasks
 Operators
 ---------
 
+.. _howto/operator:EventBridgePutEventsOperator:
+
+
 Send events to Amazon EventBridge
-==========================================
+=================================
 
 To send custom events to Amazon EventBridge, use
 
:class:`~airflow.providers.amazon.aws.operators.eventbridge.EventBridgePutEventsOperator`.
@@ -46,6 +49,21 @@ To send custom events to Amazon EventBridge, use
     :start-after: [START howto_operator_eventbridge_put_events]
     :end-before: [END howto_operator_eventbridge_put_events]
 
+.. _howto/operator:EventBridgePutRuleOperator:
+
+
+Create or update a rule on Amazon EventBridge
+======================================================
+
+To create or update a rule on EventBridge, use
+:class:`~airflow.providers.amazon.aws.operators.eventbridge.EventBridgePutRuleOperator`.
+
+.. exampleinclude:: 
/../../tests/system/providers/amazon/aws/example_eventbridge.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_eventbridge_put_rule]
+    :end-before: [END howto_operator_eventbridge_put_rule]
+
 
 Reference
 ---------
diff --git a/tests/providers/amazon/aws/hooks/test_eventbridge.py 
b/tests/providers/amazon/aws/hooks/test_eventbridge.py
index 680686854e..1693f56811 100644
--- a/tests/providers/amazon/aws/hooks/test_eventbridge.py
+++ b/tests/providers/amazon/aws/hooks/test_eventbridge.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations
 
+import pytest
 from moto import mock_events
 
 from airflow.providers.amazon.aws.hooks.eventbridge import EventBridgeHook
@@ -26,3 +27,13 @@ class TestEventBridgeHook:
     def test_conn_returns_a_boto3_connection(self):
         hook = EventBridgeHook(aws_conn_id="aws_default")
         assert hook.conn is not None
+
+    def test_put_rule(self):
+        hook = EventBridgeHook(aws_conn_id="aws_default")
+        response = hook.put_rule(name="test", event_pattern='{"source": 
["aws.s3"]}', state="ENABLED")
+        assert "RuleArn" in response
+
+    def test_put_rule_with_bad_json_fails(self):
+        hook = EventBridgeHook(aws_conn_id="aws_default")
+        with pytest.raises(ValueError):
+            hook.put_rule(name="test", event_pattern="invalid json", 
state="ENABLED")
diff --git a/tests/providers/amazon/aws/operators/test_eventbridge.py 
b/tests/providers/amazon/aws/operators/test_eventbridge.py
index b09a1817b9..c6a5b7e916 100644
--- a/tests/providers/amazon/aws/operators/test_eventbridge.py
+++ b/tests/providers/amazon/aws/operators/test_eventbridge.py
@@ -23,17 +23,20 @@ import pytest
 
 from airflow import AirflowException
 from airflow.providers.amazon.aws.hooks.eventbridge import EventBridgeHook
-from airflow.providers.amazon.aws.operators.eventbridge import 
EventBridgePutEventsOperator
+from airflow.providers.amazon.aws.operators.eventbridge import (
+    EventBridgePutEventsOperator,
+    EventBridgePutRuleOperator,
+)
 
-TASK_ID = "events_putevents_job"
 ENTRIES = [{"Detail": "test-detail", "Source": "test-source", "DetailType": 
"test-detail-type"}]
 FAILED_ENTRIES_RESPONSE = [{"ErrorCode": "test_code"}, {"ErrorCode": 
"test_code"}]
+EVENT_PATTERN = '{"source": ["aws.s3"]}'
 
 
 class TestEventBridgePutEventsOperator:
     def test_init(self):
         operator = EventBridgePutEventsOperator(
-            task_id=TASK_ID,
+            task_id="put_events_job",
             entries=ENTRIES,
         )
 
@@ -46,7 +49,7 @@ class TestEventBridgePutEventsOperator:
         mock_conn.put_events.return_value = hook_response
 
         operator = EventBridgePutEventsOperator(
-            task_id=TASK_ID,
+            task_id="put_events_job",
             entries=ENTRIES,
         )
 
@@ -56,7 +59,6 @@ class TestEventBridgePutEventsOperator:
 
     @mock.patch.object(EventBridgeHook, "conn")
     def test_failed_to_send(self, mock_conn: MagicMock):
-
         hook_response = {
             "FailedEntryCount": 1,
             "Entries": FAILED_ENTRIES_RESPONSE,
@@ -65,9 +67,39 @@ class TestEventBridgePutEventsOperator:
         mock_conn.put_events.return_value = hook_response
 
         operator = EventBridgePutEventsOperator(
-            task_id=TASK_ID,
+            task_id="failed_put_events_job",
             entries=ENTRIES,
         )
 
         with pytest.raises(AirflowException):
             operator.execute(None)
+
+
+class TestEventBridgePutRuleOperator:
+    def test_init(self):
+        operator = EventBridgePutRuleOperator(
+            task_id="events_put_rule_job", name="match_s3_events", 
event_pattern=EVENT_PATTERN
+        )
+
+        assert operator.event_pattern == EVENT_PATTERN
+
+    @mock.patch.object(EventBridgeHook, "conn")
+    def test_execute(self, mock_conn: MagicMock):
+        hook_response = {"RuleArn": 
"arn:aws:events:us-east-1:123456789012:rule/test"}
+        mock_conn.put_rule.return_value = hook_response
+
+        operator = EventBridgePutRuleOperator(
+            task_id="events_put_rule_job", name="match_s3_events", 
event_pattern=EVENT_PATTERN
+        )
+
+        result = operator.execute(None)
+
+        assert result == hook_response
+
+    def test_put_rule_with_bad_json_fails(self):
+        operator = EventBridgePutRuleOperator(
+            task_id="failed_put_rule_job", name="match_s3_events", 
event_pattern="invalid json"
+        )
+
+        with pytest.raises(ValueError):
+            operator.execute(None)
diff --git a/tests/system/providers/amazon/aws/example_eventbridge.py 
b/tests/system/providers/amazon/aws/example_eventbridge.py
index f6208108c9..ed68be3bd1 100644
--- a/tests/system/providers/amazon/aws/example_eventbridge.py
+++ b/tests/system/providers/amazon/aws/example_eventbridge.py
@@ -19,7 +19,12 @@ from __future__ import annotations
 from datetime import datetime
 
 from airflow import DAG
-from airflow.providers.amazon.aws.operators.eventbridge import 
EventBridgePutEventsOperator
+from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.operators.eventbridge import (
+    EventBridgePutEventsOperator,
+    EventBridgePutRuleOperator,
+)
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, 
SystemTestContextBuilder
 
 DAG_ID = "example_eventbridge"
 ENTRIES = [
@@ -31,6 +36,8 @@ ENTRIES = [
     }
 ]
 
+sys_test_context_task = SystemTestContextBuilder().build()
+
 with DAG(
     dag_id=DAG_ID,
     schedule="@once",
@@ -38,13 +45,25 @@ with DAG(
     tags=["example"],
     catchup=False,
 ) as dag:
+    test_context = sys_test_context_task()
 
-    # [START howto_operator_eventbridge_put_events]
+    env_id = test_context[ENV_ID_KEY]
 
+    # [START howto_operator_eventbridge_put_events]
     put_events = EventBridgePutEventsOperator(task_id="put_events_task", 
entries=ENTRIES)
-
     # [END howto_operator_eventbridge_put_events]
 
+    # [START howto_operator_eventbridge_put_rule]
+    put_rule = EventBridgePutRuleOperator(
+        task_id="put_rule_task",
+        name="Example Rule",
+        event_pattern='{"source": ["example.myapp"]}',
+        description="This rule matches events from example.myapp.",
+    )
+    # [END howto_operator_eventbridge_put_rule]
+
+    chain(test_context, put_events, put_rule)
+
 
 from tests.system.utils import get_test_run  # noqa: E402
 

Reply via email to