This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6443b13435c feat: add async pagerduty notifier (#56540)
6443b13435c is described below

commit 6443b13435c817679c3d6dda7cd7d69cd282ea4f
Author: Sebastian Daum <[email protected]>
AuthorDate: Tue Oct 14 13:03:32 2025 +0200

    feat: add async pagerduty notifier (#56540)
---
 dev/breeze/tests/test_selective_checks.py          |  11 +-
 providers/pagerduty/pyproject.toml                 |   2 +
 .../providers/pagerduty/hooks/pagerduty_events.py  | 242 ++++++++++++++++-----
 .../providers/pagerduty/notifications/pagerduty.py |  33 ++-
 .../unit/pagerduty/hooks/test_pagerduty_events.py  |  85 +++++++-
 .../unit/pagerduty/notifications/test_pagerduty.py |  24 ++
 6 files changed, 338 insertions(+), 59 deletions(-)

diff --git a/dev/breeze/tests/test_selective_checks.py 
b/dev/breeze/tests/test_selective_checks.py
index a972f9d11ec..697b9ade50a 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -687,7 +687,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, 
str], stderr: str):
                     "providers/http/tests/file.py",
                 ),
                 {
-                    "selected-providers-list-as-string": "amazon apache.livy 
dbt.cloud dingding discord google http",
+                    "selected-providers-list-as-string": "amazon apache.livy 
dbt.cloud dingding discord google http pagerduty",
                     "all-python-versions": 
f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
                     "all-python-versions-list-as-string": 
DEFAULT_PYTHON_MAJOR_MINOR_VERSION,
                     "python-versions": 
f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
@@ -708,7 +708,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, 
str], stderr: str):
                         [
                             {
                                 "description": "amazon...google",
-                                "test_types": "Providers[amazon] 
Providers[apache.livy,dbt.cloud,dingding,discord,http] Providers[google]",
+                                "test_types": "Providers[amazon] 
Providers[apache.livy,dbt.cloud,dingding,discord,http,pagerduty] 
Providers[google]",
                             }
                         ]
                     ),
@@ -722,9 +722,12 @@ def assert_outputs_are_printed(expected_outputs: dict[str, 
str], stderr: str):
                                 "description": "dbt.cloud...dingding",
                                 "test_types": "Providers[dbt.cloud] 
Providers[dingding]",
                             },
-                            {"description": "discord", "test_types": 
"Providers[discord]"},
-                            {"description": "google", "test_types": 
"Providers[google]"},
+                            {
+                                "description": "discord...google",
+                                "test_types": "Providers[discord] 
Providers[google]",
+                            },
                             {"description": "http", "test_types": 
"Providers[http]"},
+                            {"description": "pagerduty", "test_types": 
"Providers[pagerduty]"},
                         ]
                     ),
                     "run-mypy": "true",
diff --git a/providers/pagerduty/pyproject.toml 
b/providers/pagerduty/pyproject.toml
index 4507c32c242..bb97ac77555 100644
--- a/providers/pagerduty/pyproject.toml
+++ b/providers/pagerduty/pyproject.toml
@@ -59,6 +59,7 @@ requires-python = ">=3.10"
 dependencies = [
     "apache-airflow>=2.10.0",
     "apache-airflow-providers-common-compat>=1.6.1",
+    "apache-airflow-providers-http",
     "pagerduty>=2.3.0",
 ]
 
@@ -68,6 +69,7 @@ dev = [
     "apache-airflow-task-sdk",
     "apache-airflow-devel-common",
     "apache-airflow-providers-common-compat",
+    "apache-airflow-providers-http",
     # Additional devel dependencies (do not remove this line and add extra 
development dependencies)
 ]
 
diff --git 
a/providers/pagerduty/src/airflow/providers/pagerduty/hooks/pagerduty_events.py 
b/providers/pagerduty/src/airflow/providers/pagerduty/hooks/pagerduty_events.py
index ef21db4b8a3..2ffa03e9e52 100644
--- 
a/providers/pagerduty/src/airflow/providers/pagerduty/hooks/pagerduty_events.py
+++ 
b/providers/pagerduty/src/airflow/providers/pagerduty/hooks/pagerduty_events.py
@@ -21,15 +21,68 @@ from __future__ import annotations
 
 from typing import TYPE_CHECKING, Any
 
+import aiohttp
 import pagerduty
+from asgiref.sync import sync_to_async
 
 from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpAsyncHook
 from airflow.providers.pagerduty.version_compat import BaseHook
 
 if TYPE_CHECKING:
     from datetime import datetime
 
 
