This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2d3058668e5 KubernetesPodOperator PodManager retries during create pod on too many requests error (#58033)
2d3058668e5 is described below

commit 2d3058668e51d4a7038adb8ac48f8af6fccb5fe4
Author: AutomationDev85 <[email protected]>
AuthorDate: Mon Nov 17 09:31:27 2025 +0100

    KubernetesPodOperator PodManager retries during create pod on too many requests error (#58033)
    
    * Retry create pod also on too many requests issue
    
    * Fix unit test
    
    * fix static checks
    
    ---------
    
    Co-authored-by: AutomationDev85 <AutomationDev85>
---
 .../providers/cncf/kubernetes/utils/pod_manager.py | 102 +++++++++++++++------
 .../unit/cncf/kubernetes/triggers/test_pod.py      |   6 +-
 .../unit/cncf/kubernetes/utils/test_pod_manager.py |  88 +++++++++++-------
 3 files changed, 135 insertions(+), 61 deletions(-)

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index a8cdcf9c634..e67371f3e89 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -20,6 +20,7 @@ from __future__ import annotations
 
 import asyncio
 import enum
+import functools
 import json
 import math
 import time
@@ -38,6 +39,7 @@ from pendulum import DateTime
 from pendulum.parsing.exceptions import ParserError
 from urllib3.exceptions import HTTPError, TimeoutError
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.providers.cncf.kubernetes.callbacks import ExecutionMode, 
KubernetesPodOperatorCallback
 from airflow.providers.cncf.kubernetes.utils.container import (
@@ -71,17 +73,76 @@ Sentinel for no xcom result.
 """
 
 
-class PodLaunchFailedException(AirflowException):
-    """When pod launching fails in KubernetesPodOperator."""
+API_RETRIES = conf.getint("workers", "api_retries", fallback=5)
+API_RETRY_WAIT_MIN = conf.getfloat("workers", "api_retry_wait_min", fallback=1)
+API_RETRY_WAIT_MAX = conf.getfloat("workers", "api_retry_wait_max", 
fallback=15)
+
+_default_wait = tenacity.wait_exponential(min=API_RETRY_WAIT_MIN, 
max=API_RETRY_WAIT_MAX)
+
+
+def get_retry_after_seconds(retry_state) -> int:
+    """Extract Retry-After header from ApiException if present and log wait 
time."""
+    exception = retry_state.outcome.exception() if retry_state.outcome else 
None
+    if exception and isinstance(exception, ApiException) and 
str(exception.status) == "429":
+        retry_after = exception.headers.get("Retry-After") if 
exception.headers else None
+        if retry_after:
+            try:
+                wait_seconds = int(retry_after)
+                return wait_seconds
+            except ValueError:
+                pass
+    # Default exponential backoff
+    wait_seconds = int(_default_wait(retry_state))
+    return wait_seconds
+
+
+def generic_api_retry(func):
+    """Apply tenacity retry logic for generic Kubernetes API calls."""
+
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
+        retry_decorator = tenacity.retry(
+            stop=tenacity.stop_after_attempt(API_RETRIES),
+            wait=get_retry_after_seconds,
+            reraise=True,
+        )
+        return retry_decorator(func)(*args, **kwargs)
+
+    return wrapper
 
 
 def should_retry_start_pod(exception: BaseException) -> bool:
     """Check if an Exception indicates a transient error and warrants 
retrying."""
     if isinstance(exception, ApiException):
-        return str(exception.status) == "409"
+        # Retry on 409 (conflict) and 429 (too many requests)
+        return str(exception.status) in ("409", "429")
     return False
 
 
+def create_pod_api_retry(func):
+    """
+    Apply tenacity retry logic for pod creation.
+
+    Retries on 409 and 429 errors, and respects Retry-After header for 429.
+    """
+
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
+        retry_decorator = tenacity.retry(
+            stop=tenacity.stop_after_attempt(API_RETRIES),
+            wait=get_retry_after_seconds,
+            reraise=True,
+            retry=tenacity.retry_if_exception(should_retry_start_pod),
+        )
+        return retry_decorator(func)(*args, **kwargs)
+
+    return wrapper
+
+
+class PodLaunchFailedException(AirflowException):
+    """When pod launching fails in KubernetesPodOperator."""
+
+
 class PodPhase:
     """
     Possible pod phases.
@@ -355,12 +416,7 @@ class PodManager(LoggingMixin):
             if str(e.status) != "404":
                 raise
 
-    @tenacity.retry(
-        stop=tenacity.stop_after_attempt(3),
-        wait=tenacity.wait_random_exponential(),
-        reraise=True,
-        retry=tenacity.retry_if_exception(should_retry_start_pod),
-    )
+    @create_pod_api_retry
     def create_pod(self, pod: V1Pod) -> V1Pod:
         """Launch the pod asynchronously."""
         return self.run_pod_async(pod)
@@ -718,7 +774,7 @@ class PodManager(LoggingMixin):
         remote_pod = self.read_pod(pod)
         return container_is_terminated(pod=remote_pod, 
container_name=container_name)
 
-    @tenacity.retry(stop=tenacity.stop_after_attempt(6), 
wait=tenacity.wait_exponential(max=15), reraise=True)
+    @generic_api_retry
     def read_pod_logs(
         self,
         pod: V1Pod,
@@ -761,7 +817,7 @@ class PodManager(LoggingMixin):
             post_termination_timeout=post_termination_timeout,
         )
 
-    @tenacity.retry(stop=tenacity.stop_after_attempt(3), 
wait=tenacity.wait_exponential(), reraise=True)
+    @generic_api_retry
     def get_init_container_names(self, pod: V1Pod) -> list[str]:
         """
         Return container names from the POD except for the 
airflow-xcom-sidecar container.
@@ -770,7 +826,7 @@ class PodManager(LoggingMixin):
         """
         return [container_spec.name for container_spec in 
pod.spec.init_containers]
 
-    @tenacity.retry(stop=tenacity.stop_after_attempt(3), 
wait=tenacity.wait_exponential(), reraise=True)
+    @generic_api_retry
     def get_container_names(self, pod: V1Pod) -> list[str]:
         """
         Return container names from the POD except for the 
airflow-xcom-sidecar container.
@@ -784,7 +840,7 @@ class PodManager(LoggingMixin):
             if container_spec.name != PodDefaults.SIDECAR_CONTAINER_NAME
         ]
 
-    @tenacity.retry(stop=tenacity.stop_after_attempt(3), 
wait=tenacity.wait_exponential(), reraise=True)
+    @generic_api_retry
     def read_pod_events(self, pod: V1Pod) -> CoreV1EventList:
         """Read events from the POD."""
         try:
@@ -794,7 +850,7 @@ class PodManager(LoggingMixin):
         except HTTPError as e:
             raise AirflowException(f"There was an error reading the kubernetes 
API: {e}")
 
-    @tenacity.retry(stop=tenacity.stop_after_attempt(3), 
wait=tenacity.wait_exponential(), reraise=True)
+    @generic_api_retry
     def read_pod(self, pod: V1Pod) -> V1Pod:
         """Read POD information."""
         try:
@@ -839,11 +895,7 @@ class PodManager(LoggingMixin):
         finally:
             self.extract_xcom_kill(pod)
 
-    @tenacity.retry(
-        stop=tenacity.stop_after_attempt(5),
-        wait=tenacity.wait_exponential(multiplier=1, min=4, max=10),
-        reraise=True,
-    )
+    @generic_api_retry
     def extract_xcom_json(self, pod: V1Pod) -> str:
         """Retrieve XCom value and also check if xcom json is valid."""
         command = (
@@ -884,11 +936,7 @@ class PodManager(LoggingMixin):
             raise AirflowException(f"Failed to extract xcom from pod: 
{pod.metadata.name}")
         return result
 
-    @tenacity.retry(
-        stop=tenacity.stop_after_attempt(5),
-        wait=tenacity.wait_exponential(multiplier=1, min=4, max=10),
-        reraise=True,
-    )
+    @generic_api_retry
     def extract_xcom_kill(self, pod: V1Pod):
         """Kill xcom sidecar container."""
         with closing(
@@ -992,7 +1040,7 @@ class AsyncPodManager(LoggingMixin):
         self._callbacks = callbacks or []
         self.stop_watching_events = False
 
-    @tenacity.retry(stop=tenacity.stop_after_attempt(5), 
wait=tenacity.wait_exponential(), reraise=True)
+    @generic_api_retry
     async def read_pod(self, pod: V1Pod) -> V1Pod:
         """Read POD information."""
         return await self._hook.get_pod(
@@ -1000,7 +1048,7 @@ class AsyncPodManager(LoggingMixin):
             pod.metadata.namespace,
         )
 
-    @tenacity.retry(stop=tenacity.stop_after_attempt(5), 
wait=tenacity.wait_exponential(), reraise=True)
+    @generic_api_retry
     async def read_pod_events(self, pod: V1Pod) -> CoreV1EventList:
         """Get pod's events."""
         return await self._hook.get_pod_events(
@@ -1034,7 +1082,7 @@ class AsyncPodManager(LoggingMixin):
             check_interval=check_interval,
         )
 
-    @tenacity.retry(stop=tenacity.stop_after_attempt(5), 
wait=tenacity.wait_exponential(), reraise=True)
+    @generic_api_retry
     async def fetch_container_logs_before_current_sec(
         self, pod: V1Pod, container_name: str, since_time: DateTime | None = 
None
     ) -> DateTime | None:
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
index 37486591ef1..406d7f9d02c 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
@@ -297,7 +297,11 @@ class TestKubernetesPodTrigger:
 
         mock_datetime.timedelta = datetime.timedelta
         mock_datetime.timezone = datetime.timezone
-        mock_fetch_container_logs_before_current_sec.return_value = 
DateTime(2022, 1, 1)
+
+        async def async_datetime_return(*args, **kwargs):
+            return DateTime(2022, 1, 1)
+
+        mock_fetch_container_logs_before_current_sec.side_effect = 
async_datetime_return
         define_container_state.side_effect = ["running", "running", 
"terminated"]
         trigger = KubernetesPodTrigger(
             pod_name=POD_NAME,
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
index 852e5097efd..187e085ad5c 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
@@ -27,7 +27,6 @@ import pendulum
 import pytest
 import time_machine
 from kubernetes.client.rest import ApiException
-from tenacity import wait_none
 from urllib3.exceptions import HTTPError as BaseHTTPError
 
 from airflow.exceptions import AirflowException
@@ -37,6 +36,7 @@ from airflow.providers.cncf.kubernetes.utils.pod_manager 
import (
     PodLogsConsumer,
     PodManager,
     PodPhase,
+    get_retry_after_seconds,
     parse_log_line,
 )
 from airflow.utils.timezone import utc
@@ -47,6 +47,10 @@ if TYPE_CHECKING:
     from pendulum import DateTime
 
 
+def wait_none(retry_state):
+    return 0
+
+
 def test_parse_log_line():
     log_message = "This should return no timestamp"
     timestamp, line = parse_log_line(log_message)
@@ -59,6 +63,31 @@ def test_parse_log_line():
     assert line == log_message
 
 
+class DummyRetryState:
+    def __init__(self, exception=None):
+        # self.attempt_number = 1
+        self.outcome = mock.Mock() if exception is not None else None
+        if self.outcome:
+            self.outcome.exception = mock.Mock(return_value=exception)
+
+
+def test_get_retry_after_seconds_with_retry_after_header():
+    exc = ApiException(status=429)
+    exc.headers = {"Retry-After": "15"}
+    retry_state = DummyRetryState(exception=exc)
+    wait = get_retry_after_seconds(retry_state)
+    assert wait == 15
+
+
+@pytest.mark.parametrize(("attempt_number", "expected_wait"), [(1, 1), (4, 8)])
+def test_get_retry_after_seconds_without_retry_after_header(attempt_number, 
expected_wait):
+    exc = ApiException(status=409)
+    retry_state = DummyRetryState(exception=exc)
+    retry_state.attempt_number = attempt_number
+    wait = get_retry_after_seconds(retry_state)
+    assert wait == expected_wait
+
+
 class TestPodManager:
     def setup_method(self):
         self.mock_kube_client = mock.Mock()
@@ -66,23 +95,6 @@ class TestPodManager:
             kube_client=self.mock_kube_client,
             callbacks=[MockKubernetesPodOperatorCallback],
         )
-        # List of PodManager methods that may use tenacity retry
-        tenacity_methods = [
-            "await_pod_start",
-            "read_pod_logs",
-            "create_pod",
-            "get_init_container_names",
-            "get_container_names",
-            "read_pod_events",
-            "read_pod",
-            "extract_xcom_json",
-            "extract_xcom_kill",
-        ]
-        # Patch tenacity retry wait for all relevant methods to disable 
waiting in tests
-        for method_name in tenacity_methods:
-            method = getattr(self.pod_manager, method_name, None)
-            if method and hasattr(method, "retry"):
-                method.retry.wait = wait_none()
 
     def test_read_pod_logs_successfully_returns_logs(self):
         mock.sentinel.metadata = mock.MagicMock()
@@ -91,6 +103,7 @@ class TestPodManager:
         assert isinstance(logs, PodLogsConsumer)
         assert logs.response == mock.sentinel.logs
 
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds",
 wait_none)
     def test_read_pod_logs_retries_successfully(self):
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod_log.side_effect = [
@@ -136,6 +149,7 @@ class TestPodManager:
             self.pod_manager.fetch_container_logs(mock.MagicMock(), 
"container-name", follow=True)
             assert "[container-name] None" not in (record.message for record 
in caplog.records)
 
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds",
 wait_none)
     def test_read_pod_logs_retries_fails(self):
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod_log.side_effect = [
@@ -228,6 +242,7 @@ class TestPodManager:
         events = self.pod_manager.read_pod_events(mock.sentinel)
         assert mock.sentinel.events == events
 
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds",
 wait_none)
     def test_read_pod_events_retries_successfully(self):
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.list_namespaced_event.side_effect = [
@@ -249,12 +264,15 @@ class TestPodManager:
             ]
         )
 
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds",
 wait_none)
     def test_read_pod_events_retries_fails(self):
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.list_namespaced_event.side_effect = [
             BaseHTTPError("Boom"),
             BaseHTTPError("Boom"),
             BaseHTTPError("Boom"),
+            BaseHTTPError("Boom"),
+            BaseHTTPError("Boom"),
         ]
         with pytest.raises(AirflowException):
             self.pod_manager.read_pod_events(mock.sentinel)
