This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 895e8ed3149 KubernetesPodOperator deferred callbacks (#47108)
895e8ed3149 is described below

commit 895e8ed3149734bf7605bcc7d759782de64d4f1c
Author: John Horan <[email protected]>
AuthorDate: Mon Oct 13 17:10:56 2025 +0100

    KubernetesPodOperator deferred callbacks (#47108)
    
    * test print
    
    * uv test
    
    * add defer callbacks
    
    * cleanup
    
    * remove redundant call
    
    * move test to async
    
    * add callbacks to test
    
    * cleanup
    
    * format
    
    * Update pod.py
    
    * space
    
    * ruff fix
    
    * merge recent changes
    
    * fix pass args
    
    * fix type
---
 .../providers/cncf/kubernetes/operators/pod.py     | 83 ++++++++++++----------
 .../unit/cncf/kubernetes/operators/test_pod.py     | 78 ++++++++++++++++++++
 2 files changed, 125 insertions(+), 36 deletions(-)

diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index fdc1b36cf7e..f0ce7835910 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -736,20 +736,9 @@ class KubernetesPodOperator(BaseOperator):
             )
         finally:
             pod_to_clean = self.pod or self.pod_request_obj
-            self.cleanup(
-                pod=pod_to_clean,
-                remote_pod=self.remote_pod,
-                xcom_result=result,
-                context=context,
+            self.post_complete_action(
+                pod=pod_to_clean, remote_pod=self.remote_pod, context=context, 
result=result
             )
-            for callback in self.callbacks:
-                callback.on_pod_cleanup(
-                    pod=pod_to_clean,
-                    client=self.client,
-                    mode=ExecutionMode.SYNC,
-                    context=context,
-                    operator=self,
-                )
 
         if self.do_xcom_push:
             return result
@@ -820,11 +809,20 @@ class KubernetesPodOperator(BaseOperator):
     def execute_async(self, context: Context) -> None:
         if self.pod_request_obj is None:
             self.pod_request_obj = self.build_pod_request_obj(context)
+        for callback in self.callbacks:
+            callback.on_pod_manifest_created(
+                pod_request=self.pod_request_obj,
+                client=self.client,
+                mode=ExecutionMode.SYNC,
+                context=context,
+                operator=self,
+            )
         if self.pod is None:
             self.pod = self.get_or_create_pod(  # must set `self.pod` for 
`on_kill`
                 pod_request_obj=self.pod_request_obj,
                 context=context,
             )
+
         if self.callbacks:
             pod = self.find_pod(self.pod.metadata.namespace, context=context)
             for callback in self.callbacks:
@@ -887,6 +885,7 @@ class KubernetesPodOperator(BaseOperator):
         grab the latest logs and defer back to the trigger again.
         """
         self.pod = None
+        xcom_sidecar_output = None
         try:
             pod_name = event["name"]
             pod_namespace = event["namespace"]
@@ -910,20 +909,37 @@ class KubernetesPodOperator(BaseOperator):
             follow = self.logging_interval is None
             last_log_time = event.get("last_log_time")
 
-            if event["status"] in ("error", "failed", "timeout"):
-                event_message = event.get("message", "No message provided")
-                self.log.error(
-                    "Trigger emitted an %s event, failing the task: %s", 
event["status"], event_message
-                )
-                # fetch some logs when pod is failed
+            if event["status"] in ("error", "failed", "timeout", "success"):
                 if self.get_logs:
                     self._write_logs(self.pod, follow=follow, 
since_time=last_log_time)
 
-                if self.do_xcom_push:
-                    _ = self.extract_xcom(pod=self.pod)
+                for callback in self.callbacks:
+                    callback.on_pod_completion(
+                        pod=self.pod,
+                        client=self.client,
+                        mode=ExecutionMode.SYNC,
+                        context=context,
+                        operator=self,
+                    )
+                for callback in self.callbacks:
+                    callback.on_pod_teardown(
+                        pod=self.pod,
+                        client=self.client,
+                        mode=ExecutionMode.SYNC,
+                        context=context,
+                        operator=self,
+                    )
+
+                xcom_sidecar_output = self.extract_xcom(pod=self.pod) if 
self.do_xcom_push else None
+
+                if event["status"] != "success":
+                    self.log.error(
+                        "Trigger emitted an %s event, failing the task: %s", 
event["status"], event["message"]
+                    )
+                    message = event.get("stack_trace", event["message"])
+                    raise AirflowException(message)
 
-                message = event.get("stack_trace", event["message"])
-                raise AirflowException(message)
+                return xcom_sidecar_output
 
             if event["status"] == "running":
                 if self.get_logs:
@@ -941,22 +957,12 @@ class KubernetesPodOperator(BaseOperator):
                     self.invoke_defer_method(pod_log_status.last_log_time)
                 else:
                     self.invoke_defer_method()
-
-            elif event["status"] == "success":
-                # fetch some logs when pod is executed successfully
-                if self.get_logs:
-                    self._write_logs(self.pod, follow=follow, 
since_time=last_log_time)
-
-                if self.do_xcom_push:
-                    xcom_sidecar_output = self.extract_xcom(pod=self.pod)
-                    return xcom_sidecar_output
-                return
         except TaskDeferred:
             raise
         finally:
-            self._clean(event, context)
+            self._clean(event=event, context=context, 
result=xcom_sidecar_output)
 
-    def _clean(self, event: dict[str, Any], context: Context) -> None:
+    def _clean(self, event: dict[str, Any], result: dict | None, context: 
Context) -> None:
         if event["status"] == "running":
             return
         istio_enabled = self.is_istio_enabled(self.pod)
@@ -980,6 +986,7 @@ class KubernetesPodOperator(BaseOperator):
                 pod=self.pod,
                 remote_pod=self.pod,
                 context=context,
+                result=result,
             )
 
     def _write_logs(self, pod: k8s.V1Pod, follow: bool = False, since_time: 
DateTime | None = None) -> None:
@@ -1009,11 +1016,15 @@ class KubernetesPodOperator(BaseOperator):
                 e if not isinstance(e, ApiException) else e.reason,
             )
 
-    def post_complete_action(self, *, pod, remote_pod, context: Context, 
**kwargs) -> None:
+    def post_complete_action(
+        self, *, pod: k8s.V1Pod, remote_pod: k8s.V1Pod, context: Context, 
result: dict | None, **kwargs
+    ) -> None:
         """Actions that must be done after operator finishes logic of the 
deferrable_execution."""
         self.cleanup(
             pod=pod,
             remote_pod=remote_pod,
+            xcom_result=result,
+            context=context,
         )
         for callback in self.callbacks:
             callback.on_pod_cleanup(
diff --git 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
index fd59e0f92f2..abb503c8aa4 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
@@ -2665,6 +2665,84 @@ class TestKubernetesPodOperatorAsync:
                 },
             )
 
+    @patch(HOOK_CLASS)
+    def test_execute_async_callbacks(self, mocked_hook):
+        from airflow.providers.cncf.kubernetes.callbacks import ExecutionMode
+
+        from unit.cncf.kubernetes.test_callbacks import (
+            MockKubernetesPodOperatorCallback,
+            MockWrapper,
+        )
+
+        MockWrapper.reset()
+        mock_callbacks = MockWrapper.mock_callbacks
+        remote_pod_mock = MagicMock()
+        remote_pod_mock.status.phase = "Succeeded"
+        self.await_pod_mock.return_value = remote_pod_mock
+        mocked_hook.return_value.get_pod.return_value = remote_pod_mock
+
+        k = KubernetesPodOperator(
+            namespace="default",
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=["echo 10"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id="task",
+            do_xcom_push=False,
+            callbacks=MockKubernetesPodOperatorCallback,
+        )
+        context = create_context(k)
+
+        callback_event = {
+            "status": "success",
+            "message": TEST_SUCCESS_MESSAGE,
+            "name": TEST_NAME,
+            "namespace": TEST_NAMESPACE,
+        }
+        k.trigger_reentry(context=context, event=callback_event)
+
+        # check on_operator_resuming callback
+        mock_callbacks.on_operator_resuming.assert_called_once()
+        assert mock_callbacks.on_operator_resuming.call_args.kwargs == {
+            "client": k.client,
+            "mode": ExecutionMode.SYNC,
+            "pod": remote_pod_mock,
+            "operator": k,
+            "context": context,
+            "event": callback_event,
+        }
+
+        # check on_pod_cleanup callback
+        mock_callbacks.on_pod_cleanup.assert_called_once()
+        assert mock_callbacks.on_pod_cleanup.call_args.kwargs == {
+            "client": k.client,
+            "mode": ExecutionMode.SYNC,
+            "pod": remote_pod_mock,
+            "operator": k,
+            "context": context,
+        }
+
+        # check on_pod_completion callback
+        mock_callbacks.on_pod_completion.assert_called_once()
+        assert mock_callbacks.on_pod_completion.call_args.kwargs == {
+            "client": k.client,
+            "mode": ExecutionMode.SYNC,
+            "pod": remote_pod_mock,
+            "operator": k,
+            "context": context,
+        }
+
+        # check on_pod_teardown callback
+        mock_callbacks.on_pod_teardown.assert_called_once()
+        assert mock_callbacks.on_pod_teardown.call_args.kwargs == {
+            "client": k.client,
+            "mode": ExecutionMode.SYNC,
+            "pod": remote_pod_mock,
+            "operator": k,
+            "context": context,
+        }
+
 
 @pytest.mark.parametrize("do_xcom_push", [True, False])
 @patch(KUB_OP_PATH.format("extract_xcom"))

Reply via email to the mailing list.