+def prepare_event_data(
+    summary: str,
+    severity: str,
+    source: str = "airflow",
+    custom_details: Any | None = None,
+    component: str | None = None,
+    group: str | None = None,
+    class_type: str | None = None,
+    action: str = "trigger",
+    dedup_key: str | None = None,
+    images: list[Any] | None = None,
+    links: list[Any] | None = None,
+    action_key_name: str = "action",
+) -> dict[str, Any]:
+    """Prepare event data for send_event / post('/v2/enqueue') method."""
+    payload = {
+        "summary": summary,
+        "severity": severity,
+        "source": source,
+    }
+    if custom_details is not None:
+        payload["custom_details"] = custom_details
+    if component:
+        payload["component"] = component
+    if group:
+        payload["group"] = group
+    if class_type:
+        payload["class"] = class_type
+
+    actions = ("trigger", "acknowledge", "resolve")
+    if action not in actions:
+        raise ValueError(f"Event action must be one of: {', '.join(actions)}")
+    data = {
+        action_key_name: action,
+        "payload": payload,
+    }
+    if dedup_key:
+        data["dedup_key"] = dedup_key
+    elif action != "trigger":
+        raise ValueError(
+            f"The dedup_key property is required for 
{action_key_name}={action} events,"
+            f" and it must be a string."
+        )
+    if images is not None:
+        data["images"] = images
+    if links is not None:
+        data["links"] = links
+    return data
+
+
 class PagerdutyEventsHook(BaseHook):
     """
     This class can be used to interact with the Pagerduty Events API.
@@ -120,7 +173,7 @@ class PagerdutyEventsHook(BaseHook):
             link's text.
         :return: PagerDuty Events API v2 response.
         """
