This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new eea53a22c9 Add Eventbridge PutEvents operator and hook (#32498)
eea53a22c9 is described below

commit eea53a22c92bdcb8ef532e41d3a912f05b5736c0
Author: Maham Ali <[email protected]>
AuthorDate: Thu Jul 20 00:16:53 2023 -0700

    Add Eventbridge PutEvents operator and hook (#32498)
    
    * Add Eventbridge PutEvents operator and hook
    
    ---------
    
    Co-authored-by: Maham Ali <[email protected]>
    Co-authored-by: RaphaĆ«l Vandon <[email protected]>
---
 airflow/providers/amazon/aws/hooks/eventbridge.py  |  27 +++++++
 .../providers/amazon/aws/operators/eventbridge.py  |  87 +++++++++++++++++++++
 airflow/providers/amazon/provider.yaml             |  12 +++
 .../operators/eventbridge.rst                      |  53 +++++++++++++
 .../aws/Amazon-EventBridge_64.png                  | Bin 0 -> 3221 bytes
 .../providers/amazon/aws/hooks/test_eventbridge.py |  28 +++++++
 .../amazon/aws/operators/test_eventbridge.py       |  73 +++++++++++++++++
 .../providers/amazon/aws/example_eventbridge.py    |  52 ++++++++++++
 8 files changed, 332 insertions(+)

diff --git a/airflow/providers/amazon/aws/hooks/eventbridge.py 
b/airflow/providers/amazon/aws/hooks/eventbridge.py
new file mode 100644
index 0000000000..d9450ae7e6
--- /dev/null
+++ b/airflow/providers/amazon/aws/hooks/eventbridge.py
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class EventBridgeHook(AwsBaseHook):
+    """Amazon EventBridge Hook."""
+
+    def __init__(self, *args, **kwargs):
+        """Creating object."""
+        super().__init__(client_type="events", *args, **kwargs)
diff --git a/airflow/providers/amazon/aws/operators/eventbridge.py 
b/airflow/providers/amazon/aws/operators/eventbridge.py
new file mode 100644
index 0000000000..ceeab782a7
--- /dev/null
+++ b/airflow/providers/amazon/aws/operators/eventbridge.py
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from functools import cached_property
+from typing import TYPE_CHECKING, Sequence
+
+from airflow import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.eventbridge import EventBridgeHook
+from airflow.providers.amazon.aws.utils import trim_none_values
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class EventBridgePutEventsOperator(BaseOperator):
+    """
+    Put Events onto Amazon EventBridge.
+
+    :param entries: the list of events to be put onto EventBridge, each event 
is a dict (required)
+    :param endpoint_id: the URL subdomain of the endpoint
+    :param aws_conn_id: the AWS connection to use
+    :param region_name: the region where events are to be sent
+
+    """
+
+    template_fields: Sequence[str] = ("entries", "endpoint_id", "aws_conn_id", 
"region_name")
+
+    def __init__(
+        self,
+        *,
+        entries: list[dict],
+        endpoint_id: str | None = None,
+        aws_conn_id: str = "aws_default",
+        region_name: str | None = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.entries = entries
+        self.endpoint_id = endpoint_id
+        self.aws_conn_id = aws_conn_id
+        self.region_name = region_name
+
+    @cached_property
+    def hook(self) -> EventBridgeHook:
+        """Create and return an EventBridgeHook."""
+        return EventBridgeHook(aws_conn_id=self.aws_conn_id, 
region_name=self.region_name)
+
+    def execute(self, context: Context):
+
+        response = self.hook.conn.put_events(
+            **trim_none_values(
+                {
+                    "Entries": self.entries,
+                    "EndpointId": self.endpoint_id,
+                }
+            )
+        )
+
+        self.log.info("Sent %d events to EventBridge.", len(self.entries))
+
+        if response.get("FailedEntryCount"):
+            for event in response["Entries"]:
+                if "ErrorCode" in event:
+                    self.log.error(event)
+
+            raise AirflowException(
+                f"{response['FailedEntryCount']} entries in this request have 
failed to send."
+            )
+
+        if self.do_xcom_push:
+            return [e["EventId"] for e in response["Entries"]]
diff --git a/airflow/providers/amazon/provider.yaml 
b/airflow/providers/amazon/provider.yaml
index e8ba23afc4..ff7ed91525 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -154,6 +154,12 @@ integrations:
       - /docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst
     logo: /integration-logos/aws/[email protected]
     tags: [aws]
+  - integration-name: Amazon EventBridge
+    external-doc-url: 
https://docs.aws.amazon.com/eventbridge/latest/APIReference/Welcome.html
+    how-to-guide:
+      - /docs/apache-airflow-providers-amazon/operators/eventbridge.rst
+    logo: /integration-logos/aws/Amazon-EventBridge_64.png
+    tags: [aws]
   - integration-name: Amazon Glacier
     external-doc-url: https://aws.amazon.com/glacier/
     logo: /integration-logos/aws/[email protected]
@@ -307,6 +313,9 @@ operators:
   - integration-name: Amazon EMR on EKS
     python-modules:
       - airflow.providers.amazon.aws.operators.emr
+  - integration-name: Amazon EventBridge
+    python-modules:
+      - airflow.providers.amazon.aws.operators.eventbridge
   - integration-name: Amazon Glacier
     python-modules:
       - airflow.providers.amazon.aws.operators.glacier
@@ -457,6 +466,9 @@ hooks:
   - integration-name: Amazon EMR on EKS
     python-modules:
       - airflow.providers.amazon.aws.hooks.emr
+  - integration-name: Amazon EventBridge
+    python-modules:
+      - airflow.providers.amazon.aws.hooks.eventbridge
   - integration-name: Amazon Glacier
     python-modules:
       - airflow.providers.amazon.aws.hooks.glacier
diff --git a/docs/apache-airflow-providers-amazon/operators/eventbridge.rst 
b/docs/apache-airflow-providers-amazon/operators/eventbridge.rst
new file mode 100644
index 0000000000..ce0c449f05
--- /dev/null
+++ b/docs/apache-airflow-providers-amazon/operators/eventbridge.rst
@@ -0,0 +1,53 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+========================================
+Amazon EventBridge
+========================================
+
+`Amazon Eventbridge <https://docs.aws.amazon.com/eventbridge/>`__ is a 
serverless event bus service that makes it easy
+to connect your applications with data from a variety of sources. EventBridge 
delivers a stream of real-time data from
+your own applications, software-as-a-service (SaaS) applications, and AWS 
services and routes that data to targets such
+as AWS Lambda. You can set up routing rules to determine where to send your 
data to build application architectures
+that react in real time to all of your data sources. EventBridge enables you 
to build event-driven architectures
+that are loosely coupled and distributed.
+
+Prerequisite Tasks
+------------------
+
+.. include:: ../_partials/prerequisite_tasks.rst
+
+Operators
+---------
+
+Send events to Amazon EventBridge
+==========================================
+
+To send custom events to Amazon EventBridge, use
+:class:`~airflow.providers.amazon.aws.operators.eventbridge.EventBridgePutEventsOperator`.
+
+.. exampleinclude:: 
/../../tests/system/providers/amazon/aws/example_eventbridge.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_eventbridge_put_events]
+    :end-before: [END howto_operator_eventbridge_put_events]
+
+
+Reference
+---------
+
+* `AWS boto3 library documentation for EventBridge 
<https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/events.html>`__
diff --git a/docs/integration-logos/aws/Amazon-EventBridge_64.png 
b/docs/integration-logos/aws/Amazon-EventBridge_64.png
new file mode 100644
index 0000000000..51085d2c26
Binary files /dev/null and 
b/docs/integration-logos/aws/Amazon-EventBridge_64.png differ
diff --git a/tests/providers/amazon/aws/hooks/test_eventbridge.py 
b/tests/providers/amazon/aws/hooks/test_eventbridge.py
new file mode 100644
index 0000000000..680686854e
--- /dev/null
+++ b/tests/providers/amazon/aws/hooks/test_eventbridge.py
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from moto import mock_events
+
+from airflow.providers.amazon.aws.hooks.eventbridge import EventBridgeHook
+
+
+@mock_events
+class TestEventBridgeHook:
+    def test_conn_returns_a_boto3_connection(self):
+        hook = EventBridgeHook(aws_conn_id="aws_default")
+        assert hook.conn is not None
diff --git a/tests/providers/amazon/aws/operators/test_eventbridge.py 
b/tests/providers/amazon/aws/operators/test_eventbridge.py
new file mode 100644
index 0000000000..b09a1817b9
--- /dev/null
+++ b/tests/providers/amazon/aws/operators/test_eventbridge.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+from unittest.mock import MagicMock
+
+import pytest
+
+from airflow import AirflowException
+from airflow.providers.amazon.aws.hooks.eventbridge import EventBridgeHook
+from airflow.providers.amazon.aws.operators.eventbridge import 
EventBridgePutEventsOperator
+
+TASK_ID = "events_putevents_job"
+ENTRIES = [{"Detail": "test-detail", "Source": "test-source", "DetailType": 
"test-detail-type"}]
+FAILED_ENTRIES_RESPONSE = [{"ErrorCode": "test_code"}, {"ErrorCode": 
"test_code"}]
+
+
+class TestEventBridgePutEventsOperator:
+    def test_init(self):
+        operator = EventBridgePutEventsOperator(
+            task_id=TASK_ID,
+            entries=ENTRIES,
+        )
+
+        assert operator.entries == ENTRIES
+
+    @mock.patch.object(EventBridgeHook, "conn")
+    def test_execute(self, mock_conn: MagicMock):
+        hook_response = {"FailedEntryCount": 0, "Entries": [{"EventId": 
"foobar"}]}
+
+        mock_conn.put_events.return_value = hook_response
+
+        operator = EventBridgePutEventsOperator(
+            task_id=TASK_ID,
+            entries=ENTRIES,
+        )
+
+        result = operator.execute(None)
+
+        assert result == ["foobar"]
+
+    @mock.patch.object(EventBridgeHook, "conn")
+    def test_failed_to_send(self, mock_conn: MagicMock):
+
+        hook_response = {
+            "FailedEntryCount": 1,
+            "Entries": FAILED_ENTRIES_RESPONSE,
+        }
+
+        mock_conn.put_events.return_value = hook_response
+
+        operator = EventBridgePutEventsOperator(
+            task_id=TASK_ID,
+            entries=ENTRIES,
+        )
+
+        with pytest.raises(AirflowException):
+            operator.execute(None)
diff --git a/tests/system/providers/amazon/aws/example_eventbridge.py 
b/tests/system/providers/amazon/aws/example_eventbridge.py
new file mode 100644
index 0000000000..f6208108c9
--- /dev/null
+++ b/tests/system/providers/amazon/aws/example_eventbridge.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow import DAG
+from airflow.providers.amazon.aws.operators.eventbridge import 
EventBridgePutEventsOperator
+
+DAG_ID = "example_eventbridge"
+ENTRIES = [
+    {
+        "Detail": '{"event-name": "custom-event"}',
+        "EventBusName": "custom-bus",
+        "Source": "example.myapp",
+        "DetailType": "Sample Custom Event",
+    }
+]
+
+with DAG(
+    dag_id=DAG_ID,
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    tags=["example"],
+    catchup=False,
+) as dag:
+
+    # [START howto_operator_eventbridge_put_events]
+
+    put_events = EventBridgePutEventsOperator(task_id="put_events_task", 
entries=ENTRIES)
+
+    # [END howto_operator_eventbridge_put_events]
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: 
tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)

Reply via email to