lopezvit commented on issue #39247:
URL: https://github.com/apache/airflow/issues/39247#issuecomment-2076871267
I have created this extended version, and it seems to be working correctly.
Sorry, I don't have time to create a PR:
from __future__ import annotations
from airflow.configuration import conf
from airflow.providers.http.operators.http import HttpOperator
from airflow.utils.context import Context
from requests import Response
from requests.auth import AuthBase
from typing import Any, Callable
class RetryHttpOperator(HttpOperator):
    """
    ``HttpOperator`` variant whose synchronous requests can be retried.

    When ``retry_args`` is truthy, every request issued by the synchronous code
    path (the initial request and each pagination follow-up) is sent through
    ``self.hook.run_with_advanced_retry(retry_args, ...)``. Otherwise it falls
    back to ``self.hook.run(...)``, exactly like the parent operator.

    NOTE(review): only ``execute_sync`` / ``paginate_sync`` are overridden, so
    with ``deferrable=True`` the parent's deferred code path runs and
    ``retry_args`` is presumably ignored. Confirm against the installed
    provider version.

    :param retry_args: retry configuration passed as the first argument to
        ``HttpHook.run_with_advanced_retry`` (per the Airflow docs, tenacity
        ``Retrying`` keyword arguments such as ``stop`` / ``wait`` / ``retry``).
        ``None`` (the default) or an empty dict disables retrying.

    All other parameters are forwarded unchanged to ``HttpOperator``.
    """

    def __init__(
        self,
        *,
        endpoint: str | None = None,
        method: str = "POST",
        data: dict[str, Any] | str | None = None,
        headers: dict[str, str] | None = None,
        pagination_function: Callable[..., Any] | None = None,
        response_check: Callable[..., bool] | None = None,
        response_filter: Callable[..., Any] | None = None,
        extra_options: dict[str, Any] | None = None,
        http_conn_id: str = "http_default",
        log_response: bool = False,
        auth_type: type[AuthBase] | None = None,
        tcp_keep_alive: bool = True,
        tcp_keep_alive_idle: int = 120,
        tcp_keep_alive_count: int = 20,
        tcp_keep_alive_interval: int = 30,
        deferrable: bool = conf.getboolean(
            "operators", "default_deferrable", fallback=False
        ),
        retry_args: dict[Any, Any] | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(
            endpoint=endpoint,
            method=method,
            data=data,
            headers=headers,
            pagination_function=pagination_function,
            response_check=response_check,
            response_filter=response_filter,
            extra_options=extra_options,
            http_conn_id=http_conn_id,
            log_response=log_response,
            auth_type=auth_type,
            tcp_keep_alive=tcp_keep_alive,
            tcp_keep_alive_idle=tcp_keep_alive_idle,
            tcp_keep_alive_count=tcp_keep_alive_count,
            tcp_keep_alive_interval=tcp_keep_alive_interval,
            deferrable=deferrable,
            **kwargs,
        )
        # A falsy value (None or {}) means "no retry"; see _run_with_optional_retry.
        self._retry_args = retry_args

    def _run_with_optional_retry(self, *args: Any, **kwargs: Any) -> Response:
        """
        Send one request through the hook, with retries when configured.

        All positional and keyword arguments are forwarded unchanged to
        ``self.hook.run``, either directly or via
        ``self.hook.run_with_advanced_retry``.
        """
        if self._retry_args:
            return self.hook.run_with_advanced_retry(self._retry_args, *args, **kwargs)
        return self.hook.run(*args, **kwargs)

    def execute_sync(self, context: Context) -> Any:
        """
        Run the request synchronously, follow pagination and process the result.

        :param context: the Airflow task context, passed to ``process_response``.
        :return: whatever ``process_response`` returns for the collected response(s).
        """
        self.log.info("Calling HTTP method")
        response = self._run_with_optional_retry(
            self.endpoint, self.data, self.headers, self.extra_options
        )
        response = self.paginate_sync(response=response)
        return self.process_response(context=context, response=response)

    def paginate_sync(self, response: Response) -> Response | list[Response]:
        """
        Follow pagination, sending each follow-up request through the retry helper.

        :param response: the response for the first page.
        :return: ``response`` itself when no ``pagination_function`` is set;
            otherwise a list of every page's response, in request order.
        """
        if not self.pagination_function:
            return response

        all_responses = [response]
        # pagination_function returns the next page's request parameters,
        # or a falsy value when there are no more pages.
        while next_page_params := self.pagination_function(response):
            response = self._run_with_optional_retry(
                **self._merge_next_page_parameters(next_page_params)
            )
            all_responses.append(response)
        return all_responses
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]