This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c3daef9fa93 HTTP Notifier implementation (#56160)
c3daef9fa93 is described below

commit c3daef9fa933eea002ee45a463962dc2ad9d7cc9
Author: Sean Ghaeli <[email protected]>
AuthorDate: Wed Oct 22 11:28:03 2025 -0700

    HTTP Notifier implementation (#56160)
---
 dev/breeze/tests/test_selective_checks.py          |  21 +++--
 providers/http/provider.yaml                       |   3 +
 providers/http/pyproject.toml                      |   8 ++
 .../airflow/providers/http/get_provider_info.py    |   1 +
 .../providers/http/notifications/__init__.py       |  21 +++++
 .../airflow/providers/http/notifications/http.py   | 105 +++++++++++++++++++++
 .../http/tests/unit/http/notifications/__init__.py |  16 ++++
 .../tests/unit/http/notifications/test_http.py     |  95 +++++++++++++++++++
 8 files changed, 261 insertions(+), 9 deletions(-)

diff --git a/dev/breeze/tests/test_selective_checks.py b/dev/breeze/tests/test_selective_checks.py
index 16468447e81..35f2cf03fe2 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -670,7 +670,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
                     "providers/http/tests/file.py",
                 ),
                 {
-                    "selected-providers-list-as-string": "amazon apache.livy atlassian.jira dbt.cloud dingding discord google http pagerduty",
+                    "selected-providers-list-as-string": "amazon apache.livy atlassian.jira common.compat dbt.cloud dingding discord google http pagerduty",
                     "all-python-versions": f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
                     "all-python-versions-list-as-string": DEFAULT_PYTHON_MAJOR_MINOR_VERSION,
                     "python-versions": f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
@@ -691,7 +691,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
                         [
                             {
                                 "description": "amazon...google",
-                                "test_types": "Providers[amazon] Providers[apache.livy,atlassian.jira,dbt.cloud,dingding,discord,http,pagerduty] Providers[google]",
+                                "test_types": "Providers[amazon] Providers[apache.livy,atlassian.jira,common.compat,dbt.cloud,dingding,discord,http,pagerduty] Providers[google]",
                             }
                         ]
                     ),
@@ -702,18 +702,21 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
                                 "test_types": "Providers[amazon] Providers[apache.livy]",
                             },
                             {
-                                "description": "atlassian.jir...dbt.cloud",
-                                "test_types": "Providers[atlassian.jira] Providers[dbt.cloud]",
+                                "description": "atlassian.jir...common.compat",
+                                "test_types": "Providers[atlassian.jira] Providers[common.compat]",
                             },
                             {
-                                "description": "dingding...discord",
-                                "test_types": "Providers[dingding] Providers[discord]",
+                                "description": "dbt.cloud...dingding",
+                                "test_types": "Providers[dbt.cloud] Providers[dingding]",
                             },
                             {
-                                "description": "google...http",
-                                "test_types": "Providers[google] Providers[http]",
+                                "description": "discord...google",
+                                "test_types": "Providers[discord] Providers[google]",
+                            },
+                            {
+                                "description": "http...pagerduty",
+                                "test_types": "Providers[http] Providers[pagerduty]",
                             },
-                            {"description": "pagerduty", "test_types": "Providers[pagerduty]"},
                         ]
                     ),
                     "run-mypy": "true",
diff --git a/providers/http/provider.yaml b/providers/http/provider.yaml
index 2b746b517a7..6b83e101efb 100644
--- a/providers/http/provider.yaml
+++ b/providers/http/provider.yaml
@@ -97,6 +97,9 @@ hooks:
     python-modules:
       - airflow.providers.http.hooks.http
 
+notifications:
+  - airflow.providers.http.notifications.HttpNotifier
+
 triggers:
   - integration-name: Hypertext Transfer Protocol (HTTP)
     python-modules:
