jscheffl commented on code in PR #58397:
URL: https://github.com/apache/airflow/pull/58397#discussion_r2539677795


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -848,15 +779,15 @@ def read_pod_events(self, pod: V1Pod) -> CoreV1EventList:
                 namespace=pod.metadata.namespace, 
field_selector=f"involvedObject.name={pod.metadata.name}"
             )
         except HTTPError as e:
-            raise AirflowException(f"There was an error reading the kubernetes 
API: {e}")
+            raise KubernetesApiException(f"There was an error reading the 
kubernetes API: {e}")

Review Comment:
   Ah, cool that you clean this up when touching code anyway. Thanks!



##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py:
##########
@@ -103,8 +73,11 @@ def test_read_pod_logs_successfully_returns_logs(self):
         assert isinstance(logs, PodLogsConsumer)
         assert logs.response == mock.sentinel.logs
 
-    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_retry_after_seconds",
 wait_none)
-    def test_read_pod_logs_retries_successfully(self):
+    @mock.patch(

Review Comment:
   Nit: Very many similar/same patches, would it make sense to make this a 
general fixture to potentially auto-use?



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py:
##########
@@ -39,6 +42,61 @@
 POD_NAME_MAX_LENGTH = 63  # Matches Linux kernel's HOST_NAME_MAX default value 
minus 1.
 
 
+class PodLaunchFailedException(AirflowException):
+    """When pod launching fails in KubernetesPodOperator."""
+
+
+class KubernetesApiException(AirflowException):
+    """When communication with kubernetes API fails."""
+
+
+API_RETRIES = conf.getint("workers", "api_retries", fallback=5)
+API_RETRY_WAIT_MIN = conf.getfloat("workers", "api_retry_wait_min", fallback=1)
+API_RETRY_WAIT_MAX = conf.getfloat("workers", "api_retry_wait_max", 
fallback=15)
+
+_default_wait = tenacity.wait_exponential(min=API_RETRY_WAIT_MIN, 
max=API_RETRY_WAIT_MAX)
+
+TRANSIENT_STATUS_CODES = {409, 429, 500, 502, 503, 504}
+
+
+def _should_retry_api(exc: BaseException) -> bool:
+    """Retry on selected ApiException status codes, plus plain HTTP/timeout 
errors."""
+    if isinstance(exc, ApiException):
+        return exc.status in TRANSIENT_STATUS_CODES
+    return isinstance(exc, (HTTPError, KubernetesApiException))
+
+
+class WaitRetryAfterOrExponential(tenacity.wait.wait_base):
+    """Wait strategy that honors Retry-After header on 429, else falls back to 
exponential backoff."""
+
+    def __call__(self, retry_state):

Review Comment:
   Oh, very cool to combine this in one helper!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to