holmuk commented on code in PR #65058:
URL: https://github.com/apache/airflow/pull/65058#discussion_r3071629269
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##########
@@ -156,6 +188,233 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
}
)
+ async def _get_pod_or_none_for_xcom(self, pod_name: str) -> V1Pod | None:
+ try:
+ return await self.hook.get_pod(name=pod_name, namespace=self.pod_namespace)
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info("Pod '%s' no longer exists; skipping XCom extraction.", pod_name)
+ return None
+ raise
+
+ async def _wait_until_container_complete_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=self.base_container_name,
+ wait_method=self.hook.wait_until_container_complete,
+ job_task=job_task,
+ state_label="completed",
+ )
+
+ async def _wait_until_sidecar_started_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=PodDefaults.SIDECAR_CONTAINER_NAME,
+ wait_method=self.hook.wait_until_container_started,
+ job_task=job_task,
+ state_label="running",
+ )
+
+ async def _wait_until_container_state_or_job_done(
+ self,
+ pod_name: str,
+ container_name: str,
+ wait_method: Any,
+ job_task: asyncio.Task[V1Job],
+ state_label: str,
+ ) -> WaitOutcome:
+ poll_interval = max(self.poll_interval, 0.1)
+ wait_task = asyncio.create_task(
+ wait_method(
+ name=pod_name,
+ namespace=self.pod_namespace,
+ container_name=container_name,
+ poll_interval=poll_interval,
+ )
+ )
+
+ try:
+ while True:
+ done, _ = await asyncio.wait(
+ {wait_task, job_task},
+ timeout=poll_interval,
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+
+ if wait_task in done:
+ try:
+ await wait_task
Review Comment:
`done()` only means that the task has finished; it neither raises the task's exception
nor returns its result. We await the task to obtain its final result or exception.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]