jscheffl commented on code in PR #58397:
URL: https://github.com/apache/airflow/pull/58397#discussion_r2539677795
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -848,15 +779,15 @@ def read_pod_events(self, pod: V1Pod) -> CoreV1EventList:
namespace=pod.metadata.namespace,
field_selector=f"involvedObject.name={pod.metadata.name}"
)
except HTTPError as e:
- raise AirflowException(f"There was an error reading the kubernetes API: {e}")
+ raise KubernetesApiException(f"There was an error reading the kubernetes API: {e}")
Review Comment:
Ah, cool that you are cleaning this up while touching the code anyway. Thanks!
##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py:
##########
@@ -103,8 +73,11 @@ def test_read_pod_logs_successfully_returns_logs(self):
assert isinstance(logs, PodLogsConsumer)
assert logs.response == mock.sentinel.logs
- @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds", wait_none)
- def test_read_pod_logs_retries_successfully(self):
+ @mock.patch(
Review Comment:
Nit: There are many similar or identical patches in these tests. Would it make sense to turn this into a shared fixture, possibly with autouse?
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py:
##########
@@ -39,6 +42,61 @@
POD_NAME_MAX_LENGTH = 63 # Matches Linux kernel's HOST_NAME_MAX default value minus 1.
+class PodLaunchFailedException(AirflowException):
+ """When pod launching fails in KubernetesPodOperator."""
+
+
+class KubernetesApiException(AirflowException):
+ """When communication with kubernetes API fails."""
+
+
+API_RETRIES = conf.getint("workers", "api_retries", fallback=5)
+API_RETRY_WAIT_MIN = conf.getfloat("workers", "api_retry_wait_min", fallback=1)
+API_RETRY_WAIT_MAX = conf.getfloat("workers", "api_retry_wait_max", fallback=15)
+
+_default_wait = tenacity.wait_exponential(min=API_RETRY_WAIT_MIN, max=API_RETRY_WAIT_MAX)
+
+TRANSIENT_STATUS_CODES = {409, 429, 500, 502, 503, 504}
+
+
+def _should_retry_api(exc: BaseException) -> bool:
+ """Retry on selected ApiException status codes, plus plain HTTP/timeout errors."""
+ if isinstance(exc, ApiException):
+ return exc.status in TRANSIENT_STATUS_CODES
+ return isinstance(exc, (HTTPError, KubernetesApiException))
+
+
+class WaitRetryAfterOrExponential(tenacity.wait.wait_base):
+ """Wait strategy that honors Retry-After header on 429, else falls back to exponential backoff."""
+
+ def __call__(self, retry_state):
Review Comment:
Oh, very cool to combine this in one helper!
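For readers following along, the idea behind the combined helper can be sketched with the standard library alone. This is not the PR's implementation and does not use tenacity: `FakeApiError`, `parse_retry_after` and `wait_seconds` are hypothetical names, the backoff formula is a simplified assumption, and the default bounds of 1 and 15 seconds just mirror the fallbacks shown in the hunk above:

```python
from __future__ import annotations


class FakeApiError(Exception):
    """Hypothetical stand-in for an API error with a status and headers."""

    def __init__(self, status: int, headers: dict[str, str] | None = None):
        super().__init__(f"status {status}")
        self.status = status
        self.headers = headers or {}


def parse_retry_after(exc: BaseException) -> float | None:
    """Return Retry-After in seconds, or None if absent or not a plain number."""
    headers = getattr(exc, "headers", None) or {}
    raw = headers.get("Retry-After")  # exact-case lookup; real clients may differ
    try:
        seconds = float(raw)
    except (TypeError, ValueError):
        return None  # missing, or an HTTP-date, which this sketch ignores
    return seconds if seconds >= 0 else None


def wait_seconds(
    exc: BaseException,
    attempt: int,
    min_wait: float = 1.0,
    max_wait: float = 15.0,
) -> float:
    """Honor Retry-After on 429, otherwise use clamped exponential backoff."""
    if getattr(exc, "status", None) == 429:
        retry_after = parse_retry_after(exc)
        if retry_after is not None:
            return retry_after
    backoff = 2.0 ** (attempt - 1)  # 1, 2, 4, 8, ... seconds
    return max(min_wait, min(backoff, max_wait))
```

With these assumptions, a 429 carrying `Retry-After: 7` waits 7 seconds regardless of the attempt number, while any other error waits 1, 2, 4, 8 and then at most 15 seconds.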