-        data = PagerdutyEventsHook.prepare_event_data(
+        data = prepare_event_data(
             summary=summary,
             severity=severity,
             source=source,
@@ -137,57 +190,6 @@ class PagerdutyEventsHook(BaseHook):
         client = pagerduty.EventsApiV2Client(self.integration_key)
         return client.send_event(**data)
 
-    @staticmethod
-    def prepare_event_data(
-        summary,
-        severity,
-        source,
-        custom_details,
-        component,
-        group,
-        class_type,
-        action,
-        dedup_key,
-        images,
-        links,
-        action_key_name: str = "action",
-    ) -> dict:
-        """Prepare event data for send_event / post('/v2/enqueue') method."""
-        payload = {
-            "summary": summary,
-            "severity": severity,
-            "source": source,
-        }
-        if custom_details is not None:
-            payload["custom_details"] = custom_details
-        if component:
-            payload["component"] = component
-        if group:
-            payload["group"] = group
-        if class_type:
-            payload["class"] = class_type
-
-        actions = ("trigger", "acknowledge", "resolve")
-        if action not in actions:
-            raise ValueError(f"Event action must be one of: {', 
'.join(actions)}")
-        data = {
-            action_key_name: action,
-            "payload": payload,
-        }
-        if dedup_key:
-            data["dedup_key"] = dedup_key
-        elif action != "trigger":
-            raise ValueError(
-                f"The dedup_key property is required for 
{action_key_name}={action} events,"
-                f" and it must be a string."
-            )
-        if images is not None:
-            data["images"] = images
-        if links is not None:
-            data["links"] = links
-
-        return data
-
     def create_change_event(
         self,
         summary: str,
@@ -237,3 +239,139 @@ class PagerdutyEventsHook(BaseHook):
         except Exception:
             return False, "connection test failed, invalid routing key"
         return True, "connection tested successfully"
+
+
+class PagerdutyEventsAsyncHook(HttpAsyncHook):
+    """
+    This class can be used to interact with the Pagerduty Events API via async 
http.
+
+    Documentation on how to use the PagerDuty Events API can be found at:
+        https://developer.pagerduty.com/docs/events-api-v2-overview
+
+    It takes both an Events API token and a PagerDuty connection with the 
Events API token
+     (i.e. Integration key) as the password/Pagerduty API token. If both are 
supplied, the token will be used.
+
+    :param integration_key: PagerDuty Events API token
+    :param pagerduty_events_conn_id: connection that has PagerDuty integration 
key in the Pagerduty
+     API token field
+    :param api_version: api version to use
+    """
+
+    default_headers = {
+        "Content-Type": "application/json",
+        "Accept": "application/json",
+    }
+    conn_name_attr = "pagerduty_events_conn_id"
+    default_conn_name = "pagerduty_events_default"
+    conn_type = "pagerduty_events"
+    hook_name = "Async Pagerduty Events"
+
+    def __init__(
+        self,
+        integration_key: str | None = None,
+        pagerduty_events_conn_id: str | None = None,
+        api_version: str = "v2",
+    ) -> None:
+        super().__init__()
+        self.integration_key = integration_key
+        self.pagerduty_events_conn_id = pagerduty_events_conn_id
+        self.api_version = api_version
+        self.method = "POST"
+        self.base_url: str = "https://events.pagerduty.com"
+        self.http_conn_id = ""
+
+    async def get_integration_key(self) -> str:
+        """Get integration key from the connection."""
+        if self.integration_key:
+            return self.integration_key
+
+        if self.pagerduty_events_conn_id is not None:
+            conn = await 
sync_to_async(self.get_connection)(self.pagerduty_events_conn_id)
+            self.integration_key = conn.password
+            if self.integration_key:
+                return self.integration_key
+
+        raise AirflowException(
+            "Cannot get integration key: No valid integration key nor 
pagerduty_events_conn_id supplied."
+        )
+
+    async def send_event(
+        self,
+        summary: str,
+        severity: str,
+        source: str = "airflow",
+        action: str = "trigger",
+        dedup_key: str | None = None,
+        custom_details: Any | None = None,
+        group: str | None = None,
+        component: str | None = None,
+        class_type: str | None = None,
+        images: list[Any] | None = None,
+        links: list[Any] | None = None,
+    ) -> str:
+        """
+        Create event for service integration.
+
+        :param summary: Summary for the event
+        :param severity: Severity for the event, needs to be one of: info, 
warning, error, critical
+        :param source: Specific human-readable unique identifier, such as a
+            hostname, for the system having the problem.
+        :param action: Event action, needs to be one of: trigger, acknowledge,
+            resolve. Default to trigger if not specified.
+        :param dedup_key: A string which identifies the alert triggered for 
the given event.
+            Required for the actions acknowledge and resolve.
+        :param custom_details: Free-form details from the event. Can be a 
dictionary or a string.
+            If a dictionary is passed it will show up in PagerDuty as a table.
+        :param group: A cluster or grouping of sources. For example, sources
+            "prod-datapipe-02" and "prod-datapipe-03" might both be part of 
"prod-datapipe"
+        :param component: The part or component of the affected system that is 
broken.
+        :param class_type: The class/type of the event.
+        :param images: List of images to include. Each dictionary in the list 
accepts the following keys:
+            `src`: The source (URL) of the image being attached to the 
incident. This image must be served via
+            HTTPS.
+            `href`: [Optional] URL to make the image a clickable link.
+            `alt`: [Optional] Alternative text for the image.
+        :param links: List of links to include. Each dictionary in the list 
accepts the following keys:
+            `href`: URL of the link to be attached.
+            `text`: [Optional] Plain text that describes the purpose of the 
link, and can be used as the
+            link's text.
+        :return: PagerDuty Events API response.
+        """
+        event = {"event_action": action}
+
+        integration_key = await self.get_integration_key()
+        # add routing key
+        event["routing_key"] = integration_key
+
+        data = prepare_event_data(
+            summary=summary,
+            severity=severity,
+            source=source,
+            custom_details=custom_details,
+            component=component,
+            group=group,
+            class_type=class_type,
+            action=action,
+            dedup_key=dedup_key,
+            images=images,
+            links=links,
+        )
+
+        event.update(data)
+
+        if isinstance(dedup_key, str):
+            event["dedup_key"] = dedup_key
+        elif not action == "trigger":
+            raise ValueError(
+                f"The dedup_key property is required for event_action={action} 
events, and it must be a string."
+            )
+
+        async with aiohttp.ClientSession() as session:
+            res = await super().run(
+                session=session,
+                endpoint=f"{self.api_version}/enqueue",
+                json=event,
+                headers=self.default_headers,
+            )
+            res_body = await res.json()
+            return res_body["dedup_key"]
diff --git 
a/providers/pagerduty/src/airflow/providers/pagerduty/notifications/pagerduty.py
 
b/providers/pagerduty/src/airflow/providers/pagerduty/notifications/pagerduty.py
index b976cfff5dd..bd38e379346 100644
--- 
a/providers/pagerduty/src/airflow/providers/pagerduty/notifications/pagerduty.py
+++ 
b/providers/pagerduty/src/airflow/providers/pagerduty/notifications/pagerduty.py
@@ -21,7 +21,8 @@ from functools import cached_property
 from typing import Any
 
 from airflow.providers.common.compat.notifier import BaseNotifier
-from airflow.providers.pagerduty.hooks.pagerduty_events import 
PagerdutyEventsHook
+from airflow.providers.pagerduty.hooks.pagerduty_events import 
PagerdutyEventsAsyncHook, PagerdutyEventsHook
+from airflow.providers.pagerduty.version_compat import AIRFLOW_V_3_1_PLUS
 
 
 class PagerdutyNotifier(BaseNotifier):
@@ -86,8 +87,13 @@ class PagerdutyNotifier(BaseNotifier):
         links: list[Any] | None = None,
         pagerduty_events_conn_id: str | None = "pagerduty_events_default",
         integration_key: str | None = None,
+        **kwargs,
     ):
-        super().__init__()
+        if AIRFLOW_V_3_1_PLUS:
+            #  Support for passing context was added in 3.1.0
+            super().__init__(**kwargs)
+        else:
+            super().__init__()
         self.pagerduty_events_conn_id = pagerduty_events_conn_id
         self.integration_key = integration_key
         self.summary = summary
@@ -109,6 +115,13 @@ class PagerdutyNotifier(BaseNotifier):
             pagerduty_events_conn_id=self.pagerduty_events_conn_id, 
integration_key=self.integration_key
         )
 
+    @cached_property
+    def hook_async(self) -> PagerdutyEventsAsyncHook:
+        """Pagerduty Events Async Hook."""
+        return PagerdutyEventsAsyncHook(
+            pagerduty_events_conn_id=self.pagerduty_events_conn_id, 
integration_key=self.integration_key
+        )
+
     def notify(self, context):
         """Send a alert to a pagerduty event v2 API."""
         self.hook.send_event(
@@ -125,5 +138,21 @@ class PagerdutyNotifier(BaseNotifier):
             links=self.links,
         )
 
+    async def async_notify(self, context) -> None:
+        """Send an alert to a pagerduty event v2 API using async HTTP."""
+        await self.hook_async.send_event(
+            summary=self.summary,
+            severity=self.severity,
+            source=self.source,
+            action=self.action,
+            dedup_key=self.dedup_key,
+            custom_details=self.custom_details,
+            group=self.group,
+            component=self.component,
+            class_type=self.class_type,
+            images=self.images,
+            links=self.links,
+        )
+
 
 send_pagerduty_notification = PagerdutyNotifier
diff --git 
a/providers/pagerduty/tests/unit/pagerduty/hooks/test_pagerduty_events.py 
b/providers/pagerduty/tests/unit/pagerduty/hooks/test_pagerduty_events.py
index 8d1136900cc..dea0cc5f620 100644
--- a/providers/pagerduty/tests/unit/pagerduty/hooks/test_pagerduty_events.py
+++ b/providers/pagerduty/tests/unit/pagerduty/hooks/test_pagerduty_events.py
@@ -17,10 +17,17 @@
 # under the License.
 from __future__ import annotations
 
+from unittest import mock
+
 import pytest
+from aioresponses import aioresponses
 
 from airflow.models import Connection
-from airflow.providers.pagerduty.hooks.pagerduty_events import 
PagerdutyEventsHook
+from airflow.providers.pagerduty.hooks.pagerduty_events import (
+    PagerdutyEventsAsyncHook,
+    PagerdutyEventsHook,
+    prepare_event_data,
+)
 
 DEFAULT_CONN_ID = "pagerduty_events_default"
 
@@ -32,6 +39,40 @@ def events_connections(create_connection_without_db):
     )
 
 
[email protected]
+def aioresponse():
+    """
+    Creates mock async API response.
+    """
+    with aioresponses() as async_response:
+        yield async_response
+
+
+class TestPrepareEventData:
+    def test_prepare_event_data(self):
+        exp_event_data = {
+            "action": "trigger",
+            "dedup_key": "random",
+            "payload": {
+                "severity": "error",
+                "source": "airflow_test",
+                "summary": "test",
+            },
+        }
+        event_data = prepare_event_data(
+            summary="test", source="airflow_test", severity="error", 
dedup_key="random"
+        )
+        assert event_data == exp_event_data
+
+    def test_prepare_event_data_invalid_action(self):
+        with pytest.raises(ValueError):
+            prepare_event_data(summary="test", severity="error", 
action="should_raise")
+
+    def test_prepare_event_missing_dedup_key(self):
+        with pytest.raises(ValueError):
+            prepare_event_data(summary="test", severity="error", 
action="acknowledge")
+
+
 class TestPagerdutyEventsHook:
     def test_get_integration_key_from_password(self, events_connections):
         hook = PagerdutyEventsHook(pagerduty_events_conn_id=DEFAULT_CONN_ID)
@@ -62,3 +103,45 @@ class TestPagerdutyEventsHook:
+        requests_mock.post("https://events.pagerduty.com/v2/enqueue", 
json=mock_response_body)
         resp = hook.send_event(summary="test", source="airflow_test", 
severity="error", dedup_key=dedup_key)
         assert resp == dedup_key
+
+
+class TestPagerdutyEventsAsyncHook:
+    @pytest.mark.asyncio
+    async def test_get_integration_key_from_password(self, events_connections):
+        hook = 
PagerdutyEventsAsyncHook(pagerduty_events_conn_id=DEFAULT_CONN_ID)
+        integration_key = await hook.get_integration_key()
+        assert integration_key == "events_token", "token initialised."
+
+    @pytest.mark.asyncio
+    async def test_get_integration_key_parameter_override(self, 
events_connections):
+        hook = PagerdutyEventsAsyncHook(
+            integration_key="override_key", 
pagerduty_events_conn_id=DEFAULT_CONN_ID
+        )
+        integration_key = await hook.get_integration_key()
+        assert integration_key == "override_key", "token initialised."
+
+    @pytest.mark.asyncio
+    async def test_send_event_with_payload(self, events_connections, 
aioresponse):
+        hook = 
PagerdutyEventsAsyncHook(pagerduty_events_conn_id=DEFAULT_CONN_ID)
+
+        with mock.patch("aiohttp.ClientSession.post", 
new_callable=mock.AsyncMock) as mocked_function:
+            await hook.send_event(summary="test", source="airflow_test", 
severity="error", dedup_key="random")
+            assert mocked_function.call_args.kwargs.get("json") == {
+                "event_action": "trigger",
+                "routing_key": "events_token",
+                "action": "trigger",
+                "payload": {"summary": "test", "severity": "error", "source": 
"airflow_test"},
+                "dedup_key": "random",
+            }
+            assert mocked_function.call_args.kwargs.get("headers") == 
PagerdutyEventsAsyncHook.default_headers
+            assert mocked_function.call_args.kwargs.get("auth") is None
+
+    @pytest.mark.asyncio
+    async def test_send_event_with_success(self, events_connections, 
aioresponse):
+        hook = 
PagerdutyEventsAsyncHook(pagerduty_events_conn_id=DEFAULT_CONN_ID)
+        exp_response = {"dedup_key": "random"}
+        aioresponse.post("https://events.pagerduty.com/v2/enqueue", 
status=200, payload=exp_response)
+        res = await hook.send_event(
+            summary="test", source="airflow_test", severity="error", 
dedup_key="random"
+        )
+        assert res == exp_response["dedup_key"]
diff --git 
a/providers/pagerduty/tests/unit/pagerduty/notifications/test_pagerduty.py 
b/providers/pagerduty/tests/unit/pagerduty/notifications/test_pagerduty.py
index 47394077a77..8f1dd65fb94 100644
--- a/providers/pagerduty/tests/unit/pagerduty/notifications/test_pagerduty.py
+++ b/providers/pagerduty/tests/unit/pagerduty/notifications/test_pagerduty.py
@@ -19,6 +19,8 @@ from __future__ import annotations
 
 from unittest import mock
 
+import pytest
+
 from airflow import DAG
 from airflow.providers.pagerduty.hooks.pagerduty_events import 
PagerdutyEventsHook
 from airflow.providers.pagerduty.notifications.pagerduty import (
@@ -107,3 +109,25 @@ class TestPagerdutyNotifier:
             images=None,
             links=None,
         )
+
+    @pytest.mark.asyncio
+    @mock.patch(
+        
"airflow.providers.pagerduty.notifications.pagerduty.PagerdutyEventsAsyncHook.send_event",
+        new_callable=mock.AsyncMock,
+    )
+    async def test_async_notifier(self, mock_async_hook, 
create_dag_without_db):
+        notifier = send_pagerduty_notification(summary="DISK at 99%", 
severity="critical", action="trigger")
+        await notifier.async_notify({"dag": 
create_dag_without_db("test_pagerduty_notifier")})
+        mock_async_hook.assert_called_once_with(
+            summary="DISK at 99%",
+            severity="critical",
+            action="trigger",
+            source="airflow",
+            class_type=None,
+            component=None,
+            custom_details=None,
+            group=None,
+            images=None,
+            links=None,
+            dedup_key=None,
+        )

Reply via email to