diff --git a/providers/http/pyproject.toml b/providers/http/pyproject.toml
index 34ea031a61f..b70fc9f6005 100644
--- a/providers/http/pyproject.toml
+++ b/providers/http/pyproject.toml
@@ -66,11 +66,19 @@ dependencies = [
     "asgiref>=2.3.0",
 ]
 
+# The optional dependencies should be modified in place in the generated file
+# Any change in the dependencies is preserved when the file is regenerated
+[project.optional-dependencies]
+"common.compat" = [
+    "apache-airflow-providers-common-compat"
+]
+
 [dependency-groups]
 dev = [
     "apache-airflow",
     "apache-airflow-task-sdk",
     "apache-airflow-devel-common",
+    "apache-airflow-providers-common-compat",
     # Additional devel dependencies (do not remove this line and add extra development dependencies)
 ]
 
diff --git a/providers/http/src/airflow/providers/http/get_provider_info.py b/providers/http/src/airflow/providers/http/get_provider_info.py
index 290339ef436..37b48c286f0 100644
--- a/providers/http/src/airflow/providers/http/get_provider_info.py
+++ b/providers/http/src/airflow/providers/http/get_provider_info.py
@@ -53,6 +53,7 @@ def get_provider_info():
                 "python-modules": ["airflow.providers.http.hooks.http"],
             }
         ],
