This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 196d336585 Add Amazon EventBridge PutRule hook and operator (#32869)
196d336585 is described below
commit 196d3365852452f1651e0091aad1451564da453b
Author: Maham Ali <[email protected]>
AuthorDate: Mon Jul 31 11:55:28 2023 -0700
Add Amazon EventBridge PutRule hook and operator (#32869)
---
airflow/providers/amazon/aws/hooks/eventbridge.py | 65 ++++++++++++++++-
.../providers/amazon/aws/operators/eventbridge.py | 85 ++++++++++++++++++++++
.../operators/eventbridge.rst | 24 +++++-
.../providers/amazon/aws/hooks/test_eventbridge.py | 11 +++
.../amazon/aws/operators/test_eventbridge.py | 44 +++++++++--
.../providers/amazon/aws/example_eventbridge.py | 25 ++++++-
6 files changed, 241 insertions(+), 13 deletions(-)
diff --git a/airflow/providers/amazon/aws/hooks/eventbridge.py
b/airflow/providers/amazon/aws/hooks/eventbridge.py
index d9450ae7e6..41c3de871f 100644
--- a/airflow/providers/amazon/aws/hooks/eventbridge.py
+++ b/airflow/providers/amazon/aws/hooks/eventbridge.py
@@ -16,12 +16,75 @@
# under the License.
from __future__ import annotations
+import json
+
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+from airflow.providers.amazon.aws.utils import trim_none_values
+
+
+def _validate_json(pattern: str) -> None:
+ try:
+ json.loads(pattern)
+ except ValueError:
+ raise ValueError("`event_pattern` must be a valid JSON string.")
class EventBridgeHook(AwsBaseHook):
    """Amazon EventBridge Hook."""

    def __init__(self, *args, **kwargs):
        """Initialise the base AWS hook with ``client_type="events"``."""
        super().__init__(*args, client_type="events", **kwargs)

    def put_rule(
        self,
        name: str,
        description: str | None = None,
        event_bus_name: str | None = None,
        event_pattern: str | None = None,
        role_arn: str | None = None,
        schedule_expression: str | None = None,
        state: str | None = None,
        tags: list[dict] | None = None,
        **kwargs,
    ):
        """
        Create or update an EventBridge rule.

        :param name: name of the rule to create or update (required)
        :param description: description of the rule
        :param event_bus_name: name or ARN of the event bus to associate with this rule
        :param event_pattern: pattern of events to be matched to this rule
        :param role_arn: the Amazon Resource Name of the IAM role associated with the rule
        :param schedule_expression: the scheduling expression (for example, a cron or rate expression)
        :param state: indicates whether rule is set to be "ENABLED" or "DISABLED"
        :param tags: list of key-value pairs to associate with the rule
        :return: whatever the client's ``put_rule`` call returns
        :raises ValueError: if neither ``event_pattern`` nor ``schedule_expression`` is given,
            if ``state`` is set to something other than ENABLED or DISABLED, or if
            ``event_pattern`` fails JSON validation
        """
        # NOTE(review): extra ``**kwargs`` are accepted but never forwarded to the
        # client call below -- confirm whether that is intentional.
        if not (event_pattern or schedule_expression):
            raise ValueError(
                "One of `event_pattern` or `schedule_expression` are required in order to "
                "put or update your rule."
            )

        if state and state not in ("ENABLED", "DISABLED"):
            raise ValueError("`state` must be specified as ENABLED or DISABLED.")

        if event_pattern:
            _validate_json(event_pattern)

        # Arguments left as None are removed so only explicitly provided values
        # are sent with the request.
        put_rule_kwargs: dict[str, str | list] = trim_none_values(
            {
                "Name": name,
                "Description": description,
                "EventBusName": event_bus_name,
                "EventPattern": event_pattern,
                "RoleArn": role_arn,
                "ScheduleExpression": schedule_expression,
                "State": state,
                "Tags": tags,
            }
        )

        return self.conn.put_rule(**put_rule_kwargs)
diff --git a/airflow/providers/amazon/aws/operators/eventbridge.py
b/airflow/providers/amazon/aws/operators/eventbridge.py
index ceeab782a7..5e5a8f15ce 100644
--- a/airflow/providers/amazon/aws/operators/eventbridge.py
+++ b/airflow/providers/amazon/aws/operators/eventbridge.py
@@ -32,6 +32,10 @@ class EventBridgePutEventsOperator(BaseOperator):
"""
Put Events onto Amazon EventBridge.
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:EventBridgePutEventsOperator`
+
:param entries: the list of events to be put onto EventBridge, each event
is a dict (required)
:param endpoint_id: the URL subdomain of the endpoint
:param aws_conn_id: the AWS connection to use
@@ -85,3 +89,84 @@ class EventBridgePutEventsOperator(BaseOperator):
if self.do_xcom_push:
return [e["EventId"] for e in response["Entries"]]
+
+
class EventBridgePutRuleOperator(BaseOperator):
    """
    Create or update a specified EventBridge rule.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:EventBridgePutRuleOperator`

    :param name: name of the rule to create or update (required)
    :param description: description of the rule
    :param event_bus_name: name or ARN of the event bus to associate with this rule
    :param event_pattern: pattern of events to be matched to this rule
    :param role_arn: the Amazon Resource Name of the IAM role associated with the rule
    :param schedule_expression: the scheduling expression (for example, a cron or rate expression)
    :param state: indicates whether rule is set to be "ENABLED" or "DISABLED"
    :param tags: list of key-value pairs to associate with the rule
    :param region_name: the region where rule is to be created or updated
    :param aws_conn_id: the AWS connection to use
    """

    template_fields: Sequence[str] = (
        "aws_conn_id",
        "name",
        "description",
        "event_bus_name",
        "event_pattern",
        "role_arn",
        "schedule_expression",
        "state",
        "tags",
        "region_name",
    )

    def __init__(
        self,
        *,
        name: str,
        description: str | None = None,
        event_bus_name: str | None = None,
        event_pattern: str | None = None,
        role_arn: str | None = None,
        schedule_expression: str | None = None,
        state: str | None = None,
        tags: list[dict] | None = None,
        region_name: str | None = None,
        aws_conn_id: str = "aws_default",
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.name = name
        self.description = description
        self.event_bus_name = event_bus_name
        self.event_pattern = event_pattern
        self.role_arn = role_arn
        self.region_name = region_name
        self.schedule_expression = schedule_expression
        self.state = state
        self.tags = tags
        self.aws_conn_id = aws_conn_id

    @cached_property
    def hook(self) -> EventBridgeHook:
        """Create and return an EventBridgeHook."""
        return EventBridgeHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)

    def execute(self, context: Context):
        """
        Send the rule to EventBridge through the hook.

        :param context: the task execution context (not used by this method)
        :return: the value returned by ``EventBridgeHook.put_rule``
        """
        self.log.info('Sending rule "%s" to EventBridge.', self.name)

        return self.hook.put_rule(
            name=self.name,
            description=self.description,
            event_bus_name=self.event_bus_name,
            event_pattern=self.event_pattern,
            role_arn=self.role_arn,
            schedule_expression=self.schedule_expression,
            state=self.state,
            tags=self.tags,
        )
diff --git a/docs/apache-airflow-providers-amazon/operators/eventbridge.rst
b/docs/apache-airflow-providers-amazon/operators/eventbridge.rst
index ce0c449f05..0e6a04d49e 100644
--- a/docs/apache-airflow-providers-amazon/operators/eventbridge.rst
+++ b/docs/apache-airflow-providers-amazon/operators/eventbridge.rst
@@ -15,9 +15,9 @@
specific language governing permissions and limitations
under the License.
-========================================
+==================
Amazon EventBridge
-========================================
+==================
`Amazon EventBridge <https://docs.aws.amazon.com/eventbridge/>`__ is a
serverless event bus service that makes it easy
to connect your applications with data from a variety of sources. EventBridge
delivers a stream of real-time data from
@@ -34,8 +34,11 @@ Prerequisite Tasks
Operators
---------
+.. _howto/operator:EventBridgePutEventsOperator:
+
+
Send events to Amazon EventBridge
-==========================================
+=================================
To send custom events to Amazon EventBridge, use
:class:`~airflow.providers.amazon.aws.operators.eventbridge.EventBridgePutEventsOperator`.
@@ -46,6 +49,21 @@ To send custom events to Amazon EventBridge, use
:start-after: [START howto_operator_eventbridge_put_events]
:end-before: [END howto_operator_eventbridge_put_events]
+.. _howto/operator:EventBridgePutRuleOperator:
+
+
+Create or update a rule on Amazon EventBridge
+=============================================
+
+To create or update a rule on EventBridge, use
+:class:`~airflow.providers.amazon.aws.operators.eventbridge.EventBridgePutRuleOperator`.
+
+.. exampleinclude::
/../../tests/system/providers/amazon/aws/example_eventbridge.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_eventbridge_put_rule]
+ :end-before: [END howto_operator_eventbridge_put_rule]
+
Reference
---------
diff --git a/tests/providers/amazon/aws/hooks/test_eventbridge.py
b/tests/providers/amazon/aws/hooks/test_eventbridge.py
index 680686854e..1693f56811 100644
--- a/tests/providers/amazon/aws/hooks/test_eventbridge.py
+++ b/tests/providers/amazon/aws/hooks/test_eventbridge.py
@@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations
+import pytest
from moto import mock_events
from airflow.providers.amazon.aws.hooks.eventbridge import EventBridgeHook
@@ -26,3 +27,13 @@ class TestEventBridgeHook:
def test_conn_returns_a_boto3_connection(self):
hook = EventBridgeHook(aws_conn_id="aws_default")
assert hook.conn is not None
+
def test_put_rule(self):
    """Putting a rule with a valid JSON event pattern yields a response containing ``RuleArn``."""
    rule_kwargs = {
        "name": "test",
        "event_pattern": '{"source": ["aws.s3"]}',
        "state": "ENABLED",
    }
    eventbridge_hook = EventBridgeHook(aws_conn_id="aws_default")

    put_rule_response = eventbridge_hook.put_rule(**rule_kwargs)

    assert "RuleArn" in put_rule_response
+
def test_put_rule_with_bad_json_fails(self):
    """An ``event_pattern`` that is not valid JSON is rejected with a ``ValueError``."""
    hook = EventBridgeHook(aws_conn_id="aws_default")
    # ``put_rule`` raises ValueError for several reasons; match on the message so
    # the test only passes for the JSON-validation failure.
    with pytest.raises(ValueError, match="must be a valid JSON string"):
        hook.put_rule(name="test", event_pattern="invalid json", state="ENABLED")
diff --git a/tests/providers/amazon/aws/operators/test_eventbridge.py
b/tests/providers/amazon/aws/operators/test_eventbridge.py
index b09a1817b9..c6a5b7e916 100644
--- a/tests/providers/amazon/aws/operators/test_eventbridge.py
+++ b/tests/providers/amazon/aws/operators/test_eventbridge.py
@@ -23,17 +23,20 @@ import pytest
from airflow import AirflowException
from airflow.providers.amazon.aws.hooks.eventbridge import EventBridgeHook
-from airflow.providers.amazon.aws.operators.eventbridge import
EventBridgePutEventsOperator
+from airflow.providers.amazon.aws.operators.eventbridge import (
+ EventBridgePutEventsOperator,
+ EventBridgePutRuleOperator,
+)
-TASK_ID = "events_putevents_job"
ENTRIES = [{"Detail": "test-detail", "Source": "test-source", "DetailType":
"test-detail-type"}]
FAILED_ENTRIES_RESPONSE = [{"ErrorCode": "test_code"}, {"ErrorCode":
"test_code"}]
+EVENT_PATTERN = '{"source": ["aws.s3"]}'
class TestEventBridgePutEventsOperator:
def test_init(self):
operator = EventBridgePutEventsOperator(
- task_id=TASK_ID,
+ task_id="put_events_job",
entries=ENTRIES,
)
@@ -46,7 +49,7 @@ class TestEventBridgePutEventsOperator:
mock_conn.put_events.return_value = hook_response
operator = EventBridgePutEventsOperator(
- task_id=TASK_ID,
+ task_id="put_events_job",
entries=ENTRIES,
)
@@ -56,7 +59,6 @@ class TestEventBridgePutEventsOperator:
@mock.patch.object(EventBridgeHook, "conn")
def test_failed_to_send(self, mock_conn: MagicMock):
-
hook_response = {
"FailedEntryCount": 1,
"Entries": FAILED_ENTRIES_RESPONSE,
@@ -65,9 +67,39 @@ class TestEventBridgePutEventsOperator:
mock_conn.put_events.return_value = hook_response
operator = EventBridgePutEventsOperator(
- task_id=TASK_ID,
+ task_id="failed_put_events_job",
entries=ENTRIES,
)
with pytest.raises(AirflowException):
operator.execute(None)
+
+
class TestEventBridgePutRuleOperator:
    """Unit tests for ``EventBridgePutRuleOperator``."""

    def test_init(self):
        """The constructor stores ``event_pattern`` unchanged."""
        operator = EventBridgePutRuleOperator(
            task_id="events_put_rule_job", name="match_s3_events", event_pattern=EVENT_PATTERN
        )

        assert operator.event_pattern == EVENT_PATTERN

    @mock.patch.object(EventBridgeHook, "conn")
    def test_execute(self, mock_conn: MagicMock):
        """``execute`` returns the response produced by the (mocked) client's ``put_rule``."""
        hook_response = {"RuleArn": "arn:aws:events:us-east-1:123456789012:rule/test"}
        mock_conn.put_rule.return_value = hook_response

        operator = EventBridgePutRuleOperator(
            task_id="events_put_rule_job", name="match_s3_events", event_pattern=EVENT_PATTERN
        )

        result = operator.execute(None)

        assert result == hook_response

    def test_put_rule_with_bad_json_fails(self):
        """An ``event_pattern`` that is not valid JSON makes ``execute`` raise ``ValueError``."""
        operator = EventBridgePutRuleOperator(
            task_id="failed_put_rule_job", name="match_s3_events", event_pattern="invalid json"
        )

        # Match on the message so an unrelated ValueError cannot satisfy the test.
        with pytest.raises(ValueError, match="must be a valid JSON string"):
            operator.execute(None)
diff --git a/tests/system/providers/amazon/aws/example_eventbridge.py
b/tests/system/providers/amazon/aws/example_eventbridge.py
index f6208108c9..ed68be3bd1 100644
--- a/tests/system/providers/amazon/aws/example_eventbridge.py
+++ b/tests/system/providers/amazon/aws/example_eventbridge.py
@@ -19,7 +19,12 @@ from __future__ import annotations
from datetime import datetime
from airflow import DAG
-from airflow.providers.amazon.aws.operators.eventbridge import
EventBridgePutEventsOperator
+from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.operators.eventbridge import (
+ EventBridgePutEventsOperator,
+ EventBridgePutRuleOperator,
+)
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY,
SystemTestContextBuilder
DAG_ID = "example_eventbridge"
ENTRIES = [
@@ -31,6 +36,8 @@ ENTRIES = [
}
]
+sys_test_context_task = SystemTestContextBuilder().build()
+
with DAG(
dag_id=DAG_ID,
schedule="@once",
@@ -38,13 +45,25 @@ with DAG(
tags=["example"],
catchup=False,
) as dag:
+ test_context = sys_test_context_task()
- # [START howto_operator_eventbridge_put_events]
+ env_id = test_context[ENV_ID_KEY]
+ # [START howto_operator_eventbridge_put_events]
put_events = EventBridgePutEventsOperator(task_id="put_events_task",
entries=ENTRIES)
-
# [END howto_operator_eventbridge_put_events]
+ # [START howto_operator_eventbridge_put_rule]
put_rule = EventBridgePutRuleOperator(
    task_id="put_rule_task",
    # EventBridge rule names may only contain ".", "-", "_" and alphanumeric
    # characters, so the name must not contain spaces.
    name="Example_Rule",
    event_pattern='{"source": ["example.myapp"]}',
    description="This rule matches events from example.myapp.",
)
+ # [END howto_operator_eventbridge_put_rule]
+
+ chain(test_context, put_events, put_rule)
+
from tests.system.utils import get_test_run # noqa: E402