This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new eea53a22c9 Add Eventbridge PutEvents operator and hook (#32498)
eea53a22c9 is described below
commit eea53a22c92bdcb8ef532e41d3a912f05b5736c0
Author: Maham Ali <[email protected]>
AuthorDate: Thu Jul 20 00:16:53 2023 -0700
Add Eventbridge PutEvents operator and hook (#32498)
* Add Eventbridge PutEvents operator and hook
---------
Co-authored-by: Maham Ali <[email protected]>
Co-authored-by: Raphaël Vandon <[email protected]>
---
airflow/providers/amazon/aws/hooks/eventbridge.py | 27 +++++++
.../providers/amazon/aws/operators/eventbridge.py | 87 +++++++++++++++++++++
airflow/providers/amazon/provider.yaml | 12 +++
.../operators/eventbridge.rst | 53 +++++++++++++
.../aws/Amazon-EventBridge_64.png | Bin 0 -> 3221 bytes
.../providers/amazon/aws/hooks/test_eventbridge.py | 28 +++++++
.../amazon/aws/operators/test_eventbridge.py | 73 +++++++++++++++++
.../providers/amazon/aws/example_eventbridge.py | 52 ++++++++++++
8 files changed, 332 insertions(+)
diff --git a/airflow/providers/amazon/aws/hooks/eventbridge.py
b/airflow/providers/amazon/aws/hooks/eventbridge.py
new file mode 100644
index 0000000000..d9450ae7e6
--- /dev/null
+++ b/airflow/providers/amazon/aws/hooks/eventbridge.py
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class EventBridgeHook(AwsBaseHook):
+    """
+    Interact with Amazon EventBridge.
+
+    The hook always requests the ``events`` client type from ``AwsBaseHook``;
+    every other positional and keyword argument is forwarded unchanged.
+    """
+
+    def __init__(self, *args, **kwargs):
+        """Create the hook, bound to the ``events`` client type."""
+        # Inject the client type through kwargs instead of writing a keyword
+        # argument ahead of ``*args`` in the call: that form hides the fact
+        # that positional arguments are still bound first.
+        kwargs["client_type"] = "events"
+        super().__init__(*args, **kwargs)
diff --git a/airflow/providers/amazon/aws/operators/eventbridge.py
b/airflow/providers/amazon/aws/operators/eventbridge.py
new file mode 100644
index 0000000000..ceeab782a7
--- /dev/null
+++ b/airflow/providers/amazon/aws/operators/eventbridge.py
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from functools import cached_property
+from typing import TYPE_CHECKING, Sequence
+
+from airflow import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.eventbridge import EventBridgeHook
+from airflow.providers.amazon.aws.utils import trim_none_values
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
+
+class EventBridgePutEventsOperator(BaseOperator):
+    """
+    Put Events onto Amazon EventBridge.
+
+    :param entries: the list of events to be put onto EventBridge, each event is a dict (required)
+    :param endpoint_id: the URL subdomain of the endpoint
+    :param aws_conn_id: the AWS connection to use
+    :param region_name: the region where events are to be sent
+
+    """
+
+    template_fields: Sequence[str] = ("entries", "endpoint_id", "aws_conn_id", "region_name")
+
+    def __init__(
+        self,
+        *,
+        entries: list[dict],
+        endpoint_id: str | None = None,
+        aws_conn_id: str = "aws_default",
+        region_name: str | None = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.entries = entries
+        self.endpoint_id = endpoint_id
+        self.aws_conn_id = aws_conn_id
+        self.region_name = region_name
+
+    @cached_property
+    def hook(self) -> EventBridgeHook:
+        """Create and return an EventBridgeHook (built once, then cached on the instance)."""
+        return EventBridgeHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
+
+    def execute(self, context: Context):
+        """
+        Send ``entries`` to EventBridge with a single ``put_events`` call.
+
+        :param context: the task execution context (not used by this operator)
+        :return: the ``EventId`` of every entry in the response when ``do_xcom_push``
+            is set, otherwise ``None``
+        :raises AirflowException: if the response reports a non-zero ``FailedEntryCount``
+        """
+        # NOTE(review): trim_none_values is expected to drop "EndpointId" when no
+        # endpoint was configured, so the key is not sent as None -- confirm
+        # against the helper.
+        response = self.hook.conn.put_events(
+            **trim_none_values(
+                {
+                    "Entries": self.entries,
+                    "EndpointId": self.endpoint_id,
+                }
+            )
+        )
+
+        # Inspect the response before reporting anything: a request can come
+        # back with some or all entries rejected.
+        failed_entry_count = response.get("FailedEntryCount")
+        if failed_entry_count:
+            for event in response["Entries"]:
+                if "ErrorCode" in event:
+                    self.log.error(event)
+
+            raise AirflowException(f"{failed_entry_count} entries in this request have failed to send.")
+
+        # Only claim success once the response is known to contain no failures.
+        self.log.info("Sent %d events to EventBridge.", len(self.entries))
+
+        if self.do_xcom_push:
+            return [e["EventId"] for e in response["Entries"]]
diff --git a/airflow/providers/amazon/provider.yaml
b/airflow/providers/amazon/provider.yaml
index e8ba23afc4..ff7ed91525 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -154,6 +154,12 @@ integrations:
- /docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst
logo: /integration-logos/aws/[email protected]
tags: [aws]
+ - integration-name: Amazon EventBridge
+ external-doc-url:
https://docs.aws.amazon.com/eventbridge/latest/APIReference/Welcome.html
+ how-to-guide:
+ - /docs/apache-airflow-providers-amazon/operators/eventbridge.rst
+ logo: /integration-logos/aws/Amazon-EventBridge_64.png
+ tags: [aws]
- integration-name: Amazon Glacier
external-doc-url: https://aws.amazon.com/glacier/
logo: /integration-logos/aws/[email protected]
@@ -307,6 +313,9 @@ operators:
- integration-name: Amazon EMR on EKS
python-modules:
- airflow.providers.amazon.aws.operators.emr
+ - integration-name: Amazon EventBridge
+ python-modules:
+ - airflow.providers.amazon.aws.operators.eventbridge
- integration-name: Amazon Glacier
python-modules:
- airflow.providers.amazon.aws.operators.glacier
@@ -457,6 +466,9 @@ hooks:
- integration-name: Amazon EMR on EKS
python-modules:
- airflow.providers.amazon.aws.hooks.emr
+ - integration-name: Amazon EventBridge
+ python-modules:
+ - airflow.providers.amazon.aws.hooks.eventbridge
- integration-name: Amazon Glacier
python-modules:
- airflow.providers.amazon.aws.hooks.glacier
diff --git a/docs/apache-airflow-providers-amazon/operators/eventbridge.rst
b/docs/apache-airflow-providers-amazon/operators/eventbridge.rst
new file mode 100644
index 0000000000..ce0c449f05
--- /dev/null
+++ b/docs/apache-airflow-providers-amazon/operators/eventbridge.rst
@@ -0,0 +1,53 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+========================================
+Amazon EventBridge
+========================================
+
+`Amazon EventBridge <https://docs.aws.amazon.com/eventbridge/>`__ is a
serverless event bus service that makes it easy
+to connect your applications with data from a variety of sources. EventBridge
delivers a stream of real-time data from
+your own applications, software-as-a-service (SaaS) applications, and AWS
services and routes that data to targets such
+as AWS Lambda. You can set up routing rules to determine where to send your
data to build application architectures
+that react in real time to all of your data sources. EventBridge enables you
to build event-driven architectures
+that are loosely coupled and distributed.
+
+Prerequisite Tasks
+------------------
+
+.. include:: ../_partials/prerequisite_tasks.rst
+
+Operators
+---------
+
+Send events to Amazon EventBridge
+==========================================
+
+To send custom events to Amazon EventBridge, use
+:class:`~airflow.providers.amazon.aws.operators.eventbridge.EventBridgePutEventsOperator`.
+
+.. exampleinclude::
/../../tests/system/providers/amazon/aws/example_eventbridge.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_eventbridge_put_events]
+ :end-before: [END howto_operator_eventbridge_put_events]
+
+
+Reference
+---------
+
+* `AWS boto3 library documentation for EventBridge
<https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/events.html>`__
diff --git a/docs/integration-logos/aws/Amazon-EventBridge_64.png
b/docs/integration-logos/aws/Amazon-EventBridge_64.png
new file mode 100644
index 0000000000..51085d2c26
Binary files /dev/null and
b/docs/integration-logos/aws/Amazon-EventBridge_64.png differ
diff --git a/tests/providers/amazon/aws/hooks/test_eventbridge.py
b/tests/providers/amazon/aws/hooks/test_eventbridge.py
new file mode 100644
index 0000000000..680686854e
--- /dev/null
+++ b/tests/providers/amazon/aws/hooks/test_eventbridge.py
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from moto import mock_events
+
+from airflow.providers.amazon.aws.hooks.eventbridge import EventBridgeHook
+
+
+@mock_events
+class TestEventBridgeHook:
+    """Smoke test for ``EventBridgeHook`` running under moto's ``mock_events``."""
+
+    def test_conn_returns_a_boto3_connection(self):
+        """A hook built from the default connection id exposes a non-``None`` ``conn``."""
+        eventbridge_hook = EventBridgeHook(aws_conn_id="aws_default")
+        connection = eventbridge_hook.conn
+        assert connection is not None
diff --git a/tests/providers/amazon/aws/operators/test_eventbridge.py
b/tests/providers/amazon/aws/operators/test_eventbridge.py
new file mode 100644
index 0000000000..b09a1817b9
--- /dev/null
+++ b/tests/providers/amazon/aws/operators/test_eventbridge.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+from unittest.mock import MagicMock
+
+import pytest
+
+from airflow import AirflowException
+from airflow.providers.amazon.aws.hooks.eventbridge import EventBridgeHook
+from airflow.providers.amazon.aws.operators.eventbridge import
EventBridgePutEventsOperator
+
+TASK_ID = "events_putevents_job"
+ENTRIES = [{"Detail": "test-detail", "Source": "test-source", "DetailType":
"test-detail-type"}]
+FAILED_ENTRIES_RESPONSE = [{"ErrorCode": "test_code"}, {"ErrorCode":
"test_code"}]
+
+
+class TestEventBridgePutEventsOperator:
+    """Unit tests for ``EventBridgePutEventsOperator`` with the hook connection mocked out."""
+
+    def test_init(self):
+        """The operator stores the ``entries`` it was constructed with."""
+        operator = EventBridgePutEventsOperator(
+            task_id=TASK_ID,
+            entries=ENTRIES,
+        )
+
+        assert operator.entries == ENTRIES
+
+    @mock.patch.object(EventBridgeHook, "conn")
+    def test_execute(self, mock_conn: MagicMock):
+        """With no failed entries, ``execute`` returns the event ids from the response."""
+        hook_response = {"FailedEntryCount": 0, "Entries": [{"EventId": "foobar"}]}
+
+        mock_conn.put_events.return_value = hook_response
+
+        operator = EventBridgePutEventsOperator(
+            task_id=TASK_ID,
+            entries=ENTRIES,
+        )
+
+        result = operator.execute(None)
+
+        assert result == ["foobar"]
+
+    @mock.patch.object(EventBridgeHook, "conn")
+    def test_failed_to_send(self, mock_conn: MagicMock):
+        """A non-zero ``FailedEntryCount`` makes ``execute`` raise ``AirflowException``."""
+        # Derive the count from the fixture so the mocked response stays
+        # self-consistent: one failed entry per item carrying an ErrorCode.
+        failed_entry_count = len(FAILED_ENTRIES_RESPONSE)
+        hook_response = {
+            "FailedEntryCount": failed_entry_count,
+            "Entries": FAILED_ENTRIES_RESPONSE,
+        }
+
+        mock_conn.put_events.return_value = hook_response
+
+        operator = EventBridgePutEventsOperator(
+            task_id=TASK_ID,
+            entries=ENTRIES,
+        )
+
+        # The reported count must reach the error message seen by the user.
+        with pytest.raises(AirflowException, match=f"{failed_entry_count} entries"):
+            operator.execute(None)
diff --git a/tests/system/providers/amazon/aws/example_eventbridge.py
b/tests/system/providers/amazon/aws/example_eventbridge.py
new file mode 100644
index 0000000000..f6208108c9
--- /dev/null
+++ b/tests/system/providers/amazon/aws/example_eventbridge.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow import DAG
+from airflow.providers.amazon.aws.operators.eventbridge import
EventBridgePutEventsOperator
+
+DAG_ID = "example_eventbridge"
+# The single custom event sent by the example task. "Detail" is given as a
+# JSON-encoded string, and "EventBusName" names the bus the event is put on.
+ENTRIES = [
+ {
+ "Detail": '{"event-name": "custom-event"}',
+ "EventBusName": "custom-bus",
+ "Source": "example.myapp",
+ "DetailType": "Sample Custom Event",
+ }
+]
+
+with DAG(
+ dag_id=DAG_ID,
+ schedule="@once",
+ start_date=datetime(2021, 1, 1),
+ tags=["example"],
+ catchup=False,
+) as dag:
+
+ # The START/END markers below delimit the snippet that the how-to guide
+ # includes; keep the operator call between them.
+ # [START howto_operator_eventbridge_put_events]
+
+ put_events = EventBridgePutEventsOperator(task_id="put_events_task",
entries=ENTRIES)
+
+ # [END howto_operator_eventbridge_put_events]
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)