This is an automated email from the ASF dual-hosted git repository.

bugraoz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fe5730407ee Add retry mechanism to airflowctl and remove flaky integration mark (#63016)
fe5730407ee is described below

commit fe5730407ee4c7e172399fcca25b2c984ebcb5b5
Author: Bugra Ozturk <[email protected]>
AuthorDate: Mon Mar 9 18:12:12 2026 +0100

    Add retry mechanism to airflowctl and remove flaky integration mark (#63016)
    
    * tenacity added to include retry over httpx
---
 .../airflowctl_tests/test_airflowctl_commands.py   |  2 -
 airflow-ctl/docs/cli-and-env-variables-ref.rst     | 18 +++++++
 airflow-ctl/pyproject.toml                         |  1 +
 airflow-ctl/src/airflowctl/api/client.py           | 39 ++++++++++++++
 airflow-ctl/tests/airflow_ctl/api/test_client.py   | 62 ++++++++++++++++++++++
 5 files changed, 120 insertions(+), 2 deletions(-)

diff --git a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py
index 3f89a7c9698..d9d002e9eda 100644
--- a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py
+++ b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py
@@ -131,7 +131,6 @@ TEST_COMMANDS_SKIP_KEYRING = [LOGIN_COMMAND_SKIP_KEYRING] + [
 ]
 
 
-@pytest.mark.flaky(reruns=3, reruns_delay=1)
 @pytest.mark.parametrize(
     "command",
     TEST_COMMANDS_DEBUG_MODE,
@@ -144,7 +143,6 @@ def test_airflowctl_commands(command: str, run_command):
     run_command(command, env_vars, skip_login=True)
 
 
-@pytest.mark.flaky(reruns=3, reruns_delay=1)
 @pytest.mark.parametrize(
     "command",
     TEST_COMMANDS_SKIP_KEYRING,
diff --git a/airflow-ctl/docs/cli-and-env-variables-ref.rst b/airflow-ctl/docs/cli-and-env-variables-ref.rst
index 2d63d47d5be..66e3638a4a7 100644
--- a/airflow-ctl/docs/cli-and-env-variables-ref.rst
+++ b/airflow-ctl/docs/cli-and-env-variables-ref.rst
@@ -60,3 +60,21 @@ Environment Variables
     It disables some features such as keyring integration and save credentials to file.
     It is only meant to use if either you are developing airflowctl or running API integration tests.
     Please do not use this variable unless you know what you are doing.
+
+.. envvar:: AIRFLOW_CLI_API_RETRIES
+
+    The maximum number of attempts for an API call, including the first one.
+    A call is retried on connection/request errors and on HTTP 5xx responses;
+    other HTTP error responses are not retried. The default value is 3.
+
+.. envvar:: AIRFLOW_CLI_API_RETRY_WAIT_MIN
+
+    The lower bound, in whole seconds, of the randomized exponential backoff
+    applied between API retries. The value must be an integer.
+    The default value is 1 second.
+
+.. envvar:: AIRFLOW_CLI_API_RETRY_WAIT_MAX
+
+    The upper bound, in whole seconds, of the randomized exponential backoff
+    applied between API retries. The value must be an integer.
+    The default value is 10 seconds.
diff --git a/airflow-ctl/pyproject.toml b/airflow-ctl/pyproject.toml
index 45e42a0c5b3..fec7206a230 100644
--- a/airflow-ctl/pyproject.toml
+++ b/airflow-ctl/pyproject.toml
@@ -40,6 +40,7 @@ dependencies = [
     "structlog>=25.4.0",
     "uuid6>=2024.7.10",
     "tabulate>=0.9.0",
+    "tenacity>=9.1.4",
 ]
 
 classifiers = [
diff --git a/airflow-ctl/src/airflowctl/api/client.py b/airflow-ctl/src/airflowctl/api/client.py
index 7c03dae3055..0ef5d7cb164 100644
--- a/airflow-ctl/src/airflowctl/api/client.py
+++ b/airflow-ctl/src/airflowctl/api/client.py
@@ -21,6 +21,7 @@ import contextlib
 import enum
 import getpass
 import json
+import logging
 import os
 import sys
 from collections.abc import Callable
@@ -32,6 +33,13 @@ import keyring
 import structlog
 from httpx import URL
 from keyring.errors import NoKeyringError
+from tenacity import (
+    before_log,
+    retry,
+    retry_if_exception,
+    stop_after_attempt,
+    wait_random_exponential,
+)
 from uuid6 import uuid7
 
 from airflowctl import __version__ as version
@@ -261,6 +269,20 @@ class BearerAuth(httpx.Auth):
         yield request
 
 
+def _should_retry_api_request(exception: BaseException) -> bool:
+    """Determine if an API request should be retried based on the exception type."""
+    if isinstance(exception, httpx.HTTPStatusError):
+        return exception.response.status_code >= 500
+
+    return isinstance(exception, httpx.RequestError)
+
+
+# API Client Retry Configuration
+API_RETRIES = int(os.getenv("AIRFLOW_CLI_API_RETRIES", "3"))
+API_RETRY_WAIT_MIN = int(os.getenv("AIRFLOW_CLI_API_RETRY_WAIT_MIN", "1"))
+API_RETRY_WAIT_MAX = int(os.getenv("AIRFLOW_CLI_API_RETRY_WAIT_MAX", "10"))
+
+
 class Client(httpx.Client):
     """Client for the Airflow REST API."""
 
@@ -298,6 +320,23 @@ class Client(httpx.Client):
             return f"{base_url}/auth"
         return f"{base_url}/api/v2"
 
+    @retry(
+        retry=retry_if_exception(_should_retry_api_request),
+        stop=stop_after_attempt(API_RETRIES),
+        wait=wait_random_exponential(min=API_RETRY_WAIT_MIN, max=API_RETRY_WAIT_MAX),
+        before_sleep=before_log(log, logging.WARNING),
+        reraise=True,
+    )
+    def request(self, *args, **kwargs):
+        """Implement a convenience for httpx.Client.request with a retry layer."""
+        # Set content type as convenience if not already set
+        if kwargs.get("content", None) is not None and "content-type" not in (
+            kwargs.get("headers", {}) or {}
+        ):
+            kwargs["headers"] = {"content-type": "application/json"}
+
+        return super().request(*args, **kwargs)
+
     @lru_cache()  # type: ignore[prop-decorator]
     @property
     def login(self):
diff --git a/airflow-ctl/tests/airflow_ctl/api/test_client.py b/airflow-ctl/tests/airflow_ctl/api/test_client.py
index f79322d16fb..0617d62276a 100644
--- a/airflow-ctl/tests/airflow_ctl/api/test_client.py
+++ b/airflow-ctl/tests/airflow_ctl/api/test_client.py
@@ -25,6 +25,7 @@ from unittest.mock import MagicMock, patch
 
 import httpx
 import pytest
+import time_machine
 from httpx import URL
 
 from airflowctl.api.client import Client, ClientKind, Credentials, _bounded_get_new_password
@@ -32,6 +33,15 @@ from airflowctl.api.operations import ServerResponseError
 from airflowctl.exceptions import AirflowCtlCredentialNotFoundException, AirflowCtlKeyringException
 
 
+def make_client_w_responses(responses: list[httpx.Response]) -> Client:
+    """Get a client with custom responses."""
+
+    def handle_request(request: httpx.Request) -> httpx.Response:
+        return responses.pop(0)
+
+    return Client(base_url="", token="", mounts={"'http://": httpx.MockTransport(handle_request)})
+
+
 @pytest.fixture(autouse=True)
 def unique_config_dir():
     temp_dir = tempfile.mkdtemp()
@@ -314,3 +324,55 @@ class TestSaveKeyringPatching:
 
         assert not hasattr(mock_backend, "_get_new_password")
         mock_keyring.set_password.assert_called_once_with("airflowctl", "api_token_production", "token")
+
+    def test_retry_handling_unrecoverable_error(self):
+        with time_machine.travel("2023-01-01T00:00:00Z", tick=False):
+            responses: list[httpx.Response] = [
+                *[httpx.Response(500, text="Internal Server Error")] * 6,
+                httpx.Response(200, json={"detail": "Recovered from error - but will fail before"}),
+                httpx.Response(400, json={"detail": "Should not get here"}),
+            ]
+            client = make_client_w_responses(responses)
+
+            with pytest.raises(httpx.HTTPStatusError) as err:
+                client.get("http://error")
+            assert not isinstance(err.value, ServerResponseError)
+            assert len(responses) == 5
+
+    def test_retry_handling_recovered(self):
+        with time_machine.travel("2023-01-01T00:00:00Z", tick=False):
+            responses: list[httpx.Response] = [
+                *[httpx.Response(500, text="Internal Server Error")] * 2,
+                httpx.Response(200, json={"detail": "Recovered from error"}),
+                httpx.Response(400, json={"detail": "Should not get here"}),
+            ]
+            client = make_client_w_responses(responses)
+
+            response = client.get("http://error")
+            assert response.status_code == 200
+            assert len(responses) == 1
+
+    def test_retry_handling_non_retry_error(self):
+        with time_machine.travel("2023-01-01T00:00:00Z", tick=False):
+            responses: list[httpx.Response] = [
+                httpx.Response(422, json={"detail": "Somehow this is a bad request"}),
+                httpx.Response(400, json={"detail": "Should not get here"}),
+            ]
+            client = make_client_w_responses(responses)
+
+            with pytest.raises(ServerResponseError) as err:
+                client.get("http://error")
+            assert len(responses) == 1
+            assert err.value.args == ("Client error message: {'detail': 'Somehow this is a bad request'}",)
+
+    def test_retry_handling_ok(self):
+        with time_machine.travel("2023-01-01T00:00:00Z", tick=False):
+            responses: list[httpx.Response] = [
+                httpx.Response(200, json={"detail": "Recovered from error"}),
+                httpx.Response(400, json={"detail": "Should not get here"}),
+            ]
+            client = make_client_w_responses(responses)
+
+            response = client.get("http://error")
+            assert response.status_code == 200
+            assert len(responses) == 1

Reply via email to