This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1f76986b7b Fix: Paginate on latest Response (#35560)
1f76986b7b is described below
commit 1f76986b7ba19737aba77d63bbec1ce29aff55fc
Author: Joffrey Bienvenu <[email protected]>
AuthorDate: Sat Nov 11 07:55:21 2023 +0100
Fix: Paginate on latest Response (#35560)
* fix: Use previous Response as parameter for pagination function
* fix: clarify docs
* fix: Add stronger test
* fix: Refactor test to do three chained calls
* fix: code formatting
---
airflow/providers/http/operators/http.py | 27 ++++++++++++++-------------
tests/providers/http/operators/test_http.py | 20 +++++++++++---------
2 files changed, 25 insertions(+), 22 deletions(-)
diff --git a/airflow/providers/http/operators/http.py
b/airflow/providers/http/operators/http.py
index 216de77e31..96415ed977 100644
--- a/airflow/providers/http/operators/http.py
+++ b/airflow/providers/http/operators/http.py
@@ -53,14 +53,15 @@ class HttpOperator(BaseOperator):
:param data: The data to pass. POST-data in POST/PUT and params
in the URL for a GET request. (templated)
:param headers: The HTTP headers to be added to the GET request
- :param pagination_function: A callable that generates the parameters used
to call the API again.
- Typically used when the API is paginated and returns for e.g a cursor,
a 'next page id', or
- a 'next page URL'. When provided, the Operator will call the API
repeatedly until this callable
- returns None. Also, the result of the Operator will become by default
a list of Response.text
- objects (instead of a single response object). Same with the other
injected functions (like
- response_check, response_filter, ...) which will also receive a list
of Response object. This
- function should return a dict of parameters (`endpoint`, `data`,
`headers`, `extra_options`),
- which will be merged and override the one used in the initial API call.
+ :param pagination_function: A callable that generates the parameters used
to call the API again,
+ based on the previous response. Typically used when the API is
paginated and returns, e.g., a
+ cursor, a 'next page id', or a 'next page URL'. When provided, the
Operator will call the API
+ repeatedly until this callable returns None. Also, the result of the
Operator will become by
+ default a list of Response.text objects (instead of a single response
object). Same with the
+ other injected functions (like response_check, response_filter, ...)
which will also receive a
+ list of Response objects. This function receives the Response object from the
previous call, and should
+ return a dict of parameters (`endpoint`, `data`, `headers`,
`extra_options`), which will be merged
+ and will override the ones used in the initial API call.
:param response_check: A check against the 'requests' response object.
The callable takes the response object as the first positional argument
and optionally any number of keyword arguments available in the
context dictionary.
@@ -162,16 +163,16 @@ class HttpOperator(BaseOperator):
def execute_sync(self, context: Context) -> Any:
self.log.info("Calling HTTP method")
response = self.hook.run(self.endpoint, self.data, self.headers,
self.extra_options)
- response = self.paginate_sync(first_response=response)
+ response = self.paginate_sync(response=response)
return self.process_response(context=context, response=response)
- def paginate_sync(self, first_response: Response) -> Response |
list[Response]:
+ def paginate_sync(self, response: Response) -> Response | list[Response]:
if not self.pagination_function:
- return first_response
+ return response
- all_responses = [first_response]
+ all_responses = [response]
while True:
- next_page_params = self.pagination_function(first_response)
+ next_page_params = self.pagination_function(response)
if not next_page_params:
break
response =
self.hook.run(**self._merge_next_page_parameters(next_page_params))
diff --git a/tests/providers/http/operators/test_http.py
b/tests/providers/http/operators/test_http.py
index a57af7d764..451cd93d44 100644
--- a/tests/providers/http/operators/test_http.py
+++ b/tests/providers/http/operators/test_http.py
@@ -118,31 +118,33 @@ class TestHttpOperator:
pagination_function is provided, and as long as this function returns
a dictionary that override previous' call parameters.
"""
- has_returned: bool = False
+ iterations: int = 0
def pagination_function(response: Response) -> dict | None:
"""Paginated function which returns None at the second call."""
- nonlocal has_returned
- if not has_returned:
- has_returned = True
+ nonlocal iterations
+ if iterations < 2:
+ iterations += 1
return dict(
- endpoint="/",
- data={"cursor": "example"},
+ endpoint=response.json()["endpoint"],
+ data={},
headers={},
extra_options={},
)
return None
- requests_mock.get("http://www.example.com", json={"value": 5})
+ requests_mock.get("http://www.example.com/foo", json={"value": 5,
"endpoint": "bar"})
+ requests_mock.get("http://www.example.com/bar", json={"value": 10,
"endpoint": "foo"})
operator = HttpOperator(
task_id="test_HTTP_op",
method="GET",
- endpoint="/",
+ endpoint="/foo",
http_conn_id="HTTP_EXAMPLE",
pagination_function=pagination_function,
+ response_filter=lambda resp: [entry.json()["value"] for entry in
resp],
)
result = operator.execute({})
- assert result == ['{"value": 5}', '{"value": 5}']
+ assert result == [5, 10, 5]
def test_async_paginated_responses(self, requests_mock):
"""