@@ -265,6 +283,7 @@ class TestPodManager:
         pod_info = self.pod_manager.read_pod(mock.sentinel)
         assert mock.sentinel.pod_info == pod_info
 
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds",
 wait_none)
     def test_read_pod_retries_successfully(self):
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod.side_effect = [
@@ -297,6 +316,7 @@ class TestPodManager:
         self.mock_kube_client.read_namespaced_pod_log.return_value = 
mock_response
         self.pod_manager.fetch_container_logs(mock.sentinel, "base")
 
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds",
 wait_none)
     def test_monitor_pod_logs_failures_non_fatal(self):
         mock.sentinel.metadata = mock.MagicMock()
         running_status = mock.MagicMock()
@@ -320,12 +340,15 @@ class TestPodManager:
 
         self.pod_manager.fetch_container_logs(mock.sentinel, "base")
 
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds",
 wait_none)
     def test_read_pod_retries_fails(self):
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod.side_effect = [
             BaseHTTPError("Boom"),
             BaseHTTPError("Boom"),
             BaseHTTPError("Boom"),
+            BaseHTTPError("Boom"),
+            BaseHTTPError("Boom"),
         ]
         with pytest.raises(AirflowException):
             self.pod_manager.read_pod(mock.sentinel)
@@ -363,6 +386,7 @@ class TestPodManager:
             ]
         )
 
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds",
 wait_none)
     
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
     def test_fetch_container_logs_failures(self, mock_container_is_running):
         MockWrapper.reset()
