jscheffl commented on code in PR #67030:
URL: https://github.com/apache/airflow/pull/67030#discussion_r3383332322


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -824,6 +831,133 @@ def _adopt_completed_pods(self, kube_client: 
client.CoreV1Api) -> None:
                 )
             )
 
+    # Operators whose pods this cleanup manages.  EksPodOperator and
+    # GKEStartPodOperator are intentionally excluded: their pods run in
+    # external clusters where this kube_client has no authority.
+    _KPO_OPERATORS: frozenset[str] = frozenset(
+        [
+            "KubernetesPodOperator",
+            "SparkKubernetesOperator",
+        ]
+    )
+
+    @provide_session
+    def _cleanup_zombie_kpo_pods(
+        self, kube_client: client.CoreV1Api, *, session: Session = NEW_SESSION
+    ) -> None:
+        """
+        Force-delete KubernetesPodOperator pods whose TaskInstance is no 
longer active.
+
+        A pod is a zombie when either:
+
+        - No matching non-terminal TaskInstance exists in the DB (the TI 
already
+          finished or was never recorded), or
+        - The pod's ``try_number`` label is less than the active TI's current
+          ``try_number`` (the pod is a leftover from a previous retry attempt).
+
+        Force-deletion (``grace_period_seconds=0``) is used so that sidecar
+        containers that ignore SIGTERM cannot delay pod termination.
+        """
+        from sqlalchemy import or_
+
+        from airflow.models.taskinstance import TaskInstance
+        from airflow.providers.cncf.kubernetes.pod_generator import 
make_safe_label_value
+        from airflow.utils.state import State
+
+        pod_list = self._list_pods({"label_selector": 
"kubernetes_pod_operator=True"})
+        if not pod_list:
+            return
+
+        # Parse pod labels.  Values were already normalized by 
make_safe_label_value()
+        # at pod-creation time (operators/pod.py get_labels()), so they are 
ready to
+        # compare directly against normalized DB values.
+        pod_identities = []
+        for pod in pod_list:
+            labels = pod.metadata.labels or {}
+            dag_id = labels.get("dag_id")
+            task_id = labels.get("task_id")
+            run_id = labels.get("run_id")
+            if not (dag_id and task_id and run_id):
+                continue
+            map_index = int(labels.get("map_index", -1))
+            try_number = int(labels.get("try_number", 0))
+            pod_identities.append((pod, dag_id, task_id, run_id, map_index, 
try_number))
+
+        if not pod_identities:
+            return
+
+        # Single batch query: all non-terminal TIs for KPO-managed operators.
+        # We match on notin_(finished) rather than in_(unfinished) because SQL
+        # evaluates "NULL NOT IN (...)" as NULL, not TRUE — the explicit 
is_(None)
+        # arm ensures we also capture TIs with no state set yet.
+        terminal_state_values = [s.value for s in State.finished]
+        active_tis = session.execute(
+            select(
+                TaskInstance.dag_id,
+                TaskInstance.task_id,
+                TaskInstance.run_id,
+                TaskInstance.map_index,
+                TaskInstance.try_number,
+            ).where(
+                or_(
+                    TaskInstance.state.notin_(terminal_state_values),
+                    TaskInstance.state.is_(None),
+                ),
+                TaskInstance.operator.in_(self._KPO_OPERATORS),
+            )
+        ).all()

Review Comment:
   This is a very expensive operation: this select might spool out thousands
of tasks, and the source table might contain millions of rows. In my view a
better selection strategy should be used. The approach might be feasible for
small installations only.



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -824,6 +831,133 @@ def _adopt_completed_pods(self, kube_client: 
client.CoreV1Api) -> None:
                 )
             )
 
+    # Operators whose pods this cleanup manages.  EksPodOperator and
+    # GKEStartPodOperator are intentionally excluded: their pods run in
+    # external clusters where this kube_client has no authority.
+    _KPO_OPERATORS: frozenset[str] = frozenset(
+        [
+            "KubernetesPodOperator",
+            "SparkKubernetesOperator",
+        ]
+    )
+
+    @provide_session
+    def _cleanup_zombie_kpo_pods(
+        self, kube_client: client.CoreV1Api, *, session: Session = NEW_SESSION
+    ) -> None:
+        """
+        Force-delete KubernetesPodOperator pods whose TaskInstance is no 
longer active.
+
+        A pod is a zombie when either:
+
+        - No matching non-terminal TaskInstance exists in the DB (the TI 
already
+          finished or was never recorded), or
+        - The pod's ``try_number`` label is less than the active TI's current
+          ``try_number`` (the pod is a leftover from a previous retry attempt).
+
+        Force-deletion (``grace_period_seconds=0``) is used so that sidecar
+        containers that ignore SIGTERM cannot delay pod termination.
+        """
+        from sqlalchemy import or_
+
+        from airflow.models.taskinstance import TaskInstance
+        from airflow.providers.cncf.kubernetes.pod_generator import 
make_safe_label_value
+        from airflow.utils.state import State
+
+        pod_list = self._list_pods({"label_selector": 
"kubernetes_pod_operator=True"})
+        if not pod_list:
+            return
+
+        # Parse pod labels.  Values were already normalized by 
make_safe_label_value()
+        # at pod-creation time (operators/pod.py get_labels()), so they are 
ready to
+        # compare directly against normalized DB values.
+        pod_identities = []
+        for pod in pod_list:
+            labels = pod.metadata.labels or {}
+            dag_id = labels.get("dag_id")
+            task_id = labels.get("task_id")
+            run_id = labels.get("run_id")
+            if not (dag_id and task_id and run_id):
+                continue
+            map_index = int(labels.get("map_index", -1))
+            try_number = int(labels.get("try_number", 0))
+            pod_identities.append((pod, dag_id, task_id, run_id, map_index, 
try_number))
+
+        if not pod_identities:
+            return
+
+        # Single batch query: all non-terminal TIs for KPO-managed operators.
+        # We match on notin_(finished) rather than in_(unfinished) because SQL
+        # evaluates "NULL NOT IN (...)" as NULL, not TRUE — the explicit 
is_(None)
+        # arm ensures we also capture TIs with no state set yet.
+        terminal_state_values = [s.value for s in State.finished]
+        active_tis = session.execute(
+            select(
+                TaskInstance.dag_id,
+                TaskInstance.task_id,
+                TaskInstance.run_id,
+                TaskInstance.map_index,
+                TaskInstance.try_number,
+            ).where(
+                or_(
+                    TaskInstance.state.notin_(terminal_state_values),
+                    TaskInstance.state.is_(None),
+                ),
+                TaskInstance.operator.in_(self._KPO_OPERATORS),

Review Comment:
   This seems misplaced. If you want to clean up KPO pods, the
KubernetesExecutor is not the right place to do it. If you use K8sExecutor you
typically do not use KPO, and if you use another executor then this code path
is not active.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to