lopezvit commented on issue #39247:
URL: https://github.com/apache/airflow/issues/39247#issuecomment-2076871267

   I have created this extended version and it seems to be working correctly.
Sorry, I don't have time to create a PR:
   
   
       from __future__ import annotations
       
       from airflow.configuration import conf
       from airflow.providers.http.operators.http import HttpOperator
       from airflow.utils.context import Context
       from requests import Response
       from requests.auth import AuthBase
       from typing import Any, Callable
       
       
       class RetryHttpOperator(HttpOperator):
           """HttpOperator whose synchronous requests can be retried.

           When ``retry_args`` is non-empty, every request made by the
           synchronous path (the initial call and each pagination
           follow-up) is sent through the hook's
           ``run_with_advanced_retry``; otherwise the hook's plain
           ``run`` is used. Only ``execute_sync`` and ``paginate_sync``
           are overridden here.

           :param retry_args: retry configuration passed as the first
               argument to ``run_with_advanced_retry`` (presumably
               tenacity ``Retrying`` options -- see the HttpHook
               documentation). ``None`` or an empty dict disables
               retrying.

           All other parameters are forwarded unchanged to
           ``HttpOperator.__init__``.
           """

           def __init__(
               self,
               *,
               endpoint: str | None = None,
               method: str = "POST",
               data: dict[str, Any] | str | None = None,
               headers: dict[str, str] | None = None,
               pagination_function: Callable[..., Any] | None = None,
               response_check: Callable[..., bool] | None = None,
               response_filter: Callable[..., Any] | None = None,
               extra_options: dict[str, Any] | None = None,
               http_conn_id: str = "http_default",
               log_response: bool = False,
               auth_type: type[AuthBase] | None = None,
               tcp_keep_alive: bool = True,
               tcp_keep_alive_idle: int = 120,
               tcp_keep_alive_count: int = 20,
               tcp_keep_alive_interval: int = 30,
               deferrable: bool = conf.getboolean(
                   "operators", "default_deferrable", fallback=False
               ),
               # Optional: the methods below already treat a falsy value
               # as "do not retry", so no caller is forced to pass it.
               retry_args: dict[Any, Any] | None = None,
               **kwargs: Any,
           ) -> None:
               super().__init__(
                   endpoint=endpoint,
                   method=method,
                   data=data,
                   headers=headers,
                   pagination_function=pagination_function,
                   response_check=response_check,
                   response_filter=response_filter,
                   extra_options=extra_options,
                   http_conn_id=http_conn_id,
                   log_response=log_response,
                   auth_type=auth_type,
                   tcp_keep_alive=tcp_keep_alive,
                   tcp_keep_alive_idle=tcp_keep_alive_idle,
                   tcp_keep_alive_count=tcp_keep_alive_count,
                   tcp_keep_alive_interval=tcp_keep_alive_interval,
                   deferrable=deferrable,
                   **kwargs,
               )
               self._retry_args = retry_args

           def _run_hook(self, *args: Any, **kwargs: Any) -> Any:
               """Send one request, retrying when ``retry_args`` is set.

               Positional and keyword arguments are handed to the hook
               exactly as received.
               """
               if self._retry_args:
                   return self.hook.run_with_advanced_retry(
                       self._retry_args, *args, **kwargs
                   )
               return self.hook.run(*args, **kwargs)

           def execute_sync(self, context: Context) -> Any:
               """Run the request, paginate, and process the response."""
               self.log.info("Calling HTTP method")
               response = self._run_hook(
                   self.endpoint,
                   self.data,
                   self.headers,
                   self.extra_options,
               )
               response = self.paginate_sync(response=response)
               return self.process_response(
                   context=context, response=response
               )

           def paginate_sync(
               self, response: Response
           ) -> Response | list[Response]:
               """Follow ``pagination_function`` until it returns nothing.

               Returns ``response`` itself when no pagination function
               is configured; otherwise a list holding the first
               response followed by every subsequent page.
               """
               if not self.pagination_function:
                   return response

               all_responses = [response]
               while True:
                   next_page_params = self.pagination_function(response)
                   if not next_page_params:
                       break
                   response = self._run_hook(
                       **self._merge_next_page_parameters(next_page_params)
                   )
                   all_responses.append(response)
               return all_responses
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to