holmuk commented on code in PR #65058:
URL: https://github.com/apache/airflow/pull/65058#discussion_r3071610289
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##########
@@ -156,6 +188,233 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
}
)
+ async def _get_pod_or_none_for_xcom(self, pod_name: str) -> V1Pod | None:
+ try:
+ return await self.hook.get_pod(name=pod_name, namespace=self.pod_namespace)
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info("Pod '%s' no longer exists; skipping XCom extraction.", pod_name)
+ return None
+ raise
+
+ async def _wait_until_container_complete_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=self.base_container_name,
+ wait_method=self.hook.wait_until_container_complete,
+ job_task=job_task,
+ state_label="completed",
+ )
+
+ async def _wait_until_sidecar_started_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=PodDefaults.SIDECAR_CONTAINER_NAME,
+ wait_method=self.hook.wait_until_container_started,
+ job_task=job_task,
+ state_label="running",
+ )
+
+ async def _wait_until_container_state_or_job_done(
+ self,
+ pod_name: str,
+ container_name: str,
+ wait_method: Any,
+ job_task: asyncio.Task[V1Job],
+ state_label: str,
+ ) -> WaitOutcome:
+ poll_interval = max(self.poll_interval, 0.1)
+ wait_task = asyncio.create_task(
+ wait_method(
+ name=pod_name,
+ namespace=self.pod_namespace,
+ container_name=container_name,
+ poll_interval=poll_interval,
+ )
+ )
+
+ try:
+ while True:
+ done, _ = await asyncio.wait(
+ {wait_task, job_task},
+ timeout=poll_interval,
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+
+ if wait_task in done:
+ try:
+ await wait_task
+ return "ready"
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info(
+ "Pod '%s' no longer exists while waiting for container '%s' state '%s'; skipping.",
+ pod_name,
+ container_name,
+ state_label,
+ )
+ return "pod_missing"
+ raise
+
+ if job_task in done:
+ self.log.info(
+ "Job '%s' finished before pod '%s' container '%s' reached state '%s'; stopping XCom wait.",
+ self.job_name,
+ pod_name,
+ container_name,
+ state_label,
+ )
+ return "job_done"
Review Comment:
Valid point
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]