Copilot commented on code in PR #64962:
URL: https://github.com/apache/airflow/pull/64962#discussion_r3066483775


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py:
##########
@@ -1025,7 +1025,7 @@ def _clean(self, event: dict[str, Any], result: dict | 
None, context: Context) -
         if event["status"] != "timeout":
             try:
                 self.pod = self.pod_manager.await_pod_completion(
-                    self.pod, istio_enabled, self.base_container_name
+                    self.pod, istio_enabled, self.base_container_name, 
self.do_xcom_push

Review Comment:
   `await_pod_completion` is now sidecar-aware for `do_xcom_push`, but the 
synchronous execution path still calls 
`self.pod_manager.await_pod_completion(self.pod, istio_enabled, 
self.base_container_name)` (see this file around lines 761-763) without passing 
`self.do_xcom_push`. In the failure scenario described (sidecar kill failing), 
sync mode can still hang waiting for a terminal pod phase. Pass 
`self.do_xcom_push` in that call as well (or use a keyword argument) to apply 
the same early-exit behavior consistently.
   ```suggestion
                       self.pod,
                       istio_enabled,
                       self.base_container_name,
                       do_xcom_push=self.do_xcom_push,
   ```



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -741,21 +741,33 @@ def await_container_completion(self, pod: V1Pod, 
container_name: str, polling_ti
             time.sleep(polling_time)
 
     def await_pod_completion(
-        self, pod: V1Pod, istio_enabled: bool = False, container_name: str = 
"base"
+        self,
+        pod: V1Pod,
+        istio_enabled: bool = False,
+        container_name: str = "base",
+        do_xcom_push: bool = False,
     ) -> V1Pod:
         """
         Monitor a pod and return the final state.
 
         :param istio_enabled: whether istio is enabled in the namespace
         :param pod: pod spec that will be monitored
         :param container_name: name of the container within the pod
+        :param do_xcom_push: whether to push XComs
         :return: tuple[State, str | None]
         """

Review Comment:
   Docstring/typing mismatch: `await_pod_completion` is annotated to return 
`V1Pod` and returns `remote_pod`, but the docstring still says `:return: 
tuple[State, str | None]`. Update the docstring to reflect the actual return 
value to avoid misleading callers.
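
One possible shape for the corrected docstring is sketched below. This is a stand-in rather than the provider's code: `Any` replaces `V1Pod` so the snippet has no `kubernetes` dependency, the `:return:` wording is only a proposal, and the body is a hypothetical placeholder for the real polling loop.

```python
from typing import Any


def await_pod_completion(
    pod: Any,  # stand-in for V1Pod so the sketch has no kubernetes dependency
    istio_enabled: bool = False,
    container_name: str = "base",
    do_xcom_push: bool = False,
) -> Any:
    """
    Monitor a pod and return the final state.

    :param istio_enabled: whether istio is enabled in the namespace
    :param pod: pod spec that will be monitored
    :param container_name: name of the container within the pod
    :param do_xcom_push: whether to push XComs
    :return: the remote pod (``V1Pod``) in its last observed state
    """
    # Hypothetical body: the real method polls the API; here the input is echoed back.
    return pod
```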



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py:
##########
@@ -1110,9 +1110,12 @@ def cleanup(
         ):
             self.patch_already_checked(remote_pod, reraise=False)
 
-        failed = (pod_phase != PodPhase.SUCCEEDED and not istio_enabled) or (
-            istio_enabled and not container_is_succeeded(remote_pod, 
self.base_container_name)
-        )
+        if istio_enabled or self.do_xcom_push:
+            failed = pod_phase != PodPhase.SUCCEEDED and not 
container_is_succeeded(
+                remote_pod, self.base_container_name
+            )

Review Comment:
   The new `failed` condition for `istio_enabled or self.do_xcom_push` includes 
`pod_phase != PodPhase.SUCCEEDED` as a gate. This changes behavior from the 
previous istio logic: if the pod reaches `Succeeded` but 
`container_is_succeeded(remote_pod, self.base_container_name)` is false (e.g. 
misconfigured `base_container_name` / missing container status), the task would 
now be treated as success. For sidecar scenarios the intent is to base success 
strictly on the base container result; consider setting `failed` solely based 
on `container_is_succeeded(...)` (and treating missing status as failure).
   ```suggestion
               failed = not container_is_succeeded(remote_pod, 
self.base_container_name)
   ```
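
To make the behavior change concrete, the three variants of the `failed` check can be compared as plain boolean functions. This is a minimal sketch, not the operator code: the function names are hypothetical, `phase_succeeded` stands in for `pod_phase == PodPhase.SUCCEEDED`, and `base_ok` stands in for `container_is_succeeded(remote_pod, self.base_container_name)`.

```python
from itertools import product


def failed_before(phase_succeeded: bool, istio_enabled: bool, base_ok: bool) -> bool:
    # Logic removed by the PR (the "-" lines in the hunk above).
    return (not phase_succeeded and not istio_enabled) or (istio_enabled and not base_ok)


def failed_in_pr(phase_succeeded: bool, base_ok: bool) -> bool:
    # Logic added by the PR for the `istio_enabled or do_xcom_push` branch.
    return not phase_succeeded and not base_ok


def failed_suggested(base_ok: bool) -> bool:
    # Logic proposed in the suggestion: only the base container result counts.
    return not base_ok


# Collect the inputs where the PR branch and the suggestion disagree.
disagreements = [
    (phase_succeeded, base_ok)
    for phase_succeeded, base_ok in product([True, False], repeat=2)
    if failed_in_pr(phase_succeeded, base_ok) != failed_suggested(base_ok)
]
print(disagreements)
```

Under these assumptions the two variants differ only when the pod phase is `Succeeded` while the base container is not reported as succeeded, a case the previous istio logic (`failed_before` with `istio_enabled=True`) also treated as a failure.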



##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py:
##########
@@ -1055,6 +1055,72 @@ def 
test_pod_with_istio_delete_after_await_container_error(
         else:
             delete_pod_mock.assert_not_called()
 
+    @pytest.mark.parametrize(
+        ("base_container_exit_code", "expect_failure"),
+        [
+            pytest.param(0, False, id="base-succeeded"),
+            pytest.param(1, True, id="base-failed"),
+        ],
+    )
+    @patch(f"{POD_MANAGER_CLASS}.extract_xcom")
+    @patch(f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start")
+    @patch(f"{POD_MANAGER_CLASS}.delete_pod")
+    @patch(f"{POD_MANAGER_CLASS}.await_pod_completion")
+    @patch(f"{KPO_MODULE}.KubernetesPodOperator.find_pod")
+    def test_cleanup_with_xcom_sidecar_uses_base_container_status(
+        self,
+        find_pod_mock,
+        await_pod_completion_mock,
+        delete_pod_mock,
+        mock_await_xcom_sidecar,
+        mock_extract_xcom,
+        base_container_exit_code,
+        expect_failure,
+    ):
+        """
+        When do_xcom_push=True, cleanup should determine success/failure based 
on
+        the base container's exit status, not the pod phase. The xcom sidecar 
may
+        keep the pod in Running phase after the base container completes.
+        """
+        mock_extract_xcom.return_value = "{}"
+        mock_await_xcom_sidecar.return_value = None
+
+        base_status = MagicMock()
+        base_status.name = "base"
+        base_status.state.terminated.exit_code = base_container_exit_code
+        base_status.state.terminated.message = "task failed" if 
base_container_exit_code else None
+
+        xcom_sidecar_status = MagicMock()
+        xcom_sidecar_status.name = "airflow-xcom-sidecar"
+        xcom_sidecar_status.state.running = True
+        xcom_sidecar_status.state.terminated = None
+
+        # Pod is still Running because xcom sidecar is alive
+        remote_pod = MagicMock()
+        remote_pod.status.phase = "Running"
+        remote_pod.status.container_statuses = [base_status, 
xcom_sidecar_status]
+        remote_pod.metadata.name = "pod-with-xcom-sidecar"
+        remote_pod.metadata.namespace = "default"
+        remote_pod.spec.containers = [MagicMock(), MagicMock()]
+
+        await_pod_completion_mock.return_value = remote_pod
+        find_pod_mock.return_value = remote_pod
+
+        k = KubernetesPodOperator(
+            task_id="task",
+            do_xcom_push=True,
+        )
+
+        context = create_context(k)
+        context["ti"].xcom_push = MagicMock()
+
+        if expect_failure:
+            self.await_pod_mock.side_effect = AirflowException("fake failure")

Review Comment:
   This test sets `self.await_pod_mock.side_effect = AirflowException("fake 
failure")`, but this test also patches `PodManager.await_pod_completion` as 
`await_pod_completion_mock` and sets its `return_value`. The side_effect 
assignment is therefore redundant/confusing (and if it ever applied, the 
exception message wouldn’t match `match="task failed"`). Consider removing it 
and relying on the base container exit code to drive the expected failure.
   ```suggestion
   
   ```
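
The redundancy can be reproduced in isolation with `unittest.mock`. The sketch below assumes, without confirmation from the hunk shown, that `self.await_pod_mock` comes from an earlier patch of the same method; the `PodManager` class here is a hypothetical stand-in, not the provider class.

```python
from unittest.mock import patch


class PodManager:
    """Hypothetical stand-in for the real pod manager."""

    def await_pod_completion(self):
        return "real"


# Earlier patch, playing the role of the fixture-level `self.await_pod_mock`.
outer = patch.object(PodManager, "await_pod_completion").start()

# Later patch of the same attribute, playing the role of the decorator in the test.
with patch.object(PodManager, "await_pod_completion") as inner:
    inner.return_value = "remote_pod"
    # Setting a side effect on the shadowed mock does not affect the active one.
    outer.side_effect = RuntimeError("fake failure")
    result = PodManager().await_pod_completion()

patch.stopall()
print(result)
```

While the inner patch is active, calls reach only the inner mock, so the side effect on the outer mock never fires. That is why the expected failure has to come from the base container exit code rather than from `self.await_pod_mock`.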



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