+        "notifications": ["airflow.providers.http.notifications.HttpNotifier"],
         "triggers": [
             {
                 "integration-name": "Hypertext Transfer Protocol (HTTP)",
diff --git a/providers/http/src/airflow/providers/http/notifications/__init__.py b/providers/http/src/airflow/providers/http/notifications/__init__.py
new file mode 100644
index 00000000000..74517c85ef7
--- /dev/null
+++ b/providers/http/src/airflow/providers/http/notifications/__init__.py
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.providers.http.notifications.http import HttpNotifier
+
+__all__ = ["HttpNotifier"]
diff --git a/providers/http/src/airflow/providers/http/notifications/http.py b/providers/http/src/airflow/providers/http/notifications/http.py
new file mode 100644
index 00000000000..2f1adeadca5
--- /dev/null
+++ b/providers/http/src/airflow/providers/http/notifications/http.py
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from functools import cached_property
+from typing import TYPE_CHECKING, Any
+
+import aiohttp
+
+from airflow.providers.common.compat.notifier import BaseNotifier
+from airflow.providers.http.hooks.http import HttpAsyncHook, HttpHook
+
+if TYPE_CHECKING:
+    from airflow.sdk.definitions.context import Context
+
+
+class HttpNotifier(BaseNotifier):
+    """
+    HTTP Notifier.
+
+    Sends HTTP requests to notify external systems.
+
+    :param http_conn_id: HTTP connection id that has the base URL and optional authentication credentials.
+    :param endpoint: The endpoint to be called i.e. resource/v1/query?
+    :param method: The HTTP method to use. Defaults to POST.
+    :param data: Payload to be uploaded or request parameters
+    :param json: JSON payload to be uploaded
+    :param headers: Additional headers to be passed through as a dictionary
+    :param extra_options: Additional options to be used when executing the request
+    """
+
+    template_fields = ("http_conn_id", "endpoint", "data", "json", "headers", "extra_options")
+
+    def __init__(
+        self,
+        *,
+        http_conn_id: str = HttpHook.default_conn_name,
+        endpoint: str | None = None,
+        method: str = "POST",
+        data: dict[str, Any] | str | None = None,
+        json: dict[str, Any] | str | None = None,
+        headers: dict[str, Any] | None = None,
+        extra_options: dict[str, Any] | None = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.http_conn_id = http_conn_id
+        self.endpoint = endpoint
+        self.method = method
+        self.data = data
+        self.json = json
+        self.headers = headers
+        self.extra_options = extra_options or {}
+
+    @cached_property
+    def hook(self) -> HttpHook:
+        """HTTP Hook."""
+        return HttpHook(method=self.method, http_conn_id=self.http_conn_id)
+
+    @cached_property
+    def async_hook(self) -> HttpAsyncHook:
+        """HTTP Async Hook."""
+        return HttpAsyncHook(method=self.method, http_conn_id=self.http_conn_id)
+
+    def notify(self, context: Context) -> None:
+        """Send HTTP notification (sync)."""
+        resp = self.hook.run(
+            endpoint=self.endpoint,
+            data=self.data,
+            headers=self.headers,
+            extra_options=self.extra_options,
+            json=self.json,
+        )
+        self.log.debug("HTTP notification sent: %s %s", resp.status_code, resp.url)
+
+    async def async_notify(self, context: Context) -> None:
+        """Send HTTP notification (async)."""
+        async with aiohttp.ClientSession() as session:
+            resp = await self.async_hook.run(
+                session=session,
+                endpoint=self.endpoint,
+                data=self.data,
+                json=self.json,
+                headers=self.headers,
+                extra_options=self.extra_options,
+            )
+            self.log.debug("HTTP notification sent (async): %s %s", resp.status, resp.url)
+
+
+send_http_notification = HttpNotifier
diff --git a/providers/http/tests/unit/http/notifications/__init__.py b/providers/http/tests/unit/http/notifications/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/http/tests/unit/http/notifications/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/providers/http/tests/unit/http/notifications/test_http.py b/providers/http/tests/unit/http/notifications/test_http.py
new file mode 100644
index 00000000000..eb4eecff466
--- /dev/null
+++ b/providers/http/tests/unit/http/notifications/test_http.py
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+
+from airflow.providers.http.notifications.http import HttpNotifier, send_http_notification
+
+
+class TestHttpNotifier:
+    def test_class_and_notifier_are_same(self):
+        assert send_http_notification is HttpNotifier
+
+    @mock.patch("airflow.providers.http.notifications.http.HttpHook")
+    def test_http_notifier(self, mock_http_hook):
+        notifier = HttpNotifier(
+            http_conn_id="test_conn_id",
+            endpoint="/testing",
+            method="POST",
+            json={"message": "testing"},
+            headers={"Content-Type": "application/json"},
+        )
+        notifier.notify({})
+
+        mock_http_hook.return_value.run.assert_called_once_with(
+            endpoint="/testing",
+            data=None,
+            headers={"Content-Type": "application/json"},
+            extra_options={},
+            json={"message": "testing"},
+        )
+        mock_http_hook.assert_called_once_with(method="POST", http_conn_id="test_conn_id")
+
+    @pytest.mark.asyncio
+    @mock.patch("airflow.providers.http.notifications.http.HttpAsyncHook")
+    @mock.patch("aiohttp.ClientSession")
+    async def test_async_http_notifier(self, mock_session, mock_http_async_hook):
+        mock_hook = mock_http_async_hook.return_value
+        mock_hook.run = mock.AsyncMock()
+
+        notifier = HttpNotifier(
+            http_conn_id="test_conn_id",
+            endpoint="/test",
+            method="POST",
+            json={"message": "test"},
+        )
+
+        await notifier.async_notify({})
+
+        mock_hook.run.assert_called_once_with(
+            session=mock_session.return_value.__aenter__.return_value,
+            endpoint="/test",
+            data=None,
+            json={"message": "test"},
+            headers=None,
+            extra_options={},
+        )
+
+    @mock.patch("airflow.providers.http.notifications.http.HttpHook")
+    def test_http_notifier_templated(self, mock_http_hook, create_dag_without_db):
+        notifier = HttpNotifier(
+            endpoint="/{{ dag.dag_id }}",
+            json={"dag_id": "{{ dag.dag_id }}", "user": "{{ username }}"},
+        )
+        notifier(
+            {
+                "dag": create_dag_without_db("test_http_notification_templated"),
+                "username": "test-user",
+            }
+        )
+
+        mock_http_hook.return_value.run.assert_called_once_with(
+            endpoint="/test_http_notification_templated",
+            data=None,
+            headers=None,
+            extra_options={},
+            json={"dag_id": "test_http_notification_templated", "user": "test-user"},
+        )

Reply via email to