This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 895e8ed3149 KubernetesPodOperator deferred callbacks (#47108)
895e8ed3149 is described below
commit 895e8ed3149734bf7605bcc7d759782de64d4f1c
Author: John Horan <[email protected]>
AuthorDate: Mon Oct 13 17:10:56 2025 +0100
KubernetesPodOperator deferred callbacks (#47108)
* test print
* uv test
* add defer callbacks
* cleanup
* remove redundant call
* move test to async
* add callbacks to test
* cleanup
* format
* Update pod.py
* space
* ruff fix
* merge recent changes
* fix pass args
* fix type
---
.../providers/cncf/kubernetes/operators/pod.py | 83 ++++++++++++----------
.../unit/cncf/kubernetes/operators/test_pod.py | 78 ++++++++++++++++++++
2 files changed, 125 insertions(+), 36 deletions(-)
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index fdc1b36cf7e..f0ce7835910 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -736,20 +736,9 @@ class KubernetesPodOperator(BaseOperator):
)
finally:
pod_to_clean = self.pod or self.pod_request_obj
- self.cleanup(
- pod=pod_to_clean,
- remote_pod=self.remote_pod,
- xcom_result=result,
- context=context,
+ self.post_complete_action(
+ pod=pod_to_clean, remote_pod=self.remote_pod, context=context, result=result
)
- for callback in self.callbacks:
- callback.on_pod_cleanup(
- pod=pod_to_clean,
- client=self.client,
- mode=ExecutionMode.SYNC,
- context=context,
- operator=self,
- )
if self.do_xcom_push:
return result
@@ -820,11 +809,20 @@ class KubernetesPodOperator(BaseOperator):
def execute_async(self, context: Context) -> None:
if self.pod_request_obj is None:
self.pod_request_obj = self.build_pod_request_obj(context)
+ for callback in self.callbacks:
+ callback.on_pod_manifest_created(
+ pod_request=self.pod_request_obj,
+ client=self.client,
+ mode=ExecutionMode.SYNC,
+ context=context,
+ operator=self,
+ )
if self.pod is None:
self.pod = self.get_or_create_pod( # must set `self.pod` for `on_kill`
pod_request_obj=self.pod_request_obj,
context=context,
)
+
if self.callbacks:
pod = self.find_pod(self.pod.metadata.namespace, context=context)
for callback in self.callbacks:
@@ -887,6 +885,7 @@ class KubernetesPodOperator(BaseOperator):
grab the latest logs and defer back to the trigger again.
"""
self.pod = None
+ xcom_sidecar_output = None
try:
pod_name = event["name"]
pod_namespace = event["namespace"]
@@ -910,20 +909,37 @@ class KubernetesPodOperator(BaseOperator):
follow = self.logging_interval is None
last_log_time = event.get("last_log_time")
- if event["status"] in ("error", "failed", "timeout"):
- event_message = event.get("message", "No message provided")
- self.log.error(
- "Trigger emitted an %s event, failing the task: %s", event["status"], event_message
- )
- # fetch some logs when pod is failed
+ if event["status"] in ("error", "failed", "timeout", "success"):
if self.get_logs:
self._write_logs(self.pod, follow=follow, since_time=last_log_time)
- if self.do_xcom_push:
- _ = self.extract_xcom(pod=self.pod)
+ for callback in self.callbacks:
+ callback.on_pod_completion(
+ pod=self.pod,
+ client=self.client,
+ mode=ExecutionMode.SYNC,
+ context=context,
+ operator=self,
+ )
+ for callback in self.callbacks:
+ callback.on_pod_teardown(
+ pod=self.pod,
+ client=self.client,
+ mode=ExecutionMode.SYNC,
+ context=context,
+ operator=self,
+ )
+
+ xcom_sidecar_output = self.extract_xcom(pod=self.pod) if self.do_xcom_push else None
+
+ if event["status"] != "success":
+ self.log.error(
+ "Trigger emitted an %s event, failing the task: %s", event["status"], event["message"]
+ )
+ message = event.get("stack_trace", event["message"])
+ raise AirflowException(message)
- message = event.get("stack_trace", event["message"])
- raise AirflowException(message)
+ return xcom_sidecar_output
if event["status"] == "running":
if self.get_logs:
@@ -941,22 +957,12 @@ class KubernetesPodOperator(BaseOperator):
self.invoke_defer_method(pod_log_status.last_log_time)
else:
self.invoke_defer_method()
-
- elif event["status"] == "success":
- # fetch some logs when pod is executed successfully
- if self.get_logs:
- self._write_logs(self.pod, follow=follow, since_time=last_log_time)
-
- if self.do_xcom_push:
- xcom_sidecar_output = self.extract_xcom(pod=self.pod)
- return xcom_sidecar_output
- return
except TaskDeferred:
raise
finally:
- self._clean(event, context)
+ self._clean(event=event, context=context, result=xcom_sidecar_output)
- def _clean(self, event: dict[str, Any], context: Context) -> None:
+ def _clean(self, event: dict[str, Any], result: dict | None, context: Context) -> None:
if event["status"] == "running":
return
istio_enabled = self.is_istio_enabled(self.pod)
@@ -980,6 +986,7 @@ class KubernetesPodOperator(BaseOperator):
pod=self.pod,
remote_pod=self.pod,
context=context,
+ result=result,
)
def _write_logs(self, pod: k8s.V1Pod, follow: bool = False, since_time: DateTime | None = None) -> None:
@@ -1009,11 +1016,15 @@ class KubernetesPodOperator(BaseOperator):
e if not isinstance(e, ApiException) else e.reason,
)
- def post_complete_action(self, *, pod, remote_pod, context: Context, **kwargs) -> None:
+ def post_complete_action(
+ self, *, pod: k8s.V1Pod, remote_pod: k8s.V1Pod, context: Context, result: dict | None, **kwargs
+ ) -> None:
"""Actions that must be done after operator finishes logic of the deferrable_execution."""
self.cleanup(
pod=pod,
remote_pod=remote_pod,
+ xcom_result=result,
+ context=context,
)
for callback in self.callbacks:
callback.on_pod_cleanup(
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
index fd59e0f92f2..abb503c8aa4 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
@@ -2665,6 +2665,84 @@ class TestKubernetesPodOperatorAsync:
},
)
+ @patch(HOOK_CLASS)
+ def test_execute_async_callbacks(self, mocked_hook):
+ from airflow.providers.cncf.kubernetes.callbacks import ExecutionMode
+
+ from unit.cncf.kubernetes.test_callbacks import (
+ MockKubernetesPodOperatorCallback,
+ MockWrapper,
+ )
+
+ MockWrapper.reset()
+ mock_callbacks = MockWrapper.mock_callbacks
+ remote_pod_mock = MagicMock()
+ remote_pod_mock.status.phase = "Succeeded"
+ self.await_pod_mock.return_value = remote_pod_mock
+ mocked_hook.return_value.get_pod.return_value = remote_pod_mock
+
+ k = KubernetesPodOperator(
+ namespace="default",
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=["echo 10"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id="task",
+ do_xcom_push=False,
+ callbacks=MockKubernetesPodOperatorCallback,
+ )
+ context = create_context(k)
+
+ callback_event = {
+ "status": "success",
+ "message": TEST_SUCCESS_MESSAGE,
+ "name": TEST_NAME,
+ "namespace": TEST_NAMESPACE,
+ }
+ k.trigger_reentry(context=context, event=callback_event)
+
+ # check on_operator_resuming callback
+ mock_callbacks.on_operator_resuming.assert_called_once()
+ assert mock_callbacks.on_operator_resuming.call_args.kwargs == {
+ "client": k.client,
+ "mode": ExecutionMode.SYNC,
+ "pod": remote_pod_mock,
+ "operator": k,
+ "context": context,
+ "event": callback_event,
+ }
+
+ # check on_pod_cleanup callback
+ mock_callbacks.on_pod_cleanup.assert_called_once()
+ assert mock_callbacks.on_pod_cleanup.call_args.kwargs == {
+ "client": k.client,
+ "mode": ExecutionMode.SYNC,
+ "pod": remote_pod_mock,
+ "operator": k,
+ "context": context,
+ }
+
+ # check on_pod_completion callback
+ mock_callbacks.on_pod_completion.assert_called_once()
+ assert mock_callbacks.on_pod_completion.call_args.kwargs == {
+ "client": k.client,
+ "mode": ExecutionMode.SYNC,
+ "pod": remote_pod_mock,
+ "operator": k,
+ "context": context,
+ }
+
+ # check on_pod_teardown callback
+ mock_callbacks.on_pod_teardown.assert_called_once()
+ assert mock_callbacks.on_pod_teardown.call_args.kwargs == {
+ "client": k.client,
+ "mode": ExecutionMode.SYNC,
+ "pod": remote_pod_mock,
+ "operator": k,
+ "context": context,
+ }
+
@pytest.mark.parametrize("do_xcom_push", [True, False])
@patch(KUB_OP_PATH.format("extract_xcom"))