This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c3daef9fa93 HTTP Notifier implementation (#56160)
c3daef9fa93 is described below
commit c3daef9fa933eea002ee45a463962dc2ad9d7cc9
Author: Sean Ghaeli <[email protected]>
AuthorDate: Wed Oct 22 11:28:03 2025 -0700
HTTP Notifier implementation (#56160)
---
dev/breeze/tests/test_selective_checks.py | 21 +++--
providers/http/provider.yaml | 3 +
providers/http/pyproject.toml | 8 ++
.../airflow/providers/http/get_provider_info.py | 1 +
.../providers/http/notifications/__init__.py | 21 +++++
.../airflow/providers/http/notifications/http.py | 105 +++++++++++++++++++++
.../http/tests/unit/http/notifications/__init__.py | 16 ++++
.../tests/unit/http/notifications/test_http.py | 95 +++++++++++++++++++
8 files changed, 261 insertions(+), 9 deletions(-)
diff --git a/dev/breeze/tests/test_selective_checks.py
b/dev/breeze/tests/test_selective_checks.py
index 16468447e81..35f2cf03fe2 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -670,7 +670,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str,
str], stderr: str):
"providers/http/tests/file.py",
),
{
- "selected-providers-list-as-string": "amazon apache.livy
atlassian.jira dbt.cloud dingding discord google http pagerduty",
+ "selected-providers-list-as-string": "amazon apache.livy
atlassian.jira common.compat dbt.cloud dingding discord google http pagerduty",
"all-python-versions":
f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
"all-python-versions-list-as-string":
DEFAULT_PYTHON_MAJOR_MINOR_VERSION,
"python-versions":
f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
@@ -691,7 +691,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str,
str], stderr: str):
[
{
"description": "amazon...google",
- "test_types": "Providers[amazon]
Providers[apache.livy,atlassian.jira,dbt.cloud,dingding,discord,http,pagerduty]
Providers[google]",
+ "test_types": "Providers[amazon]
Providers[apache.livy,atlassian.jira,common.compat,dbt.cloud,dingding,discord,http,pagerduty]
Providers[google]",
}
]
),
@@ -702,18 +702,21 @@ def assert_outputs_are_printed(expected_outputs:
dict[str, str], stderr: str):
"test_types": "Providers[amazon]
Providers[apache.livy]",
},
{
- "description": "atlassian.jir...dbt.cloud",
- "test_types": "Providers[atlassian.jira]
Providers[dbt.cloud]",
+ "description": "atlassian.jir...common.compat",
+ "test_types": "Providers[atlassian.jira]
Providers[common.compat]",
},
{
- "description": "dingding...discord",
- "test_types": "Providers[dingding]
Providers[discord]",
+ "description": "dbt.cloud...dingding",
+ "test_types": "Providers[dbt.cloud]
Providers[dingding]",
},
{
- "description": "google...http",
- "test_types": "Providers[google]
Providers[http]",
+ "description": "discord...google",
+ "test_types": "Providers[discord]
Providers[google]",
+ },
+ {
+ "description": "http...pagerduty",
+ "test_types": "Providers[http]
Providers[pagerduty]",
},
- {"description": "pagerduty", "test_types":
"Providers[pagerduty]"},
]
),
"run-mypy": "true",
diff --git a/providers/http/provider.yaml b/providers/http/provider.yaml
index 2b746b517a7..6b83e101efb 100644
--- a/providers/http/provider.yaml
+++ b/providers/http/provider.yaml
@@ -97,6 +97,9 @@ hooks:
python-modules:
- airflow.providers.http.hooks.http
+notifications:
+ - airflow.providers.http.notifications.HttpNotifier
+
triggers:
- integration-name: Hypertext Transfer Protocol (HTTP)
python-modules:
diff --git a/providers/http/pyproject.toml b/providers/http/pyproject.toml
index 34ea031a61f..b70fc9f6005 100644
--- a/providers/http/pyproject.toml
+++ b/providers/http/pyproject.toml
@@ -66,11 +66,19 @@ dependencies = [
"asgiref>=2.3.0",
]
+# The optional dependencies should be modified in place in the generated file
+# Any change in the dependencies is preserved when the file is regenerated
+[project.optional-dependencies]
+"common.compat" = [
+ "apache-airflow-providers-common-compat"
+]
+
[dependency-groups]
dev = [
"apache-airflow",
"apache-airflow-task-sdk",
"apache-airflow-devel-common",
+ "apache-airflow-providers-common-compat",
# Additional devel dependencies (do not remove this line and add extra
development dependencies)
]
diff --git a/providers/http/src/airflow/providers/http/get_provider_info.py
b/providers/http/src/airflow/providers/http/get_provider_info.py
index 290339ef436..37b48c286f0 100644
--- a/providers/http/src/airflow/providers/http/get_provider_info.py
+++ b/providers/http/src/airflow/providers/http/get_provider_info.py
@@ -53,6 +53,7 @@ def get_provider_info():
"python-modules": ["airflow.providers.http.hooks.http"],
}
],
+ "notifications": ["airflow.providers.http.notifications.HttpNotifier"],
"triggers": [
{
"integration-name": "Hypertext Transfer Protocol (HTTP)",
diff --git
a/providers/http/src/airflow/providers/http/notifications/__init__.py
b/providers/http/src/airflow/providers/http/notifications/__init__.py
new file mode 100644
index 00000000000..74517c85ef7
--- /dev/null
+++ b/providers/http/src/airflow/providers/http/notifications/__init__.py
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.providers.http.notifications.http import HttpNotifier
+
+__all__ = ["HttpNotifier"]
diff --git a/providers/http/src/airflow/providers/http/notifications/http.py
b/providers/http/src/airflow/providers/http/notifications/http.py
new file mode 100644
index 00000000000..2f1adeadca5
--- /dev/null
+++ b/providers/http/src/airflow/providers/http/notifications/http.py
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from functools import cached_property
+from typing import TYPE_CHECKING, Any
+
+import aiohttp
+
+from airflow.providers.common.compat.notifier import BaseNotifier
+from airflow.providers.http.hooks.http import HttpAsyncHook, HttpHook
+
+if TYPE_CHECKING:
+ from airflow.sdk.definitions.context import Context
+
+
+class HttpNotifier(BaseNotifier):
+ """
+ HTTP Notifier.
+
+ Sends HTTP requests to notify external systems.
+
+ :param http_conn_id: HTTP connection id that has the base URL and optional
authentication credentials.
+ :param endpoint: The endpoint to be called i.e. resource/v1/query?
+ :param method: The HTTP method to use. Defaults to POST.
+ :param data: Payload to be uploaded or request parameters
+ :param json: JSON payload to be uploaded
+ :param headers: Additional headers to be passed through as a dictionary
+ :param extra_options: Additional options to be used when executing the
request
+ """
+
+ template_fields = ("http_conn_id", "endpoint", "data", "json", "headers",
"extra_options")
+
+ def __init__(
+ self,
+ *,
+ http_conn_id: str = HttpHook.default_conn_name,
+ endpoint: str | None = None,
+ method: str = "POST",
+ data: dict[str, Any] | str | None = None,
+ json: dict[str, Any] | str | None = None,
+ headers: dict[str, Any] | None = None,
+ extra_options: dict[str, Any] | None = None,
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+ self.http_conn_id = http_conn_id
+ self.endpoint = endpoint
+ self.method = method
+ self.data = data
+ self.json = json
+ self.headers = headers
+ self.extra_options = extra_options or {}
+
+ @cached_property
+ def hook(self) -> HttpHook:
+ """HTTP Hook."""
+ return HttpHook(method=self.method, http_conn_id=self.http_conn_id)
+
+ @cached_property
+ def async_hook(self) -> HttpAsyncHook:
+ """HTTP Async Hook."""
+ return HttpAsyncHook(method=self.method,
http_conn_id=self.http_conn_id)
+
+ def notify(self, context: Context) -> None:
+ """Send HTTP notification (sync)."""
+ resp = self.hook.run(
+ endpoint=self.endpoint,
+ data=self.data,
+ headers=self.headers,
+ extra_options=self.extra_options,
+ json=self.json,
+ )
+ self.log.debug("HTTP notification sent: %s %s", resp.status_code,
resp.url)
+
+ async def async_notify(self, context: Context) -> None:
+ """Send HTTP notification (async)."""
+ async with aiohttp.ClientSession() as session:
+ resp = await self.async_hook.run(
+ session=session,
+ endpoint=self.endpoint,
+ data=self.data,
+ json=self.json,
+ headers=self.headers,
+ extra_options=self.extra_options,
+ )
+ self.log.debug("HTTP notification sent (async): %s %s",
resp.status, resp.url)
+
+
+send_http_notification = HttpNotifier
diff --git a/providers/http/tests/unit/http/notifications/__init__.py
b/providers/http/tests/unit/http/notifications/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/http/tests/unit/http/notifications/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/providers/http/tests/unit/http/notifications/test_http.py
b/providers/http/tests/unit/http/notifications/test_http.py
new file mode 100644
index 00000000000..eb4eecff466
--- /dev/null
+++ b/providers/http/tests/unit/http/notifications/test_http.py
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+
+from airflow.providers.http.notifications.http import HttpNotifier,
send_http_notification
+
+
+class TestHttpNotifier:
+ def test_class_and_notifier_are_same(self):
+ assert send_http_notification is HttpNotifier
+
+ @mock.patch("airflow.providers.http.notifications.http.HttpHook")
+ def test_http_notifier(self, mock_http_hook):
+ notifier = HttpNotifier(
+ http_conn_id="test_conn_id",
+ endpoint="/testing",
+ method="POST",
+ json={"message": "testing"},
+ headers={"Content-Type": "application/json"},
+ )
+ notifier.notify({})
+
+ mock_http_hook.return_value.run.assert_called_once_with(
+ endpoint="/testing",
+ data=None,
+ headers={"Content-Type": "application/json"},
+ extra_options={},
+ json={"message": "testing"},
+ )
+ mock_http_hook.assert_called_once_with(method="POST",
http_conn_id="test_conn_id")
+
+ @pytest.mark.asyncio
+ @mock.patch("airflow.providers.http.notifications.http.HttpAsyncHook")
+ @mock.patch("aiohttp.ClientSession")
+ async def test_async_http_notifier(self, mock_session,
mock_http_async_hook):
+ mock_hook = mock_http_async_hook.return_value
+ mock_hook.run = mock.AsyncMock()
+
+ notifier = HttpNotifier(
+ http_conn_id="test_conn_id",
+ endpoint="/test",
+ method="POST",
+ json={"message": "test"},
+ )
+
+ await notifier.async_notify({})
+
+ mock_hook.run.assert_called_once_with(
+ session=mock_session.return_value.__aenter__.return_value,
+ endpoint="/test",
+ data=None,
+ json={"message": "test"},
+ headers=None,
+ extra_options={},
+ )
+
+ @mock.patch("airflow.providers.http.notifications.http.HttpHook")
+ def test_http_notifier_templated(self, mock_http_hook,
create_dag_without_db):
+ notifier = HttpNotifier(
+ endpoint="/{{ dag.dag_id }}",
+ json={"dag_id": "{{ dag.dag_id }}", "user": "{{ username }}"},
+ )
+ notifier(
+ {
+ "dag":
create_dag_without_db("test_http_notification_templated"),
+ "username": "test-user",
+ }
+ )
+
+ mock_http_hook.return_value.run.assert_called_once_with(
+ endpoint="/test_http_notification_templated",
+ data=None,
+ headers=None,
+ extra_options={},
+ json={"dag_id": "test_http_notification_templated", "user":
"test-user"},
+ )