@@ -415,21 +439,25 @@ class TestPodManager:
         assert "message3 line1" in caplog.text
         assert "ERROR" not in caplog.text
 
+    @pytest.mark.parametrize("status", [409, 429])
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds",
 wait_none)
     
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.run_pod_async")
-    def test_start_pod_retries_on_409_error(self, mock_run_pod_async):
+    def test_start_pod_retries_on_409_or_429_error(self, mock_run_pod_async, 
status):
         mock_run_pod_async.side_effect = [
-            ApiException(status=409),
+            ApiException(status=status),
             mock.MagicMock(),
         ]
         self.pod_manager.create_pod(mock.sentinel)
         assert mock_run_pod_async.call_count == 2
 
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds",
 wait_none)
     
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.run_pod_async")
     def test_start_pod_fails_on_other_exception(self, mock_run_pod_async):
         mock_run_pod_async.side_effect = [ApiException(status=504)]
         with pytest.raises(ApiException):
             self.pod_manager.create_pod(mock.sentinel)
 
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds",
 wait_none)
     
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.run_pod_async")
     def test_start_pod_retries_three_times(self, mock_run_pod_async):
         mock_run_pod_async.side_effect = [
@@ -437,11 +465,13 @@ class TestPodManager:
             ApiException(status=409),
             ApiException(status=409),
             ApiException(status=409),
+            ApiException(status=409),
+            ApiException(status=409),
         ]
         with pytest.raises(ApiException):
             self.pod_manager.create_pod(mock.sentinel)
 
