jscheffl commented on code in PR #60626:
URL: https://github.com/apache/airflow/pull/60626#discussion_r2699936132


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -811,13 +832,54 @@ def read_pod_events(self, pod: V1Pod, resource_version: str | None = None) -> Co
             raise KubernetesApiException(f"There was an error reading the kubernetes API: {e}")
 
     @generic_api_retry
-    def read_pod(self, pod: V1Pod) -> V1Pod:
-        """Read POD information."""
+    def read_pod(self, pod: V1Pod, *, phase_tracker: PodPhaseTracker | None = None) -> V1Pod:
+        """
+        Read POD information.
+
+        :param pod: The pod object to read.
+        :param phase_tracker: Optional tracker to monitor pod lifecycle state.
+            When provided, enables state-aware retry behavior on 404 errors
+            to handle pod preemption during node bootstrap.
+        """
         try:
-            return self._client.read_namespaced_pod(pod.metadata.name, pod.metadata.namespace)
+            result = self._client.read_namespaced_pod(pod.metadata.name, pod.metadata.namespace)
+            if phase_tracker:
+                phase_tracker.update_from_pod(result)
+            return result
+        except ApiException as e:
+            if e.status == 404:
+                self._handle_pod_not_found(pod, phase_tracker)
+            raise
         except HTTPError as e:
             raise KubernetesApiException(f"There was an error reading the kubernetes API: {e}")
 
+    def _handle_pod_not_found(self, pod: V1Pod, phase_tracker: PodPhaseTracker | None) -> None:

Review Comment:
   The code here (and some more) is duplicated between the utility and the hook. 
Please consolidate it in one place instead of keeping copies.
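
   One possible shape for that consolidation, as a minimal sketch only: a single
module-level helper that both the sync `PodManager` and the async hook call. The
helper name `raise_for_missing_pod` is hypothetical, the classes below are
simplified stand-ins for the ones in the diff, and returning silently when no
tracker is passed is an assumption based on the bare `raise` that follows the
handler call in `read_pod`.

```python
from dataclasses import dataclass
from typing import Optional


class PodNotFoundException(Exception):
    """Stand-in for the provider exception of the same name."""


class PodPreemptedException(Exception):
    """Stand-in for the exception this PR introduces."""


@dataclass
class PodPhaseTracker:
    """Simplified stand-in for the tracker shown in the diff."""

    ever_reached_running: bool = False
    last_observed_phase: Optional[str] = None

    def is_safe_to_retry_on_404(self) -> bool:
        return not self.ever_reached_running


def raise_for_missing_pod(
    name: str, namespace: str, phase_tracker: Optional[PodPhaseTracker]
) -> None:
    """Hypothetical shared helper: map a 404 onto one of the two exceptions."""
    if phase_tracker is None:
        # Assumption: without a tracker the caller re-raises the original error.
        return
    if not phase_tracker.is_safe_to_retry_on_404():
        raise PodNotFoundException(
            f"Pod '{name}' in namespace '{namespace}' not found after it was "
            f"observed in phase {phase_tracker.last_observed_phase}."
        )
    raise PodPreemptedException(
        f"Pod '{name}' in namespace '{namespace}' not found before it reached Running."
    )
```

   With something like this in one module, the sync and async code paths would
only differ in how they catch their respective `ApiException` types.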



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -208,8 +209,28 @@ class PodLaunchTimeoutException(AirflowException):
     """When pod does not leave the ``Pending`` phase within specified timeout."""
 
 
-class PodNotFoundException(AirflowException):
-    """Expected pod does not exist in kube-api."""
+@dataclass
+class PodPhaseTracker:
+    """
+    Track whether a pod ever reached Running state for safe 404 retry handling.
+
+    If pod never reached Running: safe to retry (likely preemption).
+    If pod was Running: NOT safe to retry (may cause duplicate execution).
+    """
+
+    ever_reached_running: bool = False
+    last_observed_phase: str | None = None
+
+    def update_from_pod(self, pod: V1Pod) -> None:
+        """Update tracker state from observed pod status."""
+        if pod.status and pod.status.phase:
+            self.last_observed_phase = pod.status.phase
+            if pod.status.phase == PodPhase.RUNNING:
+                self.ever_reached_running = True

Review Comment:
   What about the race condition where the Pod finishes so quickly that it has 
already failed or completed by the time it is checked? I think checking only for 
Running might be a bit dangerous.
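
   To illustrate the concern, here is a minimal sketch of a tracker that treats
every phase past `Pending` as evidence that the workload may have started. The
class name `StartAwarePhaseTracker` and its fields are hypothetical; the phase
strings are the standard Kubernetes pod phases. Whether `Failed` should count as
"started" is a judgment call, and a pod that runs and is deleted between two
polls would still go unnoticed.

```python
from dataclasses import dataclass
from typing import Optional

# Phases in which containers may already have executed user code.
STARTED_PHASES = frozenset({"Running", "Succeeded", "Failed"})


@dataclass
class StartAwarePhaseTracker:
    """Hypothetical variant of PodPhaseTracker that also counts terminal phases."""

    ever_started: bool = False
    last_observed_phase: Optional[str] = None

    def observe(self, phase: Optional[str]) -> None:
        """Record an observed phase; empty or missing phases are ignored."""
        if not phase:
            return
        self.last_observed_phase = phase
        if phase in STARTED_PHASES:
            self.ever_started = True

    def is_safe_to_retry_on_404(self) -> bool:
        """Only treat a 404 as preemption if no started phase was ever seen."""
        return not self.ever_started


# A fast pod that is first seen as Succeeded is not treated as preempted.
tracker = StartAwarePhaseTracker()
tracker.observe("Pending")
tracker.observe("Succeeded")
```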



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py:
##########
@@ -929,11 +939,43 @@ async def get_pod(self, name: str, namespace: str) -> V1Pod:
                     namespace=namespace,
                 )
                 return pod
+            except async_client.ApiException as e:
+                if e.status == 404:
+                    self._handle_pod_not_found_async(name, namespace, phase_tracker)
+                if e.status == 403:
+                    raise KubernetesApiPermissionError("Permission denied (403) from Kubernetes API.") from e
+                raise KubernetesApiError from e
             except HTTPError as e:
                 if hasattr(e, "status") and e.status == 403:
                     raise KubernetesApiPermissionError("Permission denied (403) from Kubernetes API.") from e
                 raise KubernetesApiError from e
 
+    def _handle_pod_not_found_async(
+        self, name: str, namespace: str, phase_tracker: PodPhaseTracker | None
+    ) -> None:
+        """
+        Handle 404 error with state-aware exception raising.
+
+        :raises PodNotFoundException: If pod was previously running.
+        :raises PodPreemptedException: If pod never reached running.
+        """
+        if phase_tracker and not phase_tracker.is_safe_to_retry_on_404():
+            # Pod was running - don't retry to prevent duplicate execution
+            raise PodNotFoundException(
+                f"Pod '{name}' in namespace '{namespace}' not found. "
+                f"The pod was previously observed in Running state (last phase: "
+                f"{phase_tracker.last_observed_phase}), so this is treated as a "
+                f"terminal failure to prevent duplicate execution of non-idempotent workloads."
+            )

Review Comment:
   That is not clear to me. There might be cases where some logic is only allowed 
to start once, so a broken state must not be retried, and there are cases where 
a failed execution should still be retried.
   
   I understand that you want to handle these cases differently, but the two 
exceptions are just different exception types; they do not change anything in 
retry handling. The message text is also misleading: regardless of the exception 
type, the "normal" retry would still happen if configured, wouldn't it?
   
   For me this PR only makes sense if a configurable setting decides, based on 
the exception type, whether the retry is skipped or performed on preemption.
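
   As a rough sketch of what such a setting could look like: a small decision
function that maps the exception type to "retry" or "do not retry". The function
`allows_task_retry` and the flags `retry_on_preemption` and `retry_on_not_found`
are hypothetical names, not existing operator parameters, and the exception
classes are stand-ins for the ones in the diff.

```python
class PodNotFoundException(Exception):
    """Stand-in: the pod disappeared after it had been seen running."""


class PodPreemptedException(Exception):
    """Stand-in: the pod disappeared before it ever ran."""


def allows_task_retry(
    exc: BaseException,
    *,
    retry_on_preemption: bool = True,
    retry_on_not_found: bool = False,
) -> bool:
    """Decide, per exception type, whether the task-level retry should happen."""
    if isinstance(exc, PodPreemptedException):
        return retry_on_preemption
    if isinstance(exc, PodNotFoundException):
        return retry_on_not_found
    # Any other failure keeps the normally configured retry behaviour.
    return True
```

   An operator could consult a check like this when handling the failure and,
when it returns `False`, fail the task without retrying, for example by raising
`AirflowFailException`.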



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py:
##########
@@ -64,7 +65,10 @@ class KubernetesApiException(AirflowException):
 
 
 def _should_retry_api(exc: BaseException) -> bool:
-    """Retry on selected ApiException status codes, plus plain HTTP/timeout errors."""
+    """Retry on transient API errors, HTTP errors, and PodPreemptedException."""
+    if isinstance(exc, PodPreemptedException):

Review Comment:
   This is wrong. The function is used to retry the API call if, e.g., an HTTP 500 
or 403 is returned, so it works at the call level. I assume you wanted to make 
the Pod execution retryable, but that is not controlled by this function.
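
   The difference can be shown with a small stand-alone sketch. `call_with_retry`
below is a hand-rolled stand-in for a call-level retry wrapper, not the
provider's `generic_api_retry`, and all other names are hypothetical. Adding the
preemption exception to the predicate only repeats the same read call; nothing
relaunches the pod.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")


class TransientApiError(Exception):
    """Stand-in for a retryable API failure such as an HTTP 500."""


class PodPreemptedException(Exception):
    """Stand-in for the exception this PR introduces."""


def call_with_retry(
    func: Callable[[], T],
    should_retry: Callable[[BaseException], bool],
    attempts: int = 3,
) -> T:
    """Call-level retry: re-invoke the same function on retryable errors."""
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception as exc:
            if attempt == attempts or not should_retry(exc):
                raise
    raise RuntimeError("unreachable when attempts >= 1")


def should_retry(exc: BaseException) -> bool:
    """Predicate in the style of the diff: preemption is marked retryable."""
    return isinstance(exc, (TransientApiError, PodPreemptedException))


def demo() -> List[str]:
    """Return the actions performed while reading a pod that no longer exists."""
    actions: List[str] = []

    def read_missing_pod() -> None:
        actions.append("read_pod")
        raise PodPreemptedException("pod gone")

    try:
        call_with_retry(read_missing_pod, should_retry)
    except PodPreemptedException:
        actions.append("gave_up")
    return actions
```

   In this sketch `demo()` records three identical reads followed by giving up,
which is the call-level behaviour; rerunning the pod would have to be decided at
the task level instead.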


