cruseakshay commented on code in PR #60626:
URL: https://github.com/apache/airflow/pull/60626#discussion_r2708891039


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py:
##########
@@ -929,11 +939,43 @@ async def get_pod(self, name: str, namespace: str) -> V1Pod:
                     namespace=namespace,
                 )
                 return pod
+            except async_client.ApiException as e:
+                if e.status == 404:
+                    self._handle_pod_not_found_async(name, namespace, phase_tracker)
+                if e.status == 403:
+                    raise KubernetesApiPermissionError("Permission denied (403) from Kubernetes API.") from e
+                raise KubernetesApiError from e
             except HTTPError as e:
                 if hasattr(e, "status") and e.status == 403:
                     raise KubernetesApiPermissionError("Permission denied (403) from Kubernetes API.") from e
                 raise KubernetesApiError from e
 
+    def _handle_pod_not_found_async(
+        self, name: str, namespace: str, phase_tracker: PodPhaseTracker | None
+    ) -> None:
+        """
+        Handle 404 error with state-aware exception raising.
+
+        :raises PodNotFoundException: If pod was previously running.
+        :raises PodPreemptedException: If pod never reached running.
+        """
+        if phase_tracker and not phase_tracker.is_safe_to_retry_on_404():
+            # Pod was running - don't retry to prevent duplicate execution
+            raise PodNotFoundException(
+                f"Pod '{name}' in namespace '{namespace}' not found. "
+                f"The pod was previously observed in Running state (last phase: "
+                f"{phase_tracker.last_observed_phase}), so this is treated as a "
+                f"terminal failure to prevent duplicate execution of non-idempotent workloads."
+            )

Review Comment:
   @jscheffl, thanks for the valuable feedback.
   Since task-level retries will, in turn, re-create the pod, I am wondering whether it is worth adding more complexity for internal retries. I will defer to your discretion here.
   
   If I were to continue with the implementation, the configurable part could look like this:
   
   - modify the operator to support a config flag, so users can opt in to the change in behaviour:
   ```python
   KubernetesPodOperator(
       task_id="my_task",
       retry_on_preemption=True,  # defaults to False
       ...
   )
   ```
   - modify the triggerer: catch PodPreemptedException and yield status="preempted"
   - modify the operator to distinguish preemption from other failures and decide, based on the config, whether to retry (see the sketch after this list)
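   
   Roughly, the wiring I have in mind (a sketch for discussion only: the trigger and operator classes are simplified stand-ins, the event payload shape and the retry_on_preemption handling are assumptions, and the PodPreemptedException import path may differ in the final branch):
   
   ```python
   from __future__ import annotations
   
   from typing import Any, AsyncIterator
   
   from airflow.exceptions import AirflowException, AirflowFailException
   from airflow.triggers.base import BaseTrigger, TriggerEvent
   
   # Assumption: import path of the exception introduced by this PR.
   from airflow.providers.cncf.kubernetes.hooks.kubernetes import PodPreemptedException
   
   
   class SketchPodTrigger(BaseTrigger):
       """Simplified stand-in for KubernetesPodTrigger; only the new branch is shown."""
   
       def serialize(self) -> tuple[str, dict[str, Any]]:
           return ("sketch.SketchPodTrigger", {})
   
       async def run(self) -> AsyncIterator[TriggerEvent]:
           try:
               await self._wait_for_pod()  # existing hook.get_pod() polling
           except PodPreemptedException as e:
               # New: surface preemption as a distinct event instead of a
               # generic failure, so the operator can decide what to do.
               yield TriggerEvent({"status": "preempted", "message": str(e)})
               return
           yield TriggerEvent({"status": "success"})
   
       async def _wait_for_pod(self) -> None:
           ...  # unchanged polling logic
   
   
   class SketchPodOperator:
       """Operator-side sketch; retry_on_preemption is the proposed opt-in flag."""
   
       def __init__(self, retry_on_preemption: bool = False) -> None:
           self.retry_on_preemption = retry_on_preemption
   
       def execute_complete(self, context: dict[str, Any], event: dict[str, Any]) -> None:
           if event["status"] == "preempted":
               if self.retry_on_preemption:
                   # Retryable: task-level retries re-create the pod, which is
                   # safe because it never reached Running.
                   raise AirflowException(event["message"])
               # Default: surface as a non-retryable failure
               # (AirflowFailException skips task retries).
               raise AirflowFailException(event["message"])
   ```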


