This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7f26bcd3999 Add poll_interval attribute to HttpEventTrigger (#57583)
7f26bcd3999 is described below
commit 7f26bcd399945c695feee5827c6cd27b6a8f90c6
Author: Karen Braganza <[email protected]>
AuthorDate: Thu Nov 13 16:54:16 2025 -0500
Add poll_interval attribute to HttpEventTrigger (#57583)
* Add poll_interval attribute to HttpEventTrigger
* Add info on poll_interval to HttpEventTrigger docs
* Add info on poll_interval to HttpEventTrigger docs
* Add poll_interval attribute to tests
* Update providers/http/docs/triggers.rst
Co-authored-by: Ryan Hatter <[email protected]>
---------
Co-authored-by: Ryan Hatter <[email protected]>
---
providers/http/docs/triggers.rst | 11 ++++++++---
providers/http/src/airflow/providers/http/triggers/http.py | 7 ++++++-
providers/http/tests/unit/http/triggers/test_http.py | 3 +++
3 files changed, 17 insertions(+), 4 deletions(-)
diff --git a/providers/http/docs/triggers.rst b/providers/http/docs/triggers.rst
index 752e2cd9571..f14bf15c629 100644
--- a/providers/http/docs/triggers.rst
+++ b/providers/http/docs/triggers.rst
@@ -30,7 +30,7 @@ enabling event-driven DAGs based on API responses.
How It Works
------------
-1. Sends requests to an API.
+1. Sends requests to an API every ``poll_interval`` seconds (default 60).
2. Uses the callable at ``response_check_path`` to evaluate the API response.
3. If the callable returns ``True``, a ``TriggerEvent`` is emitted. This will
trigger DAGs using this ``AssetWatcher`` for scheduling.
@@ -85,6 +85,7 @@ Here's an example of using the HttpEventTrigger in an
AssetWatcher to monitor th
http_conn_id="http_default", # HTTP connection with
https://api.github.com/ as the Host
headers=headers,
response_check_path="dags.check_airflow_releases.check_github_api_response", #
Path to the check_github_api_response callable
+ poll_interval=600, # Poll API every 600 seconds
)
asset = Asset(
@@ -133,11 +134,15 @@ Parameters
``response_check_path``
Path to callable that evaluates whether the API response passes the
conditions set by the user to trigger DAGs
+``poll_interval``
+ How often, in seconds, the trigger should send a request to the API.
+
Important Notes
---------------
1. A ``response_check_path`` value is required.
2. The ``response_check_path`` must contain the path to an asynchronous
callable. Synchronous callables will raise an exception.
-3. This trigger does not automatically record the previous API response.
-4. The previous response may have to be persisted manually though
``Variable.set()`` in the ``response_check_path`` callable to prevent the
trigger from emitting events repeatedly for the same API response.
+3. The ``poll_interval`` defaults to 60 seconds. This may be changed to avoid
hitting API rate limits.
+4. This trigger does not automatically record the previous API response.
+5. The previous response may have to be persisted manually though
``Variable.set()`` in the ``response_check_path`` callable to prevent the
trigger from emitting events repeatedly for the same API response.
diff --git a/providers/http/src/airflow/providers/http/triggers/http.py
b/providers/http/src/airflow/providers/http/triggers/http.py
index 6cddca683d2..86d81e19791 100644
--- a/providers/http/src/airflow/providers/http/triggers/http.py
+++ b/providers/http/src/airflow/providers/http/triggers/http.py
@@ -253,6 +253,7 @@ class HttpEventTrigger(HttpTrigger, BaseEventTrigger):
:param headers: Additional headers to be passed through as a dict.
:param data: Payload to be uploaded or request parameters.
:param extra_options: Additional kwargs to pass when creating a request.
+ :parama poll_interval: How often, in seconds, the trigger should send a
request to the API.
"""
def __init__(
@@ -265,9 +266,11 @@ class HttpEventTrigger(HttpTrigger, BaseEventTrigger):
headers: dict[str, str] | None = None,
data: dict[str, Any] | str | None = None,
extra_options: dict[str, Any] | None = None,
+ poll_interval: float = 60.0,
):
super().__init__(http_conn_id, auth_type, method, endpoint, headers,
data, extra_options)
self.response_check_path = response_check_path
+ self.poll_interval = poll_interval
def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serialize HttpEventTrigger arguments and classpath."""
@@ -276,12 +279,13 @@ class HttpEventTrigger(HttpTrigger, BaseEventTrigger):
{
"http_conn_id": self.http_conn_id,
"method": self.method,
- "auth_type": self.auth_type,
+ "auth_type": serialize_auth_type(self.auth_type),
"endpoint": self.endpoint,
"headers": self.headers,
"data": self.data,
"extra_options": self.extra_options,
"response_check_path": self.response_check_path,
+ "poll_interval": self.poll_interval,
},
)
@@ -293,6 +297,7 @@ class HttpEventTrigger(HttpTrigger, BaseEventTrigger):
response = await super()._get_response(hook)
if await self._run_response_check(response):
break
+ await asyncio.sleep(self.poll_interval)
yield TriggerEvent(
{
"status": "success",
diff --git a/providers/http/tests/unit/http/triggers/test_http.py
b/providers/http/tests/unit/http/triggers/test_http.py
index 8313f69f654..0e47c5559c1 100644
--- a/providers/http/tests/unit/http/triggers/test_http.py
+++ b/providers/http/tests/unit/http/triggers/test_http.py
@@ -43,6 +43,7 @@ TEST_HEADERS = {"Authorization": "Bearer test"}
TEST_DATA = {"key": "value"}
TEST_EXTRA_OPTIONS: dict[str, Any] = {}
TEST_RESPONSE_CHECK_PATH = "mock.path"
+TEST_POLL_INTERVAL = 5
@pytest.fixture
@@ -81,6 +82,7 @@ def event_trigger():
data=TEST_DATA,
extra_options=TEST_EXTRA_OPTIONS,
response_check_path=TEST_RESPONSE_CHECK_PATH,
+ poll_interval=TEST_POLL_INTERVAL,
)
@@ -232,6 +234,7 @@ class TestHttpEventTrigger:
"data": TEST_DATA,
"extra_options": TEST_EXTRA_OPTIONS,
"response_check_path": TEST_RESPONSE_CHECK_PATH,
+ "poll_interval": TEST_POLL_INTERVAL,
}
@pytest.mark.asyncio