SameerMesiah97 commented on code in PR #63915:
URL: https://github.com/apache/airflow/pull/63915#discussion_r2990431777
##########
providers/google/tests/unit/google/cloud/triggers/test_kubernetes_engine.py:
##########
@@ -589,3 +589,74 @@ def test_hook(self, mock_hook, job_trigger):
            impersonation_chain=IMPERSONATION_CHAIN,
        )
        assert hook_actual == hook_expected
+
+    @pytest.mark.asyncio
+    @mock.patch(f"{GKE_TRIGGERS_PATH}.ProvidersManager")
+    @mock.patch(f"{TRIGGER_GKE_JOB_PATH}.pod_manager", new_callable=mock.PropertyMock)
+    @mock.patch(f"{TRIGGER_GKE_JOB_PATH}.hook")
+    async def test_run_do_xcom_push_uses_succeeded_retry_pod_not_original_failed_pod(
+        self, mock_hook, mock_pod_manager_prop, mock_providers_manager
+    ):
+        """When a job has a failed pod and a succeeded retry pod, use the succeeded pod for XCom."""
+        mock_providers_manager.return_value.providers = {
+            "apache-airflow-providers-cncf-kubernetes": mock.MagicMock(
+                data={"package-name": "apache-airflow-providers-cncf-kubernetes"},
+                version="8.4.1",
+            )
+        }
+
+        failed_pod = mock.MagicMock()
+        failed_pod.metadata.name = "original-failed-pod"
+        failed_pod.status.phase = "Failed"
+
+        succeeded_pod = mock.MagicMock()
+        succeeded_pod.metadata.name = "retry-succeeded-pod"
+        succeeded_pod.status.phase = "Succeeded"
+
+        mock_hook.list_pods = mock.AsyncMock(return_value=[failed_pod, succeeded_pod])
+        mock_hook.wait_until_container_complete = mock.AsyncMock()
+        mock_hook.wait_until_container_started = mock.AsyncMock()
+        mock_hook.wait_until_job_complete = mock.AsyncMock()
+        mock_job = mock.MagicMock()
+        mock_job.metadata.name = JOB_NAME
+        mock_job.metadata.namespace = NAMESPACE
+        mock_job.to_dict.return_value = {"job": "dict"}
+        mock_hook.wait_until_job_complete.return_value = mock_job
+        mock_hook.is_job_failed = mock.MagicMock(return_value=False)
+
+        mock_pod_manager = mock.MagicMock()
+        mock_pod_manager.extract_xcom.return_value = {"xcom": "result"}
+        mock_pod_manager_prop.return_value = mock_pod_manager
+
+        def run_in_executor(_executor, func, *args):
+            result = func(*args)
+            loop = asyncio.get_running_loop()
+            f = loop.create_future()
+            f.set_result(result)
+            return f
Review Comment:
I don't think we need to patch the event loop's `run_in_executor` here. `extract_xcom` is already mocked and lightweight, so letting it run through the real executor should be fast and deterministic.
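For example, the tail of the test could shrink to something like this (just a sketch, assuming the mocks and the trigger construction stay exactly as in the diff above):
```
# No patch on run_in_executor: extract_xcom is a MagicMock, so letting the
# default executor run it is cheap and deterministic.
event = await trigger.run().asend(None)

mock_pod_manager.extract_xcom.assert_called_once_with(succeeded_pod)
assert event.payload["xcom_result"] == [{"xcom": "result"}]
```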
##########
providers/google/tests/unit/google/cloud/triggers/test_kubernetes_engine.py:
##########
@@ -589,3 +589,74 @@ def test_hook(self, mock_hook, job_trigger):
            impersonation_chain=IMPERSONATION_CHAIN,
        )
        assert hook_actual == hook_expected
+
+    @pytest.mark.asyncio
+    @mock.patch(f"{GKE_TRIGGERS_PATH}.ProvidersManager")
+    @mock.patch(f"{TRIGGER_GKE_JOB_PATH}.pod_manager", new_callable=mock.PropertyMock)
+    @mock.patch(f"{TRIGGER_GKE_JOB_PATH}.hook")
+    async def test_run_do_xcom_push_uses_succeeded_retry_pod_not_original_failed_pod(
+        self, mock_hook, mock_pod_manager_prop, mock_providers_manager
+    ):
+        """When a job has a failed pod and a succeeded retry pod, use the succeeded pod for XCom."""
+        mock_providers_manager.return_value.providers = {
+            "apache-airflow-providers-cncf-kubernetes": mock.MagicMock(
+                data={"package-name": "apache-airflow-providers-cncf-kubernetes"},
+                version="8.4.1",
+            )
+        }
+
+        failed_pod = mock.MagicMock()
+        failed_pod.metadata.name = "original-failed-pod"
+        failed_pod.status.phase = "Failed"
+
+        succeeded_pod = mock.MagicMock()
+        succeeded_pod.metadata.name = "retry-succeeded-pod"
+        succeeded_pod.status.phase = "Succeeded"
+
+        mock_hook.list_pods = mock.AsyncMock(return_value=[failed_pod, succeeded_pod])
+        mock_hook.wait_until_container_complete = mock.AsyncMock()
+        mock_hook.wait_until_container_started = mock.AsyncMock()
+        mock_hook.wait_until_job_complete = mock.AsyncMock()
Review Comment:
Maybe you could add a blank line here? Right now, it's a bit too dense.
##########
providers/google/src/airflow/providers/google/cloud/triggers/kubernetes_engine.py:
##########
@@ -333,9 +333,27 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
f"package
{kubernetes_provider_name}=={kubernetes_provider_version} which doesn't "
f"support this feature. Please upgrade it to version
higher than or equal to {min_version}."
)
+ # Re-discover pods for this job to handle retry scenarios
+ # where the original pod failed and a new pod was created
+ label_selector = f"job-name={self.job_name}"
+ current_pods = await self.hook.list_pods(
+ namespace=self.pod_namespace,
+ label_selector=label_selector,
+ )
+ succeeded_pods = [p for p in current_pods if p.status and
p.status.phase == "Succeeded"]
+ if not succeeded_pods:
+ self.log.warning(
+ "No succeeded pods found for job %s, falling back to
original pod list",
+ self.job_name,
+ )
+ succeeded_pods = [
+ await self.hook.get_pod(name=pod_name,
namespace=self.pod_namespace)
Review Comment:
Why make an API call for every pod here when you've already called `list_pods`? I would suggest something like this instead:
```
succeeded_pods = [
    p for p in current_pods
    if p.metadata.name in self.pod_names
]
```
##########
providers/google/tests/unit/google/cloud/triggers/test_kubernetes_engine.py:
##########
@@ -589,3 +589,74 @@ def test_hook(self, mock_hook, job_trigger):
            impersonation_chain=IMPERSONATION_CHAIN,
        )
        assert hook_actual == hook_expected
+
+    @pytest.mark.asyncio
+    @mock.patch(f"{GKE_TRIGGERS_PATH}.ProvidersManager")
+    @mock.patch(f"{TRIGGER_GKE_JOB_PATH}.pod_manager", new_callable=mock.PropertyMock)
+    @mock.patch(f"{TRIGGER_GKE_JOB_PATH}.hook")
+    async def test_run_do_xcom_push_uses_succeeded_retry_pod_not_original_failed_pod(
+        self, mock_hook, mock_pod_manager_prop, mock_providers_manager
+    ):
+        """When a job has a failed pod and a succeeded retry pod, use the succeeded pod for XCom."""
+        mock_providers_manager.return_value.providers = {
+            "apache-airflow-providers-cncf-kubernetes": mock.MagicMock(
+                data={"package-name": "apache-airflow-providers-cncf-kubernetes"},
+                version="8.4.1",
+            )
+        }
+
+        failed_pod = mock.MagicMock()
+        failed_pod.metadata.name = "original-failed-pod"
+        failed_pod.status.phase = "Failed"
+
+        succeeded_pod = mock.MagicMock()
+        succeeded_pod.metadata.name = "retry-succeeded-pod"
+        succeeded_pod.status.phase = "Succeeded"
+
+        mock_hook.list_pods = mock.AsyncMock(return_value=[failed_pod, succeeded_pod])
+        mock_hook.wait_until_container_complete = mock.AsyncMock()
+        mock_hook.wait_until_container_started = mock.AsyncMock()
+        mock_hook.wait_until_job_complete = mock.AsyncMock()
+        mock_job = mock.MagicMock()
+        mock_job.metadata.name = JOB_NAME
+        mock_job.metadata.namespace = NAMESPACE
+        mock_job.to_dict.return_value = {"job": "dict"}
+        mock_hook.wait_until_job_complete.return_value = mock_job
+        mock_hook.is_job_failed = mock.MagicMock(return_value=False)
+
+        mock_pod_manager = mock.MagicMock()
+        mock_pod_manager.extract_xcom.return_value = {"xcom": "result"}
+        mock_pod_manager_prop.return_value = mock_pod_manager
+
+        def run_in_executor(_executor, func, *args):
+            result = func(*args)
+            loop = asyncio.get_running_loop()
+            f = loop.create_future()
+            f.set_result(result)
+            return f
+
+        with mock.patch.object(asyncio.get_running_loop(), "run_in_executor", side_effect=run_in_executor):
+            trigger = GKEJobTrigger(
+                cluster_url=CLUSTER_URL,
+                ssl_ca_cert=SSL_CA_CERT,
+                job_name=JOB_NAME,
+                job_namespace=NAMESPACE,
+                pod_names=[POD_NAME],
+                pod_namespace=NAMESPACE,
+                base_container_name=BASE_CONTAINER_NAME,
+                gcp_conn_id=GCP_CONN_ID,
+                poll_interval=POLL_INTERVAL,
+                impersonation_chain=IMPERSONATION_CHAIN,
+                get_logs=GET_LOGS,
+                do_xcom_push=True,
+            )
+            event = await trigger.run().asend(None)
+
+        mock_hook.list_pods.assert_called_once_with(
+            namespace=NAMESPACE,
+            label_selector=f"job-name={JOB_NAME}",
+        )
+        assert mock_pod_manager.extract_xcom.call_count == 1
+        assert mock_pod_manager.extract_xcom.call_args[0][0] is succeeded_pod
+        assert mock_pod_manager.extract_xcom.call_args[0][0].metadata.name == "retry-succeeded-pod"
+        assert event.payload["xcom_result"] == [{"xcom": "result"}]
Review Comment:
I think these asserts can be simplified to:
```
mock_pod_manager.extract_xcom.assert_called_once_with(succeeded_pod)
assert event.payload["xcom_result"] == [{"xcom": "result"}]
```
The two asserts in the middle are redundant and are already covered by `assert_called_once_with`.
Also, looking at the bug report, I think we should add asserts for the
container wait calls:
```
mock_hook.wait_until_container_complete.assert_called_once_with(
    name="retry-succeeded-pod",
    namespace=NAMESPACE,
    container_name=BASE_CONTAINER_NAME,
    poll_interval=POLL_INTERVAL,
)
mock_hook.wait_until_container_started.assert_called_once_with(
    name="retry-succeeded-pod",
    namespace=NAMESPACE,
    container_name=PodDefaults.SIDECAR_CONTAINER_NAME,
    poll_interval=POLL_INTERVAL,
)
```
The crux of the issue is that the trigger was waiting on the old pod, which
eventually expired and caused the deadline to be exceeded. These asserts make
sure we are consistently operating on the correct pod, not just during XCom
extraction.
##########
providers/google/tests/unit/google/cloud/triggers/test_kubernetes_engine.py:
##########
@@ -589,3 +589,74 @@ def test_hook(self, mock_hook, job_trigger):
            impersonation_chain=IMPERSONATION_CHAIN,
        )
        assert hook_actual == hook_expected
+
+    @pytest.mark.asyncio
+    @mock.patch(f"{GKE_TRIGGERS_PATH}.ProvidersManager")
+    @mock.patch(f"{TRIGGER_GKE_JOB_PATH}.pod_manager", new_callable=mock.PropertyMock)
+    @mock.patch(f"{TRIGGER_GKE_JOB_PATH}.hook")
+    async def test_run_do_xcom_push_uses_succeeded_retry_pod_not_original_failed_pod(
+        self, mock_hook, mock_pod_manager_prop, mock_providers_manager
+    ):
+        """When a job has a failed pod and a succeeded retry pod, use the succeeded pod for XCom."""
+        mock_providers_manager.return_value.providers = {
+            "apache-airflow-providers-cncf-kubernetes": mock.MagicMock(
+                data={"package-name": "apache-airflow-providers-cncf-kubernetes"},
+                version="8.4.1",
+            )
+        }
+
+        failed_pod = mock.MagicMock()
+        failed_pod.metadata.name = "original-failed-pod"
+        failed_pod.status.phase = "Failed"
+
+        succeeded_pod = mock.MagicMock()
+        succeeded_pod.metadata.name = "retry-succeeded-pod"
+        succeeded_pod.status.phase = "Succeeded"
+
+        mock_hook.list_pods = mock.AsyncMock(return_value=[failed_pod, succeeded_pod])
+        mock_hook.wait_until_container_complete = mock.AsyncMock()
+        mock_hook.wait_until_container_started = mock.AsyncMock()
+        mock_hook.wait_until_job_complete = mock.AsyncMock()
+        mock_job = mock.MagicMock()
+        mock_job.metadata.name = JOB_NAME
+        mock_job.metadata.namespace = NAMESPACE
+        mock_job.to_dict.return_value = {"job": "dict"}
+        mock_hook.wait_until_job_complete.return_value = mock_job
+        mock_hook.is_job_failed = mock.MagicMock(return_value=False)
+
+        mock_pod_manager = mock.MagicMock()
+        mock_pod_manager.extract_xcom.return_value = {"xcom": "result"}
+        mock_pod_manager_prop.return_value = mock_pod_manager
+
+        def run_in_executor(_executor, func, *args):
+            result = func(*args)
+            loop = asyncio.get_running_loop()
+            f = loop.create_future()
+            f.set_result(result)
+            return f
+
+        with mock.patch.object(asyncio.get_running_loop(), "run_in_executor", side_effect=run_in_executor):
+            trigger = GKEJobTrigger(
+                cluster_url=CLUSTER_URL,
+                ssl_ca_cert=SSL_CA_CERT,
+                job_name=JOB_NAME,
+                job_namespace=NAMESPACE,
+                pod_names=[POD_NAME],
+                pod_namespace=NAMESPACE,
+                base_container_name=BASE_CONTAINER_NAME,
+                gcp_conn_id=GCP_CONN_ID,
+                poll_interval=POLL_INTERVAL,
+                impersonation_chain=IMPERSONATION_CHAIN,
+                get_logs=GET_LOGS,
+                do_xcom_push=True,
+            )
Review Comment:
I don't see the need to re-initialize `GKEJobTrigger` here when you can use
the `job_trigger` fixture instead and set `job_trigger.do_xcom_push` to True.
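Roughly something like this (a sketch; it assumes the fixture is added to the test signature and that flipping `do_xcom_push` on the instance is all that's needed):
```
# Hypothetical: reuse the job_trigger fixture rather than constructing a new
# GKEJobTrigger; only the do_xcom_push flag differs from the fixture defaults.
job_trigger.do_xcom_push = True
event = await job_trigger.run().asend(None)
```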
##########
providers/google/tests/unit/google/cloud/triggers/test_kubernetes_engine.py:
##########
@@ -589,3 +589,74 @@ def test_hook(self, mock_hook, job_trigger):
            impersonation_chain=IMPERSONATION_CHAIN,
        )
        assert hook_actual == hook_expected
+
+    @pytest.mark.asyncio
+    @mock.patch(f"{GKE_TRIGGERS_PATH}.ProvidersManager")
+    @mock.patch(f"{TRIGGER_GKE_JOB_PATH}.pod_manager", new_callable=mock.PropertyMock)
+    @mock.patch(f"{TRIGGER_GKE_JOB_PATH}.hook")
+    async def test_run_do_xcom_push_uses_succeeded_retry_pod_not_original_failed_pod(
+        self, mock_hook, mock_pod_manager_prop, mock_providers_manager
+    ):
+        """When a job has a failed pod and a succeeded retry pod, use the succeeded pod for XCom."""
+        mock_providers_manager.return_value.providers = {
+            "apache-airflow-providers-cncf-kubernetes": mock.MagicMock(
+                data={"package-name": "apache-airflow-providers-cncf-kubernetes"},
+                version="8.4.1",
+            )
+        }
+
+        failed_pod = mock.MagicMock()
+        failed_pod.metadata.name = "original-failed-pod"
+        failed_pod.status.phase = "Failed"
+
+        succeeded_pod = mock.MagicMock()
+        succeeded_pod.metadata.name = "retry-succeeded-pod"
+        succeeded_pod.status.phase = "Succeeded"
+
+        mock_hook.list_pods = mock.AsyncMock(return_value=[failed_pod, succeeded_pod])
+        mock_hook.wait_until_container_complete = mock.AsyncMock()
+        mock_hook.wait_until_container_started = mock.AsyncMock()
+        mock_hook.wait_until_job_complete = mock.AsyncMock()
+        mock_job = mock.MagicMock()
+        mock_job.metadata.name = JOB_NAME
+        mock_job.metadata.namespace = NAMESPACE
+        mock_job.to_dict.return_value = {"job": "dict"}
Review Comment:
A blank line here as well, for readability.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]