This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1f76986b7b Fix: Paginate on lastest Response (#35560)
1f76986b7b is described below

commit 1f76986b7ba19737aba77d63bbec1ce29aff55fc
Author: Joffrey Bienvenu <[email protected]>
AuthorDate: Sat Nov 11 07:55:21 2023 +0100

    Fix: Paginate on lastest Response (#35560)
    
    * fix: Use previous Response as parameter for pagination function
    
    * fix: clarify docs
    
    * fix: Add stronger test
    
    * fix: Refactor test do to three chained calls
    
    * fix: code formatting
---
 airflow/providers/http/operators/http.py    | 27 ++++++++++++++-------------
 tests/providers/http/operators/test_http.py | 20 +++++++++++---------
 2 files changed, 25 insertions(+), 22 deletions(-)

diff --git a/airflow/providers/http/operators/http.py b/airflow/providers/http/operators/http.py
index 216de77e31..96415ed977 100644
--- a/airflow/providers/http/operators/http.py
+++ b/airflow/providers/http/operators/http.py
@@ -53,14 +53,15 @@ class HttpOperator(BaseOperator):
     :param data: The data to pass. POST-data in POST/PUT and params
         in the URL for a GET request. (templated)
     :param headers: The HTTP headers to be added to the GET request
-    :param pagination_function: A callable that generates the parameters used to call the API again.
-        Typically used when the API is paginated and returns for e.g a cursor, a 'next page id', or
-        a 'next page URL'. When provided, the Operator will call the API repeatedly until this callable
-        returns None. Also, the result of the Operator will become by default a list of Response.text
-        objects (instead of a single response object). Same with the other injected functions (like
-        response_check, response_filter, ...) which will also receive a list of Response object. This
-        function should return a dict of parameters (`endpoint`, `data`, `headers`, `extra_options`),
-        which will be merged and override the one used in the initial API call.
+    :param pagination_function: A callable that generates the parameters used to call the API again,
+        based on the previous response. Typically used when the API is paginated and returns for e.g a
+        cursor, a 'next page id', or a 'next page URL'. When provided, the Operator will call the API
+        repeatedly until this callable returns None. Also, the result of the Operator will become by
+        default a list of Response.text objects (instead of a single response object). Same with the
+        other injected functions (like response_check, response_filter, ...) which will also receive a
+        list of Response object. This function receives a Response object from previous call, and should
+        return a dict of parameters (`endpoint`, `data`, `headers`, `extra_options`), which will be merged
+        and will override the one used in the initial API call.
     :param response_check: A check against the 'requests' response object.
         The callable takes the response object as the first positional argument
         and optionally any number of keyword arguments available in the context dictionary.
@@ -162,16 +163,16 @@ class HttpOperator(BaseOperator):
     def execute_sync(self, context: Context) -> Any:
         self.log.info("Calling HTTP method")
         response = self.hook.run(self.endpoint, self.data, self.headers, self.extra_options)
-        response = self.paginate_sync(first_response=response)
+        response = self.paginate_sync(response=response)
         return self.process_response(context=context, response=response)
 
-    def paginate_sync(self, first_response: Response) -> Response | list[Response]:
+    def paginate_sync(self, response: Response) -> Response | list[Response]:
         if not self.pagination_function:
-            return first_response
+            return response
 
-        all_responses = [first_response]
+        all_responses = [response]
         while True:
-            next_page_params = self.pagination_function(first_response)
+            next_page_params = self.pagination_function(response)
             if not next_page_params:
                 break
             response = self.hook.run(**self._merge_next_page_parameters(next_page_params))
diff --git a/tests/providers/http/operators/test_http.py b/tests/providers/http/operators/test_http.py
index a57af7d764..451cd93d44 100644
--- a/tests/providers/http/operators/test_http.py
+++ b/tests/providers/http/operators/test_http.py
@@ -118,31 +118,33 @@ class TestHttpOperator:
         pagination_function is provided, and as long as this function returns
         a dictionary that override previous' call parameters.
         """
-        has_returned: bool = False
+        iterations: int = 0
 
         def pagination_function(response: Response) -> dict | None:
             """Paginated function which returns None at the second call."""
-            nonlocal has_returned
-            if not has_returned:
-                has_returned = True
+            nonlocal iterations
+            if iterations < 2:
+                iterations += 1
                 return dict(
-                    endpoint="/",
-                    data={"cursor": "example"},
+                    endpoint=response.json()["endpoint"],
+                    data={},
                     headers={},
                     extra_options={},
                 )
             return None
 
-        requests_mock.get("http://www.example.com", json={"value": 5})
+        requests_mock.get("http://www.example.com/foo", json={"value": 5, "endpoint": "bar"})
+        requests_mock.get("http://www.example.com/bar", json={"value": 10, "endpoint": "foo"})
         operator = HttpOperator(
             task_id="test_HTTP_op",
             method="GET",
-            endpoint="/",
+            endpoint="/foo",
             http_conn_id="HTTP_EXAMPLE",
             pagination_function=pagination_function,
+            response_filter=lambda resp: [entry.json()["value"] for entry in resp],
         )
         result = operator.execute({})
-        assert result == ['{"value": 5}', '{"value": 5}']
+        assert result == [5, 10, 5]
 
     def test_async_paginated_responses(self, requests_mock):
         """

Reply via email to