This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2d3058668e5 KubernetesPodOperator PodManager retries during create pod on too many requests error (#58033)
2d3058668e5 is described below
commit 2d3058668e51d4a7038adb8ac48f8af6fccb5fe4
Author: AutomationDev85 <[email protected]>
AuthorDate: Mon Nov 17 09:31:27 2025 +0100
KubernetesPodOperator PodManager retries during create pod on too many requests error (#58033)
* Retry create pod also on too many requests (429) errors
* Fix unit tests
* Fix static checks
---------
Co-authored-by: AutomationDev85 <AutomationDev85>
---
.../providers/cncf/kubernetes/utils/pod_manager.py | 102 +++++++++++++++------
.../unit/cncf/kubernetes/triggers/test_pod.py | 6 +-
.../unit/cncf/kubernetes/utils/test_pod_manager.py | 88 +++++++++++-------
3 files changed, 135 insertions(+), 61 deletions(-)
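In short, the diff below swaps the per-method tenacity decorators for two shared decorators, generic_api_retry and create_pod_api_retry, whose wait callable honours a 429 Retry-After header before falling back to exponential backoff, with the attempt count and backoff bounds read from the [workers] config section. The following is a minimal, standalone sketch of that retry pattern, not code from the commit: FakeApiException, retry_after_or_backoff and flaky_create_pod are hypothetical stand-ins for kubernetes.client.rest.ApiException, get_retry_after_seconds and PodManager.create_pod, and the numbers mirror the new fallbacks (5 attempts, 1-15 s backoff).

    import tenacity

    class FakeApiException(Exception):
        """Hypothetical stand-in for kubernetes.client.rest.ApiException."""

        def __init__(self, status, headers=None):
            super().__init__(f"status={status}")
            self.status = status
            self.headers = headers or {}

    _backoff = tenacity.wait_exponential(min=1, max=15)

    def retry_after_or_backoff(retry_state):
        # Honour Retry-After on a 429 response, otherwise use exponential backoff.
        exc = retry_state.outcome.exception() if retry_state.outcome else None
        if isinstance(exc, FakeApiException) and str(exc.status) == "429":
            try:
                return int(exc.headers.get("Retry-After", ""))
            except (TypeError, ValueError):
                pass
        return _backoff(retry_state)

    attempts = []

    @tenacity.retry(
        stop=tenacity.stop_after_attempt(5),
        wait=retry_after_or_backoff,
        reraise=True,
        retry=tenacity.retry_if_exception(
            lambda e: isinstance(e, FakeApiException) and str(e.status) in ("409", "429")
        ),
    )
    def flaky_create_pod():
        # Raises 429 with Retry-After: 1 twice, then succeeds on the third attempt.
        attempts.append(1)
        if len(attempts) < 3:
            raise FakeApiException(429, headers={"Retry-After": "1"})
        return "pod created"

    print(flaky_create_pod())  # waits ~1 s between attempts, then prints "pod created"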
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index a8cdcf9c634..e67371f3e89 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -20,6 +20,7 @@ from __future__ import annotations
import asyncio
import enum
+import functools
import json
import math
import time
@@ -38,6 +39,7 @@ from pendulum import DateTime
from pendulum.parsing.exceptions import ParserError
from urllib3.exceptions import HTTPError, TimeoutError
+from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.callbacks import ExecutionMode, KubernetesPodOperatorCallback
from airflow.providers.cncf.kubernetes.utils.container import (
@@ -71,17 +73,76 @@ Sentinel for no xcom result.
"""
-class PodLaunchFailedException(AirflowException):
- """When pod launching fails in KubernetesPodOperator."""
+API_RETRIES = conf.getint("workers", "api_retries", fallback=5)
+API_RETRY_WAIT_MIN = conf.getfloat("workers", "api_retry_wait_min", fallback=1)
+API_RETRY_WAIT_MAX = conf.getfloat("workers", "api_retry_wait_max", fallback=15)
+
+_default_wait = tenacity.wait_exponential(min=API_RETRY_WAIT_MIN, max=API_RETRY_WAIT_MAX)
+
+
+def get_retry_after_seconds(retry_state) -> int:
+    """Extract Retry-After header from ApiException if present and log wait time."""
+    exception = retry_state.outcome.exception() if retry_state.outcome else None
+    if exception and isinstance(exception, ApiException) and str(exception.status) == "429":
+        retry_after = exception.headers.get("Retry-After") if exception.headers else None
+        if retry_after:
+            try:
+                wait_seconds = int(retry_after)
+                return wait_seconds
+            except ValueError:
+                pass
+    # Default exponential backoff
+    wait_seconds = int(_default_wait(retry_state))
+    return wait_seconds
+
+
+def generic_api_retry(func):
+    """Apply tenacity retry logic for generic Kubernetes API calls."""
+
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
+        retry_decorator = tenacity.retry(
+            stop=tenacity.stop_after_attempt(API_RETRIES),
+            wait=get_retry_after_seconds,
+            reraise=True,
+        )
+        return retry_decorator(func)(*args, **kwargs)
+
+    return wrapper
def should_retry_start_pod(exception: BaseException) -> bool:
    """Check if an Exception indicates a transient error and warrants retrying."""
    if isinstance(exception, ApiException):
-        return str(exception.status) == "409"
+        # Retry on 409 (conflict) and 429 (too many requests)
+        return str(exception.status) in ("409", "429")
    return False
+def create_pod_api_retry(func):
+    """
+    Apply tenacity retry logic for pod creation.
+
+    Retries on 409 and 429 errors, and respects Retry-After header for 429.
+    """
+
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
+        retry_decorator = tenacity.retry(
+            stop=tenacity.stop_after_attempt(API_RETRIES),
+            wait=get_retry_after_seconds,
+            reraise=True,
+            retry=tenacity.retry_if_exception(should_retry_start_pod),
+        )
+        return retry_decorator(func)(*args, **kwargs)
+
+    return wrapper
+
+
+class PodLaunchFailedException(AirflowException):
+    """When pod launching fails in KubernetesPodOperator."""
+
+
class PodPhase:
"""
Possible pod phases.
@@ -355,12 +416,7 @@ class PodManager(LoggingMixin):
if str(e.status) != "404":
raise
- @tenacity.retry(
- stop=tenacity.stop_after_attempt(3),
- wait=tenacity.wait_random_exponential(),
- reraise=True,
- retry=tenacity.retry_if_exception(should_retry_start_pod),
- )
+ @create_pod_api_retry
def create_pod(self, pod: V1Pod) -> V1Pod:
"""Launch the pod asynchronously."""
return self.run_pod_async(pod)
@@ -718,7 +774,7 @@ class PodManager(LoggingMixin):
remote_pod = self.read_pod(pod)
        return container_is_terminated(pod=remote_pod, container_name=container_name)
-    @tenacity.retry(stop=tenacity.stop_after_attempt(6), wait=tenacity.wait_exponential(max=15), reraise=True)
+ @generic_api_retry
def read_pod_logs(
self,
pod: V1Pod,
@@ -761,7 +817,7 @@ class PodManager(LoggingMixin):
post_termination_timeout=post_termination_timeout,
)
-    @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
+ @generic_api_retry
def get_init_container_names(self, pod: V1Pod) -> list[str]:
"""
        Return container names from the POD except for the airflow-xcom-sidecar container.
@@ -770,7 +826,7 @@ class PodManager(LoggingMixin):
"""
        return [container_spec.name for container_spec in pod.spec.init_containers]
-    @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
+ @generic_api_retry
def get_container_names(self, pod: V1Pod) -> list[str]:
"""
        Return container names from the POD except for the airflow-xcom-sidecar container.
@@ -784,7 +840,7 @@ class PodManager(LoggingMixin):
if container_spec.name != PodDefaults.SIDECAR_CONTAINER_NAME
]
-    @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
+ @generic_api_retry
def read_pod_events(self, pod: V1Pod) -> CoreV1EventList:
"""Read events from the POD."""
try:
@@ -794,7 +850,7 @@ class PodManager(LoggingMixin):
except HTTPError as e:
raise AirflowException(f"There was an error reading the kubernetes
API: {e}")
-    @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
+ @generic_api_retry
def read_pod(self, pod: V1Pod) -> V1Pod:
"""Read POD information."""
try:
@@ -839,11 +895,7 @@ class PodManager(LoggingMixin):
finally:
self.extract_xcom_kill(pod)
- @tenacity.retry(
- stop=tenacity.stop_after_attempt(5),
- wait=tenacity.wait_exponential(multiplier=1, min=4, max=10),
- reraise=True,
- )
+ @generic_api_retry
def extract_xcom_json(self, pod: V1Pod) -> str:
"""Retrieve XCom value and also check if xcom json is valid."""
command = (
@@ -884,11 +936,7 @@ class PodManager(LoggingMixin):
raise AirflowException(f"Failed to extract xcom from pod:
{pod.metadata.name}")
return result
- @tenacity.retry(
- stop=tenacity.stop_after_attempt(5),
- wait=tenacity.wait_exponential(multiplier=1, min=4, max=10),
- reraise=True,
- )
+ @generic_api_retry
def extract_xcom_kill(self, pod: V1Pod):
"""Kill xcom sidecar container."""
with closing(
@@ -992,7 +1040,7 @@ class AsyncPodManager(LoggingMixin):
self._callbacks = callbacks or []
self.stop_watching_events = False
-    @tenacity.retry(stop=tenacity.stop_after_attempt(5), wait=tenacity.wait_exponential(), reraise=True)
+ @generic_api_retry
async def read_pod(self, pod: V1Pod) -> V1Pod:
"""Read POD information."""
return await self._hook.get_pod(
@@ -1000,7 +1048,7 @@ class AsyncPodManager(LoggingMixin):
pod.metadata.namespace,
)
-    @tenacity.retry(stop=tenacity.stop_after_attempt(5), wait=tenacity.wait_exponential(), reraise=True)
+ @generic_api_retry
async def read_pod_events(self, pod: V1Pod) -> CoreV1EventList:
"""Get pod's events."""
return await self._hook.get_pod_events(
@@ -1034,7 +1082,7 @@ class AsyncPodManager(LoggingMixin):
check_interval=check_interval,
)
-    @tenacity.retry(stop=tenacity.stop_after_attempt(5), wait=tenacity.wait_exponential(), reraise=True)
+ @generic_api_retry
async def fetch_container_logs_before_current_sec(
        self, pod: V1Pod, container_name: str, since_time: DateTime | None = None
) -> DateTime | None:
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
index 37486591ef1..406d7f9d02c 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
@@ -297,7 +297,11 @@ class TestKubernetesPodTrigger:
mock_datetime.timedelta = datetime.timedelta
mock_datetime.timezone = datetime.timezone
-        mock_fetch_container_logs_before_current_sec.return_value = DateTime(2022, 1, 1)
+
+ async def async_datetime_return(*args, **kwargs):
+ return DateTime(2022, 1, 1)
+
+        mock_fetch_container_logs_before_current_sec.side_effect = async_datetime_return
        define_container_state.side_effect = ["running", "running", "terminated"]
trigger = KubernetesPodTrigger(
pod_name=POD_NAME,
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
index 852e5097efd..187e085ad5c 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
@@ -27,7 +27,6 @@ import pendulum
import pytest
import time_machine
from kubernetes.client.rest import ApiException
-from tenacity import wait_none
from urllib3.exceptions import HTTPError as BaseHTTPError
from airflow.exceptions import AirflowException
@@ -37,6 +36,7 @@ from airflow.providers.cncf.kubernetes.utils.pod_manager import (
PodLogsConsumer,
PodManager,
PodPhase,
+ get_retry_after_seconds,
parse_log_line,
)
from airflow.utils.timezone import utc
@@ -47,6 +47,10 @@ if TYPE_CHECKING:
from pendulum import DateTime
+def wait_none(retry_state):
+ return 0
+
+
def test_parse_log_line():
log_message = "This should return no timestamp"
timestamp, line = parse_log_line(log_message)
@@ -59,6 +63,31 @@ def test_parse_log_line():
assert line == log_message
+class DummyRetryState:
+ def __init__(self, exception=None):
+ # self.attempt_number = 1
+ self.outcome = mock.Mock() if exception is not None else None
+ if self.outcome:
+ self.outcome.exception = mock.Mock(return_value=exception)
+
+
+def test_get_retry_after_seconds_with_retry_after_header():
+ exc = ApiException(status=429)
+ exc.headers = {"Retry-After": "15"}
+ retry_state = DummyRetryState(exception=exc)
+ wait = get_retry_after_seconds(retry_state)
+ assert wait == 15
+
+
[email protected](("attempt_number", "expected_wait"), [(1, 1), (4, 8)])
+def test_get_retry_after_seconds_without_retry_after_header(attempt_number, expected_wait):
+ exc = ApiException(status=409)
+ retry_state = DummyRetryState(exception=exc)
+ retry_state.attempt_number = attempt_number
+ wait = get_retry_after_seconds(retry_state)
+ assert wait == expected_wait
+
+
class TestPodManager:
def setup_method(self):
self.mock_kube_client = mock.Mock()
@@ -66,23 +95,6 @@ class TestPodManager:
kube_client=self.mock_kube_client,
callbacks=[MockKubernetesPodOperatorCallback],
)
- # List of PodManager methods that may use tenacity retry
- tenacity_methods = [
- "await_pod_start",
- "read_pod_logs",
- "create_pod",
- "get_init_container_names",
- "get_container_names",
- "read_pod_events",
- "read_pod",
- "extract_xcom_json",
- "extract_xcom_kill",
- ]
-        # Patch tenacity retry wait for all relevant methods to disable waiting in tests
- for method_name in tenacity_methods:
- method = getattr(self.pod_manager, method_name, None)
- if method and hasattr(method, "retry"):
- method.retry.wait = wait_none()
def test_read_pod_logs_successfully_returns_logs(self):
mock.sentinel.metadata = mock.MagicMock()
@@ -91,6 +103,7 @@ class TestPodManager:
assert isinstance(logs, PodLogsConsumer)
assert logs.response == mock.sentinel.logs
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds", wait_none)
def test_read_pod_logs_retries_successfully(self):
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.read_namespaced_pod_log.side_effect = [
@@ -136,6 +149,7 @@ class TestPodManager:
            self.pod_manager.fetch_container_logs(mock.MagicMock(), "container-name", follow=True)
        assert "[container-name] None" not in (record.message for record in caplog.records)
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds", wait_none)
def test_read_pod_logs_retries_fails(self):
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.read_namespaced_pod_log.side_effect = [
@@ -228,6 +242,7 @@ class TestPodManager:
events = self.pod_manager.read_pod_events(mock.sentinel)
assert mock.sentinel.events == events
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds", wait_none)
def test_read_pod_events_retries_successfully(self):
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.list_namespaced_event.side_effect = [
@@ -249,12 +264,15 @@ class TestPodManager:
]
)
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds", wait_none)
def test_read_pod_events_retries_fails(self):
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.list_namespaced_event.side_effect = [
BaseHTTPError("Boom"),
BaseHTTPError("Boom"),
BaseHTTPError("Boom"),
+ BaseHTTPError("Boom"),
+ BaseHTTPError("Boom"),
]
with pytest.raises(AirflowException):
self.pod_manager.read_pod_events(mock.sentinel)
@@ -265,6 +283,7 @@ class TestPodManager:
pod_info = self.pod_manager.read_pod(mock.sentinel)
assert mock.sentinel.pod_info == pod_info
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds", wait_none)
def test_read_pod_retries_successfully(self):
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.read_namespaced_pod.side_effect = [
@@ -297,6 +316,7 @@ class TestPodManager:
        self.mock_kube_client.read_namespaced_pod_log.return_value = mock_response
self.pod_manager.fetch_container_logs(mock.sentinel, "base")
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds", wait_none)
def test_monitor_pod_logs_failures_non_fatal(self):
mock.sentinel.metadata = mock.MagicMock()
running_status = mock.MagicMock()
@@ -320,12 +340,15 @@ class TestPodManager:
self.pod_manager.fetch_container_logs(mock.sentinel, "base")
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds", wait_none)
def test_read_pod_retries_fails(self):
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.read_namespaced_pod.side_effect = [
BaseHTTPError("Boom"),
BaseHTTPError("Boom"),
BaseHTTPError("Boom"),
+ BaseHTTPError("Boom"),
+ BaseHTTPError("Boom"),
]
with pytest.raises(AirflowException):
self.pod_manager.read_pod(mock.sentinel)
@@ -363,6 +386,7 @@ class TestPodManager:
]
)
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds", wait_none)
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
def test_fetch_container_logs_failures(self, mock_container_is_running):
MockWrapper.reset()
@@ -415,21 +439,25 @@ class TestPodManager:
assert "message3 line1" in caplog.text
assert "ERROR" not in caplog.text
+ @pytest.mark.parametrize("status", [409, 429])
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds", wait_none)
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.run_pod_async")
- def test_start_pod_retries_on_409_error(self, mock_run_pod_async):
+    def test_start_pod_retries_on_409_or_429_error(self, mock_run_pod_async, status):
mock_run_pod_async.side_effect = [
- ApiException(status=409),
+ ApiException(status=status),
mock.MagicMock(),
]
self.pod_manager.create_pod(mock.sentinel)
assert mock_run_pod_async.call_count == 2
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds", wait_none)
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.run_pod_async")
def test_start_pod_fails_on_other_exception(self, mock_run_pod_async):
mock_run_pod_async.side_effect = [ApiException(status=504)]
with pytest.raises(ApiException):
self.pod_manager.create_pod(mock.sentinel)
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds", wait_none)
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.run_pod_async")
def test_start_pod_retries_three_times(self, mock_run_pod_async):
mock_run_pod_async.side_effect = [
@@ -437,11 +465,13 @@ class TestPodManager:
ApiException(status=409),
ApiException(status=409),
ApiException(status=409),
+ ApiException(status=409),
+ ApiException(status=409),
]
with pytest.raises(ApiException):
self.pod_manager.create_pod(mock.sentinel)
- assert mock_run_pod_async.call_count == 3
+ assert mock_run_pod_async.call_count == 5
@pytest.mark.asyncio
    async def test_start_pod_raises_informative_error_on_scheduled_timeout(self):
@@ -649,6 +679,7 @@ class TestPodManager:
assert ret == xcom_json
assert mock_exec_xcom_kill.call_count == 1
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds", wait_none)
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.kubernetes_stream")
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.extract_xcom_kill")
    def test_extract_xcom_failure(self, mock_exec_xcom_kill, mock_kubernetes_stream):
@@ -677,6 +708,7 @@ class TestPodManager:
assert ret == xcom_result
assert mock_exec_xcom_kill.call_count == 1
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds", wait_none)
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.kubernetes_stream")
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.extract_xcom_kill")
    def test_extract_xcom_none(self, mock_exec_xcom_kill, mock_kubernetes_stream):
@@ -690,6 +722,7 @@ class TestPodManager:
self.pod_manager.extract_xcom(pod=mock_pod)
assert mock_exec_xcom_kill.call_count == 1
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds", wait_none)
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_terminated")
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
def test_await_xcom_sidecar_container_timeout(
@@ -740,18 +773,6 @@ class TestAsyncPodManager:
async_hook=self.mock_async_hook,
callbacks=[],
)
- # List of PodManager methods that may use tenacity retry
- tenacity_methods = [
- "await_pod_start",
- "fetch_container_logs_before_current_sec",
- "read_pod_events",
- "read_pod",
- ]
-        # Patch tenacity retry wait for all relevant methods to disable waiting in tests
- for method_name in tenacity_methods:
- method = getattr(self.async_pod_manager, method_name, None)
- if method and hasattr(method, "retry"):
- method.retry.wait = wait_none()
@pytest.mark.asyncio
    async def test_start_pod_raises_informative_error_on_scheduled_timeout(self):
@@ -951,6 +972,7 @@ class TestAsyncPodManager:
        unexpected_call = mock.call("[%s] %s", container_name, not_expected)
assert unexpected_call not in mock_log_info.mock_calls
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds", wait_none)
@pytest.mark.asyncio
    async def test_fetch_container_logs_before_current_sec_error_handling(self):
pod = mock.MagicMock()
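One operational note, grounded in the new conf.get* calls in pod_manager.py above: the retry budget is no longer hard-coded per method but read from the [workers] section of the Airflow configuration. A small sketch of the three settings and the fallbacks used when they are unset:

    from airflow.configuration import conf

    # Mirrors the lookups added in pod_manager.py; fallbacks match the diff.
    api_retries = conf.getint("workers", "api_retries", fallback=5)                  # attempts
    api_retry_wait_min = conf.getfloat("workers", "api_retry_wait_min", fallback=1)  # seconds
    api_retry_wait_max = conf.getfloat("workers", "api_retry_wait_max", fallback=15) # seconds
    print(api_retries, api_retry_wait_min, api_retry_wait_max)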