This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new 954abd1f2ee Add retry on error 502 and 504 (#42994) (#43044)
954abd1f2ee is described below
commit 954abd1f2eee4e9ad3caf33348270aa6f92f04c3
Author: Jens Scheffler <[email protected]>
AuthorDate: Tue Oct 15 21:17:04 2024 +0200
Add retry on error 502 and 504 (#42994) (#43044)
* Add retry on error 502 and 504
* fix mypy findings
* Add pytest
* Convert response code to HTTPStatus
* Add docs to retriable exception
* extend docs for AirflowHttpException
* Fix syntax and typos
* fix pytest
* fix static checks
* fix some static checks
* Fix ruff
* fix pre-commit
---------
Co-authored-by: Majoros Donat (XC-DX/EET2-Bp) <[email protected]>
(cherry picked from commit 477d74718adb9b3d68621910af061891b9bdcc17)
Co-authored-by: majorosdonat <[email protected]>
---
airflow/api_internal/internal_api_call.py | 33 +++++++++++++++++++++++++---
tests/api_internal/test_internal_api_call.py | 23 +++++++++++++++++++
2 files changed, 53 insertions(+), 3 deletions(-)
diff --git a/airflow/api_internal/internal_api_call.py b/airflow/api_internal/internal_api_call.py
index 8838377877b..064834d7c86 100644
--- a/airflow/api_internal/internal_api_call.py
+++ b/airflow/api_internal/internal_api_call.py
@@ -21,6 +21,7 @@ import inspect
import json
import logging
from functools import wraps
+from http import HTTPStatus
from typing import Callable, TypeVar
from urllib.parse import urlparse
@@ -40,6 +41,14 @@ RT = TypeVar("RT")
logger = logging.getLogger(__name__)
+class AirflowHttpException(AirflowException):
+ """Raise when there is a problem during an http request on the internal API decorator."""
+
+ def __init__(self, message: str, status_code: HTTPStatus):
+ super().__init__(message)
+ self.status_code = status_code
+
+
class InternalApiConfig:
"""Stores and caches configuration for Internal API."""
@@ -105,10 +114,27 @@ def internal_api_call(func: Callable[PS, RT]) -> Callable[PS, RT]:
"""
from requests.exceptions import ConnectionError
+ def _is_retryable_exception(exception: BaseException) -> bool:
+ """
+ Evaluate which exception types to retry.
+
+ This is especially demanded for cases where an application gateway or Kubernetes ingress can
+ not find a running instance of a webserver hosting the API (HTTP 502+504) or when the
+ HTTP request fails in general on network level.
+
+ Note that we want to fail on other general errors on the webserver not to send bad requests in an endless loop.
+ """
+ retryable_status_codes = (HTTPStatus.BAD_GATEWAY, HTTPStatus.GATEWAY_TIMEOUT)
+ return (
+ isinstance(exception, AirflowHttpException)
+ and exception.status_code in retryable_status_codes
+ or isinstance(exception, (ConnectionError, NewConnectionError))
+ )
+
@tenacity.retry(
stop=tenacity.stop_after_attempt(10),
wait=tenacity.wait_exponential(min=1),
- retry=tenacity.retry_if_exception_type((NewConnectionError, ConnectionError)),
+ retry=tenacity.retry_if_exception(_is_retryable_exception),
before_sleep=tenacity.before_log(logger, logging.WARNING),
)
def make_jsonrpc_request(method_name: str, params_json: str) -> bytes:
@@ -126,9 +152,10 @@ def internal_api_call(func: Callable[PS, RT]) -> Callable[PS, RT]:
internal_api_endpoint = InternalApiConfig.get_internal_api_endpoint()
response = requests.post(url=internal_api_endpoint, data=json.dumps(data), headers=headers)
if response.status_code != 200:
- raise AirflowException(
+ raise AirflowHttpException(
f"Got {response.status_code}:{response.reason} when sending "
- f"the internal api request: {response.text}"
+ f"the internal api request: {response.text}",
+ HTTPStatus(response.status_code),
)
return response.content
diff --git a/tests/api_internal/test_internal_api_call.py b/tests/api_internal/test_internal_api_call.py
index d779b504ea4..c619e1a695f 100644
--- a/tests/api_internal/test_internal_api_call.py
+++ b/tests/api_internal/test_internal_api_call.py
@@ -25,6 +25,7 @@ from unittest import mock
import pytest
import requests
+from tenacity import RetryError
from airflow.__main__ import configure_internal_api
from airflow.api_internal.internal_api_call import InternalApiConfig, internal_api_call
@@ -266,6 +267,28 @@ class TestInternalApiCall:
assert call_kwargs["headers"]["Content-Type"] == "application/json"
assert "Authorization" in call_kwargs["headers"]
+ @conf_vars(
+ {
+ ("core", "database_access_isolation"): "true",
+ ("core", "internal_api_url"): "http://localhost:8888",
+ ("database", "sql_alchemy_conn"): "none://",
+ }
+ )
+ @mock.patch("airflow.api_internal.internal_api_call.requests")
+ @mock.patch("tenacity.time.sleep")
+ def test_retry_on_bad_gateway(self, mock_sleep, mock_requests):
+ configure_internal_api(Namespace(subcommand="dag-processor"), conf)
+ response = requests.Response()
+ response.status_code = 502
+ response.reason = "Bad Gateway"
+ response._content = b"Bad Gateway"
+
+ mock_sleep = lambda *_, **__: None # noqa: F841
+ mock_requests.post.return_value = response
+ with pytest.raises(RetryError):
+ TestInternalApiCall.fake_method_with_params("fake-dag", task_id=123, session="session")
+ assert mock_requests.post.call_count == 10
+
@conf_vars(
{
("core", "database_access_isolation"): "true",