sean-rose commented on code in PR #35714:
URL: https://github.com/apache/airflow/pull/35714#discussion_r2051262279
##########
tests/providers/cncf/kubernetes/operators/test_pod.py:
##########
@@ -1446,6 +1446,119 @@ def test_get_logs_but_not_for_base_container(
# check that we wait for the xcom sidecar to start before extracting
XCom
mock_await_xcom_sidecar.assert_called_once_with(pod=pod)
+ @patch(HOOK_CLASS, new=MagicMock)
+ @patch(KUB_OP_PATH.format("find_pod"))
+ def test_execute_sync_callbacks(self, find_pod_mock):
+ from airflow.providers.cncf.kubernetes.callbacks import ExecutionMode
+
+ from ..test_callbacks import MockKubernetesPodOperatorCallback,
MockWrapper
+
+ MockWrapper.reset()
+ mock_callbacks = MockWrapper.mock_callbacks
+ found_pods = [MagicMock(), MagicMock(), MagicMock()]
+ find_pod_mock.side_effect = [None] + found_pods
+
+ remote_pod_mock = MagicMock()
+ remote_pod_mock.status.phase = "Succeeded"
+ self.await_pod_mock.return_value = remote_pod_mock
+ k = KubernetesPodOperator(
+ namespace="default",
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=["echo 10"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id="task",
+ do_xcom_push=False,
+ callbacks=MockKubernetesPodOperatorCallback,
+ )
+ self.run_pod(k)
+
+ # check on_sync_client_creation callback
+ mock_callbacks.on_sync_client_creation.assert_called_once()
+ assert mock_callbacks.on_sync_client_creation.call_args.kwargs ==
{"client": k.client}
+
+ # check on_pod_creation callback
+ mock_callbacks.on_pod_creation.assert_called_once()
+ assert mock_callbacks.on_pod_creation.call_args.kwargs == {
+ "client": k.client,
+ "mode": ExecutionMode.SYNC,
+ "pod": found_pods[0],
+ }
+
+ # check on_pod_starting callback
+ mock_callbacks.on_pod_starting.assert_called_once()
+ assert mock_callbacks.on_pod_starting.call_args.kwargs == {
+ "client": k.client,
+ "mode": ExecutionMode.SYNC,
+ "pod": found_pods[1],
+ }
+
+ # check on_pod_completion callback
+ mock_callbacks.on_pod_completion.assert_called_once()
+ assert mock_callbacks.on_pod_completion.call_args.kwargs == {
+ "client": k.client,
+ "mode": ExecutionMode.SYNC,
+ "pod": found_pods[2],
+ }
+
+ # check on_pod_cleanup callback
+ mock_callbacks.on_pod_cleanup.assert_called_once()
+ assert mock_callbacks.on_pod_cleanup.call_args.kwargs == {
+ "client": k.client,
+ "mode": ExecutionMode.SYNC,
+ "pod": k.pod,
+ }
+
+ @patch(HOOK_CLASS, new=MagicMock)
+ def test_execute_async_callbacks(self):
+ from airflow.providers.cncf.kubernetes.callbacks import ExecutionMode
+
+ from ..test_callbacks import MockKubernetesPodOperatorCallback,
MockWrapper
+
+ MockWrapper.reset()
+ mock_callbacks = MockWrapper.mock_callbacks
+ remote_pod_mock = MagicMock()
+ remote_pod_mock.status.phase = "Succeeded"
+ self.await_pod_mock.return_value = remote_pod_mock
+
+ k = KubernetesPodOperator(
+ namespace="default",
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=["echo 10"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id="task",
+ do_xcom_push=False,
+ callbacks=MockKubernetesPodOperatorCallback,
+ )
+ k.execute_complete(
+ context=create_context(k),
+ event={
+ "status": "success",
+ "message": TEST_SUCCESS_MESSAGE,
+ "name": TEST_NAME,
+ "namespace": TEST_NAMESPACE,
+ },
+ )
+
+ # check on_operator_resuming callback
+ mock_callbacks.on_pod_cleanup.assert_called_once()
+ assert mock_callbacks.on_pod_cleanup.call_args.kwargs == {
+ "client": k.client,
+ "mode": ExecutionMode.SYNC,
+ "pod": remote_pod_mock,
+ }
Review Comment:
FYI, while working on #49441 I noticed that this code isn't testing
`on_operator_resuming` like the comment says it does.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]