-        assert mock_run_pod_async.call_count == 3
+        assert mock_run_pod_async.call_count == 5
 
     @pytest.mark.asyncio
     async def 
test_start_pod_raises_informative_error_on_scheduled_timeout(self):
@@ -649,6 +679,7 @@ class TestPodManager:
         assert ret == xcom_json
         assert mock_exec_xcom_kill.call_count == 1
 
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds",
 wait_none)
     
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.kubernetes_stream")
     
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.extract_xcom_kill")
     def test_extract_xcom_failure(self, mock_exec_xcom_kill, 
mock_kubernetes_stream):
@@ -677,6 +708,7 @@ class TestPodManager:
         assert ret == xcom_result
         assert mock_exec_xcom_kill.call_count == 1
 
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds",
 wait_none)
     
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.kubernetes_stream")
     
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.extract_xcom_kill")
     def test_extract_xcom_none(self, mock_exec_xcom_kill, 
mock_kubernetes_stream):
@@ -690,6 +722,7 @@ class TestPodManager:
             self.pod_manager.extract_xcom(pod=mock_pod)
         assert mock_exec_xcom_kill.call_count == 1
 
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds",
 wait_none)
     
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_terminated")
     
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
     def test_await_xcom_sidecar_container_timeout(
@@ -740,18 +773,6 @@ class TestAsyncPodManager:
             async_hook=self.mock_async_hook,
             callbacks=[],
         )
-        # List of PodManager methods that may use tenacity retry
-        tenacity_methods = [
-            "await_pod_start",
-            "fetch_container_logs_before_current_sec",
-            "read_pod_events",
-            "read_pod",
-        ]
-        # Patch tenacity retry wait for all relevant methods to disable 
waiting in tests
-        for method_name in tenacity_methods:
-            method = getattr(self.async_pod_manager, method_name, None)
-            if method and hasattr(method, "retry"):
-                method.retry.wait = wait_none()
 
     @pytest.mark.asyncio
     async def 
test_start_pod_raises_informative_error_on_scheduled_timeout(self):
@@ -951,6 +972,7 @@ class TestAsyncPodManager:
                     unexpected_call = mock.call("[%s] %s", container_name, 
not_expected)
                     assert unexpected_call not in mock_log_info.mock_calls
 
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds",
 wait_none)
     @pytest.mark.asyncio
     async def 
test_fetch_container_logs_before_current_sec_error_handling(self):
         pod = mock.MagicMock()

Reply via email to