This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6443b13435c feat: add async pagerduty notifier (#56540)
6443b13435c is described below
commit 6443b13435c817679c3d6dda7cd7d69cd282ea4f
Author: Sebastian Daum <[email protected]>
AuthorDate: Tue Oct 14 13:03:32 2025 +0200
feat: add async pagerduty notifier (#56540)
---
dev/breeze/tests/test_selective_checks.py | 11 +-
providers/pagerduty/pyproject.toml | 2 +
.../providers/pagerduty/hooks/pagerduty_events.py | 242 ++++++++++++++++-----
.../providers/pagerduty/notifications/pagerduty.py | 33 ++-
.../unit/pagerduty/hooks/test_pagerduty_events.py | 85 +++++++-
.../unit/pagerduty/notifications/test_pagerduty.py | 24 ++
6 files changed, 338 insertions(+), 59 deletions(-)
diff --git a/dev/breeze/tests/test_selective_checks.py
b/dev/breeze/tests/test_selective_checks.py
index a972f9d11ec..697b9ade50a 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -687,7 +687,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str,
str], stderr: str):
"providers/http/tests/file.py",
),
{
- "selected-providers-list-as-string": "amazon apache.livy
dbt.cloud dingding discord google http",
+ "selected-providers-list-as-string": "amazon apache.livy
dbt.cloud dingding discord google http pagerduty",
"all-python-versions":
f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
"all-python-versions-list-as-string":
DEFAULT_PYTHON_MAJOR_MINOR_VERSION,
"python-versions":
f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
@@ -708,7 +708,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str,
str], stderr: str):
[
{
"description": "amazon...google",
- "test_types": "Providers[amazon]
Providers[apache.livy,dbt.cloud,dingding,discord,http] Providers[google]",
+ "test_types": "Providers[amazon]
Providers[apache.livy,dbt.cloud,dingding,discord,http,pagerduty]
Providers[google]",
}
]
),
@@ -722,9 +722,12 @@ def assert_outputs_are_printed(expected_outputs: dict[str,
str], stderr: str):
"description": "dbt.cloud...dingding",
"test_types": "Providers[dbt.cloud]
Providers[dingding]",
},
- {"description": "discord", "test_types":
"Providers[discord]"},
- {"description": "google", "test_types":
"Providers[google]"},
+ {
+ "description": "discord...google",
+ "test_types": "Providers[discord]
Providers[google]",
+ },
{"description": "http", "test_types":
"Providers[http]"},
+ {"description": "pagerduty", "test_types":
"Providers[pagerduty]"},
]
),
"run-mypy": "true",
diff --git a/providers/pagerduty/pyproject.toml
b/providers/pagerduty/pyproject.toml
index 4507c32c242..bb97ac77555 100644
--- a/providers/pagerduty/pyproject.toml
+++ b/providers/pagerduty/pyproject.toml
@@ -59,6 +59,7 @@ requires-python = ">=3.10"
dependencies = [
"apache-airflow>=2.10.0",
"apache-airflow-providers-common-compat>=1.6.1",
+ "apache-airflow-providers-http",
"pagerduty>=2.3.0",
]
@@ -68,6 +69,7 @@ dev = [
"apache-airflow-task-sdk",
"apache-airflow-devel-common",
"apache-airflow-providers-common-compat",
+ "apache-airflow-providers-http",
# Additional devel dependencies (do not remove this line and add extra
development dependencies)
]
diff --git
a/providers/pagerduty/src/airflow/providers/pagerduty/hooks/pagerduty_events.py
b/providers/pagerduty/src/airflow/providers/pagerduty/hooks/pagerduty_events.py
index ef21db4b8a3..2ffa03e9e52 100644
---
a/providers/pagerduty/src/airflow/providers/pagerduty/hooks/pagerduty_events.py
+++
b/providers/pagerduty/src/airflow/providers/pagerduty/hooks/pagerduty_events.py
@@ -21,15 +21,68 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Any
+import aiohttp
import pagerduty
+from asgiref.sync import sync_to_async
from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpAsyncHook
from airflow.providers.pagerduty.version_compat import BaseHook
if TYPE_CHECKING:
from datetime import datetime
+def prepare_event_data(
+ summary: str,
+ severity: str,
+ source: str = "airflow",
+ custom_details: Any | None = None,
+ component: str | None = None,
+ group: str | None = None,
+ class_type: str | None = None,
+ action: str = "trigger",
+ dedup_key: str | None = None,
+ images: list[Any] | None = None,
+ links: list[Any] | None = None,
+ action_key_name: str = "action",
+) -> dict[str, Any]:
+ """Prepare event data for send_event / post('/v2/enqueue') method."""
+ payload = {
+ "summary": summary,
+ "severity": severity,
+ "source": source,
+ }
+ if custom_details is not None:
+ payload["custom_details"] = custom_details
+ if component:
+ payload["component"] = component
+ if group:
+ payload["group"] = group
+ if class_type:
+ payload["class"] = class_type
+
+ actions = ("trigger", "acknowledge", "resolve")
+ if action not in actions:
+ raise ValueError(f"Event action must be one of: {', '.join(actions)}")
+ data = {
+ action_key_name: action,
+ "payload": payload,
+ }
+ if dedup_key:
+ data["dedup_key"] = dedup_key
+ elif action != "trigger":
+ raise ValueError(
+ f"The dedup_key property is required for
{action_key_name}={action} events,"
+ f" and it must be a string."
+ )
+ if images is not None:
+ data["images"] = images
+ if links is not None:
+ data["links"] = links
+ return data
+
+
class PagerdutyEventsHook(BaseHook):
"""
This class can be used to interact with the Pagerduty Events API.
@@ -120,7 +173,7 @@ class PagerdutyEventsHook(BaseHook):
link's text.
:return: PagerDuty Events API v2 response.
"""
- data = PagerdutyEventsHook.prepare_event_data(
+ data = prepare_event_data(
summary=summary,
severity=severity,
source=source,
@@ -137,57 +190,6 @@ class PagerdutyEventsHook(BaseHook):
client = pagerduty.EventsApiV2Client(self.integration_key)
return client.send_event(**data)
- @staticmethod
- def prepare_event_data(
- summary,
- severity,
- source,
- custom_details,
- component,
- group,
- class_type,
- action,
- dedup_key,
- images,
- links,
- action_key_name: str = "action",
- ) -> dict:
- """Prepare event data for send_event / post('/v2/enqueue') method."""
- payload = {
- "summary": summary,
- "severity": severity,
- "source": source,
- }
- if custom_details is not None:
- payload["custom_details"] = custom_details
- if component:
- payload["component"] = component
- if group:
- payload["group"] = group
- if class_type:
- payload["class"] = class_type
-
- actions = ("trigger", "acknowledge", "resolve")
- if action not in actions:
- raise ValueError(f"Event action must be one of: {',
'.join(actions)}")
- data = {
- action_key_name: action,
- "payload": payload,
- }
- if dedup_key:
- data["dedup_key"] = dedup_key
- elif action != "trigger":
- raise ValueError(
- f"The dedup_key property is required for
{action_key_name}={action} events,"
- f" and it must be a string."
- )
- if images is not None:
- data["images"] = images
- if links is not None:
- data["links"] = links
-
- return data
-
def create_change_event(
self,
summary: str,
@@ -237,3 +239,139 @@ class PagerdutyEventsHook(BaseHook):
except Exception:
return False, "connection test failed, invalid routing key"
return True, "connection tested successfully"
+
+
+class PagerdutyEventsAsyncHook(HttpAsyncHook):
+ """
+ This class can be used to interact with the Pagerduty Events API via async
http.
+
+ Documentation on how to use the PagerDuty Events API can be found at:
+ https://developer.pagerduty.com/docs/events-api-v2-overview
+
+ It takes both an Events API token and a PagerDuty connection with the
Events API token
+ (i.e. Integration key) as the password/Pagerduty API token. If both
supplied, the token will be used.
+
+ :param integration_key: PagerDuty Events API token
+ :param pagerduty_events_conn_id: connection that has PagerDuty integration
key in the Pagerduty
+ API token field
+ :param api_version: api version to use
+ """
+
+ default_headers = {
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ }
+ conn_name_attr = "pagerduty_events_conn_id"
+ default_conn_name = "pagerduty_events_default"
+ conn_type = "pagerduty_events"
+ hook_name = "Async Pagerduty Events"
+
+ def __init__(
+ self,
+ integration_key: str | None = None,
+ pagerduty_events_conn_id: str | None = None,
+ api_version: str = "v2",
+ ) -> None:
+ super().__init__()
+ self.integration_key = integration_key
+ self.pagerduty_events_conn_id = pagerduty_events_conn_id
+ self.api_version = api_version
+ self.method = "POST"
+ self.base_url: str = "https://events.pagerduty.com"
+ self.http_conn_id = ""
+
+ async def get_integration_key(self) -> str:
+ """Get integration key from the connection."""
+ if self.integration_key:
+ return self.integration_key
+
+ if self.pagerduty_events_conn_id is not None:
+ conn = await
sync_to_async(self.get_connection)(self.pagerduty_events_conn_id)
+ self.integration_key = conn.password
+ if self.integration_key:
+ return self.integration_key
+
+ raise AirflowException(
+ "Cannot get integration key: No valid integration key nor
pagerduty_events_conn_id supplied."
+ )
+
+ async def send_event(
+ self,
+ summary: str,
+ severity: str,
+ source: str = "airflow",
+ action: str = "trigger",
+ dedup_key: str | None = None,
+ custom_details: Any | None = None,
+ group: str | None = None,
+ component: str | None = None,
+ class_type: str | None = None,
+ images: list[Any] | None = None,
+ links: list[Any] | None = None,
+ ) -> str:
+ """
+ Create event for service integration.
+
+ :param summary: Summary for the event
+ :param severity: Severity for the event, needs to be one of: info,
warning, error, critical
+ :param source: Specific human-readable unique identifier, such as a
+ hostname, for the system having the problem.
+ :param action: Event action, needs to be one of: trigger, acknowledge,
+ resolve. Default to trigger if not specified.
+ :param dedup_key: A string which identifies the alert triggered for
the given event.
+ Required for the actions acknowledge and resolve.
+ :param custom_details: Free-form details from the event. Can be a
dictionary or a string.
+ If a dictionary is passed it will show up in PagerDuty as a table.
+ :param group: A cluster or grouping of sources. For example, sources
+ "prod-datapipe-02" and "prod-datapipe-03" might both be part of
"prod-datapipe"
+ :param component: The part or component of the affected system that is
broken.
+ :param class_type: The class/type of the event.
+ :param images: List of images to include. Each dictionary in the list
accepts the following keys:
+ `src`: The source (URL) of the image being attached to the
incident. This image must be served via
+ HTTPS.
+ `href`: [Optional] URL to make the image a clickable link.
+ `alt`: [Optional] Alternative text for the image.
+ :param links: List of links to include. Each dictionary in the list
accepts the following keys:
+ `href`: URL of the link to be attached.
+ `text`: [Optional] Plain text that describes the purpose of the
link, and can be used as the
+ link's text.
+ :return: PagerDuty Events API response.
+ """
+ event = {"event_action": action}
+
+ integration_key = await self.get_integration_key()
+ # add routing key
+ event["routing_key"] = integration_key
+
+ data = prepare_event_data(
+ summary=summary,
+ severity=severity,
+ source=source,
+ custom_details=custom_details,
+ component=component,
+ group=group,
+ class_type=class_type,
+ action=action,
+ dedup_key=dedup_key,
+ images=images,
+ links=links,
+ )
+
+ event.update(data)
+
+ if isinstance(dedup_key, str):
+ event["dedup_key"] = dedup_key
+ elif not action == "trigger":
+ raise ValueError(
+ f"The dedup_key property is required forevent_action={action}
events, and it must be a string."
+ )
+
+ async with aiohttp.ClientSession() as session:
+ res = await super().run(
+ session=session,
+ endpoint=f"{self.api_version}/enqueue",
+ json=event,
+ headers=self.default_headers,
+ )
+ res_body = await res.json()
+ return res_body["dedup_key"]
diff --git
a/providers/pagerduty/src/airflow/providers/pagerduty/notifications/pagerduty.py
b/providers/pagerduty/src/airflow/providers/pagerduty/notifications/pagerduty.py
index b976cfff5dd..bd38e379346 100644
---
a/providers/pagerduty/src/airflow/providers/pagerduty/notifications/pagerduty.py
+++
b/providers/pagerduty/src/airflow/providers/pagerduty/notifications/pagerduty.py
@@ -21,7 +21,8 @@ from functools import cached_property
from typing import Any
from airflow.providers.common.compat.notifier import BaseNotifier
-from airflow.providers.pagerduty.hooks.pagerduty_events import
PagerdutyEventsHook
+from airflow.providers.pagerduty.hooks.pagerduty_events import
PagerdutyEventsAsyncHook, PagerdutyEventsHook
+from airflow.providers.pagerduty.version_compat import AIRFLOW_V_3_1_PLUS
class PagerdutyNotifier(BaseNotifier):
@@ -86,8 +87,13 @@ class PagerdutyNotifier(BaseNotifier):
links: list[Any] | None = None,
pagerduty_events_conn_id: str | None = "pagerduty_events_default",
integration_key: str | None = None,
+ **kwargs,
):
- super().__init__()
+ if AIRFLOW_V_3_1_PLUS:
+ # Support for passing context was added in 3.1.0
+ super().__init__(**kwargs)
+ else:
+ super().__init__()
self.pagerduty_events_conn_id = pagerduty_events_conn_id
self.integration_key = integration_key
self.summary = summary
@@ -109,6 +115,13 @@ class PagerdutyNotifier(BaseNotifier):
pagerduty_events_conn_id=self.pagerduty_events_conn_id,
integration_key=self.integration_key
)
+ @cached_property
+ def hook_async(self) -> PagerdutyEventsAsyncHook:
+ """Pagerduty Events Async Hook."""
+ return PagerdutyEventsAsyncHook(
+ pagerduty_events_conn_id=self.pagerduty_events_conn_id,
integration_key=self.integration_key
+ )
+
def notify(self, context):
"""Send a alert to a pagerduty event v2 API."""
self.hook.send_event(
@@ -125,5 +138,21 @@ class PagerdutyNotifier(BaseNotifier):
links=self.links,
)
+ async def async_notify(self, context) -> None:
+ """Send a alert to a pagerduty event v2 API using async HTTP."""
+ await self.hook_async.send_event(
+ summary=self.summary,
+ severity=self.severity,
+ source=self.source,
+ action=self.action,
+ dedup_key=self.dedup_key,
+ custom_details=self.custom_details,
+ group=self.group,
+ component=self.component,
+ class_type=self.class_type,
+ images=self.images,
+ links=self.links,
+ )
+
send_pagerduty_notification = PagerdutyNotifier
diff --git
a/providers/pagerduty/tests/unit/pagerduty/hooks/test_pagerduty_events.py
b/providers/pagerduty/tests/unit/pagerduty/hooks/test_pagerduty_events.py
index 8d1136900cc..dea0cc5f620 100644
--- a/providers/pagerduty/tests/unit/pagerduty/hooks/test_pagerduty_events.py
+++ b/providers/pagerduty/tests/unit/pagerduty/hooks/test_pagerduty_events.py
@@ -17,10 +17,17 @@
# under the License.
from __future__ import annotations
+from unittest import mock
+
import pytest
+from aioresponses import aioresponses
from airflow.models import Connection
-from airflow.providers.pagerduty.hooks.pagerduty_events import
PagerdutyEventsHook
+from airflow.providers.pagerduty.hooks.pagerduty_events import (
+ PagerdutyEventsAsyncHook,
+ PagerdutyEventsHook,
+ prepare_event_data,
+)
DEFAULT_CONN_ID = "pagerduty_events_default"
@@ -32,6 +39,40 @@ def events_connections(create_connection_without_db):
)
[email protected]
+def aioresponse():
+ """
+ Creates mock async API response.
+ """
+ with aioresponses() as async_response:
+ yield async_response
+
+
+class TestPrepareEventData:
+ def test_prepare_event_data(self):
+ exp_event_data = {
+ "action": "trigger",
+ "dedup_key": "random",
+ "payload": {
+ "severity": "error",
+ "source": "airflow_test",
+ "summary": "test",
+ },
+ }
+ even_data = prepare_event_data(
+ summary="test", source="airflow_test", severity="error",
dedup_key="random"
+ )
+ assert even_data == exp_event_data
+
+ def test_prepare_event_data_invalid_action(self):
+ with pytest.raises(ValueError):
+ prepare_event_data(summary="test", severity="error",
action="should_raise")
+
+ def test_prepare_event_missing_dedup_key(self):
+ with pytest.raises(ValueError):
+ prepare_event_data(summary="test", severity="error",
action="acknowledge")
+
+
class TestPagerdutyEventsHook:
def test_get_integration_key_from_password(self, events_connections):
hook = PagerdutyEventsHook(pagerduty_events_conn_id=DEFAULT_CONN_ID)
@@ -62,3 +103,45 @@ class TestPagerdutyEventsHook:
requests_mock.post("https://events.pagerduty.com/v2/enqueue",
json=mock_response_body)
resp = hook.send_event(summary="test", source="airflow_test",
severity="error", dedup_key=dedup_key)
assert resp == dedup_key
+
+
+class TestPagerdutyEventsAsyncHook:
+ @pytest.mark.asyncio
+ async def test_get_integration_key_from_password(self, events_connections):
+ hook =
PagerdutyEventsAsyncHook(pagerduty_events_conn_id=DEFAULT_CONN_ID)
+ integration_key = await hook.get_integration_key()
+ assert integration_key == "events_token", "token initialised."
+
+ @pytest.mark.asyncio
+ async def test_get_integration_key_parameter_override(self,
events_connections):
+ hook = PagerdutyEventsAsyncHook(
+ integration_key="override_key",
pagerduty_events_conn_id=DEFAULT_CONN_ID
+ )
+ integration_key = await hook.get_integration_key()
+ assert integration_key == "override_key", "token initialised."
+
+ @pytest.mark.asyncio
+ async def test_send_event_with_payload(self, events_connections,
aioresponse):
+ hook =
PagerdutyEventsAsyncHook(pagerduty_events_conn_id=DEFAULT_CONN_ID)
+
+ with mock.patch("aiohttp.ClientSession.post",
new_callable=mock.AsyncMock) as mocked_function:
+ await hook.send_event(summary="test", source="airflow_test",
severity="error", dedup_key="random")
+ assert mocked_function.call_args.kwargs.get("json") == {
+ "event_action": "trigger",
+ "routing_key": "events_token",
+ "action": "trigger",
+ "payload": {"summary": "test", "severity": "error", "source":
"airflow_test"},
+ "dedup_key": "random",
+ }
+ assert mocked_function.call_args.kwargs.get("headers") ==
PagerdutyEventsAsyncHook.default_headers
+ assert mocked_function.call_args.kwargs.get("auth") is None
+
+ @pytest.mark.asyncio
+ async def test_send_event_with_success(self, events_connections,
aioresponse):
+ hook =
PagerdutyEventsAsyncHook(pagerduty_events_conn_id=DEFAULT_CONN_ID)
+ exp_response = {"dedup_key": "random"}
+ aioresponse.post("https://events.pagerduty.com/v2/enqueue",
status=200, payload=exp_response)
+ res = await hook.send_event(
+ summary="test", source="airflow_test", severity="error",
dedup_key="random"
+ )
+ assert res == exp_response["dedup_key"]
diff --git
a/providers/pagerduty/tests/unit/pagerduty/notifications/test_pagerduty.py
b/providers/pagerduty/tests/unit/pagerduty/notifications/test_pagerduty.py
index 47394077a77..8f1dd65fb94 100644
--- a/providers/pagerduty/tests/unit/pagerduty/notifications/test_pagerduty.py
+++ b/providers/pagerduty/tests/unit/pagerduty/notifications/test_pagerduty.py
@@ -19,6 +19,8 @@ from __future__ import annotations
from unittest import mock
+import pytest
+
from airflow import DAG
from airflow.providers.pagerduty.hooks.pagerduty_events import
PagerdutyEventsHook
from airflow.providers.pagerduty.notifications.pagerduty import (
@@ -107,3 +109,25 @@ class TestPagerdutyNotifier:
images=None,
links=None,
)
+
+ @pytest.mark.asyncio
+ @mock.patch(
+
"airflow.providers.pagerduty.notifications.pagerduty.PagerdutyEventsAsyncHook.send_event",
+ new_callable=mock.AsyncMock,
+ )
+ async def test_async_notifier(self, mock_async_hook,
create_dag_without_db):
+ notifier = send_pagerduty_notification(summary="DISK at 99%",
severity="critical", action="trigger")
+ await notifier.async_notify({"dag":
create_dag_without_db("test_pagerduty_notifier")})
+ mock_async_hook.assert_called_once_with(
+ summary="DISK at 99%",
+ severity="critical",
+ action="trigger",
+ source="airflow",
+ class_type=None,
+ component=None,
+ custom_details=None,
+ group=None,
+ images=None,
+ links=None,
+ dedup_key=None,
+ )