holmuk commented on code in PR #65058:
URL: https://github.com/apache/airflow/pull/65058#discussion_r3071574516
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##########
@@ -156,6 +188,233 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
}
)
async def _get_pod_or_none_for_xcom(self, pod_name: str) -> V1Pod | None:
    """
    Fetch ``pod_name`` from ``self.pod_namespace`` for XCom extraction.

    :param pod_name: name of the pod to look up.
    :return: the pod, or ``None`` when the lookup failed with an error that
        ``self._is_not_found_error`` classifies as "not found".
    :raises Exception: any other error raised by ``self.hook.get_pod`` is re-raised unchanged.
    """
    try:
        pod = await self.hook.get_pod(name=pod_name, namespace=self.pod_namespace)
    except Exception as err:
        if not self._is_not_found_error(err):
            raise
        # A vanished pod is not an error here: there is simply nothing to extract.
        self.log.info("Pod '%s' no longer exists; skipping XCom extraction.", pod_name)
        return None
    return pod
+
async def _wait_until_container_complete_or_job_done(
    self, pod_name: str, job_task: asyncio.Task[V1Job]
) -> WaitOutcome:
    """
    Wait for the base container of ``pod_name`` to complete, unless the job finishes first.

    Thin wrapper around ``_wait_until_container_state_or_job_done``. It targets
    ``self.base_container_name`` and waits via ``self.hook.wait_until_container_complete``.

    :param pod_name: pod whose base container is watched.
    :param job_task: task tracking job completion, forwarded unchanged.
    :return: the outcome reported by the generic waiter.
    """
    waiter_kwargs = {
        "pod_name": pod_name,
        "container_name": self.base_container_name,
        "wait_method": self.hook.wait_until_container_complete,
        "job_task": job_task,
        "state_label": "completed",
    }
    return await self._wait_until_container_state_or_job_done(**waiter_kwargs)
+
async def _wait_until_sidecar_started_or_job_done(
    self, pod_name: str, job_task: asyncio.Task[V1Job]
) -> WaitOutcome:
    """
    Wait for the XCom sidecar of ``pod_name`` to start, unless the job finishes first.

    Thin wrapper around ``_wait_until_container_state_or_job_done``. It targets
    ``PodDefaults.SIDECAR_CONTAINER_NAME`` and waits via
    ``self.hook.wait_until_container_started``.

    :param pod_name: pod whose sidecar container is watched.
    :param job_task: task tracking job completion, forwarded unchanged.
    :return: the outcome reported by the generic waiter.
    """
    sidecar_name = PodDefaults.SIDECAR_CONTAINER_NAME
    started_check = self.hook.wait_until_container_started
    return await self._wait_until_container_state_or_job_done(
        pod_name=pod_name,
        container_name=sidecar_name,
        wait_method=started_check,
        job_task=job_task,
        state_label="running",
    )
+
async def _wait_until_container_state_or_job_done(
    self,
    pod_name: str,
    container_name: str,
    wait_method: Any,
    job_task: asyncio.Task[V1Job],
    state_label: str,
) -> WaitOutcome:
    """
    Race a container-state wait against job completion.

    ``wait_method`` is started as a task, and the call returns as soon as either
    that task or ``job_task`` is done. If both are done by then, the container
    result takes precedence.

    :param pod_name: pod to watch.
    :param container_name: container inside the pod to watch.
    :param wait_method: coroutine function, called with ``name``, ``namespace``,
        ``container_name`` and ``poll_interval`` keyword arguments.
    :param job_task: task tracking job completion. It is only observed, never
        cancelled, so the caller can reuse it across calls.
    :param state_label: human-readable target state, used only in log messages.
    :return: ``"ready"`` when ``wait_method`` returned normally; ``"pod_missing"``
        when it failed with an error ``self._is_not_found_error`` accepts;
        ``"job_done"`` when ``job_task`` finished first.
    :raises Exception: any other error raised by ``wait_method``.
    """
    # Floor the interval handed to ``wait_method`` at a small positive value.
    poll_interval = max(self.poll_interval, 0.1)
    wait_task = asyncio.create_task(
        wait_method(
            name=pod_name,
            namespace=self.pod_namespace,
            container_name=container_name,
            poll_interval=poll_interval,
        )
    )

    try:
        # FIRST_COMPLETED already blocks until at least one task is done, so a
        # timeout plus re-poll loop around this call would only add wakeups.
        done, _ = await asyncio.wait({wait_task, job_task}, return_when=asyncio.FIRST_COMPLETED)

        if wait_task in done:
            try:
                await wait_task
            except Exception as err:
                if not self._is_not_found_error(err):
                    raise
                self.log.info(
                    "Pod '%s' no longer exists while waiting for container '%s' state '%s'; skipping.",
                    pod_name,
                    container_name,
                    state_label,
                )
                return "pod_missing"
            return "ready"

        # ``done`` is never empty without a timeout, so job_task must be the finished one.
        self.log.info(
            "Job '%s' finished before pod '%s' container '%s' reached state '%s'; stopping XCom wait.",
            self.job_name,
            pod_name,
            container_name,
            state_label,
        )
        return "job_done"
    finally:
        # Never leave the container wait running in the background once we have an answer.
        if not wait_task.done():
            wait_task.cancel()
            with suppress(asyncio.CancelledError):
                await wait_task
+
async def _collect_xcom_results(
    self,
    job_task: asyncio.Task[V1Job],
    poll_interval: float,
) -> list[str]:
    """
    Collect XCom values from the trigger's pods.

    Values are first gathered by ``_collect_xcom_until_job_done``. If that pass
    hands back pods it could not finish before the job ended, those pods get a
    best-effort pass. Its attempts are summarized in the logs, and their
    successful values are appended.

    :param job_task: task tracking job completion.
    :param poll_interval: forwarded to the best-effort pass.
    :return: pre-job values first, followed by any successful post-job values.
    """
    results, leftover_pods = await self._collect_xcom_until_job_done(job_task=job_task)
    if leftover_pods:
        attempts = await self._collect_xcom_after_job_done_best_effort(
            pod_names=leftover_pods,
            poll_interval=poll_interval,
        )
        self._log_post_job_summary(summary=self._summarize_post_job_attempts(attempts))
        results.extend(self._extract_successful_xcom_values(attempts))
    return results
+
async def _collect_xcom_until_job_done(
    self,
    job_task: asyncio.Task[V1Job],
) -> tuple[list[str], list[str]]:
    """
    Collect XCom values pod by pod while the job is still running.

    Each pod in ``self.pod_names`` is handled in order:

    1. Skip it if it can no longer be fetched.
    2. Wait for its base container to complete.
    3. Wait for the XCom sidecar to start.
    4. Extract the value.

    A ``"pod_missing"`` outcome from either wait skips only that pod. A
    ``"job_done"`` outcome ends the sequential pass.

    :param job_task: task tracking job completion, forwarded to each wait.
    :return: ``(results, remaining)``. ``results`` holds the values extracted so
        far. ``remaining`` is the slice of ``self.pod_names`` starting at the pod
        being waited on when the job finished, or ``[]`` if every pod was processed.
    """
    collected: list[str] = []

    for position, name in enumerate(self.pod_names):
        pod = await self._get_pod_or_none_for_xcom(pod_name=name)
        if pod is None:
            continue

        outcome = await self._wait_until_container_complete_or_job_done(
            pod_name=name,
            job_task=job_task,
        )
        # Only move on to the sidecar once the base-container wait neither hit
        # job completion nor lost the pod.
        if outcome not in ("job_done", "pod_missing"):
            self.log.info("Checking if xcom sidecar container is started.")
            outcome = await self._wait_until_sidecar_started_or_job_done(
                pod_name=name,
                job_task=job_task,
            )

        if outcome == "job_done":
            # This pod and every later one are left for the post-job pass.
            return collected, self.pod_names[position:]
        if outcome == "pod_missing":
            continue

        collected.append(await self._extract_xcom_for_ready_pod(pod=pod))

    return collected, []
+
async def _extract_xcom_for_ready_pod(self, pod: V1Pod) -> str:
    """
    Read the XCom value from ``pod`` without blocking the event loop.

    ``self.pod_manager.extract_xcom`` is a synchronous call. It is therefore run
    on the running loop's default executor (``run_in_executor(None, ...)``) and awaited.

    :param pod: pod to read the XCom value from.
    :return: the value returned by ``extract_xcom``.
    """
    self.log.info("Extracting result from xcom sidecar container.")
    extract = self.pod_manager.extract_xcom
    return await asyncio.get_running_loop().run_in_executor(None, extract, pod)
+
async def _collect_xcom_after_job_done_best_effort(
    self,
    pod_names: list[str],
    poll_interval: float,
) -> list[PodXComAttempt]:
    """
    Try to extract XCom from each of ``pod_names`` concurrently after the job has finished.

    At most five pods, or fewer if there are fewer pods, are attempted at once.
    Each attempt gets a timeout of ``max(poll_interval, 0.1)`` seconds, and the
    clock starts only after the attempt acquires a concurrency slot. A timed-out
    pod is reported as a ``PodXComAttempt`` with outcome ``"timeout"``. Any other
    exception propagates out of ``asyncio.gather``.

    :param pod_names: pods to attempt.
    :param poll_interval: reused as the per-pod timeout, floored at 0.1 seconds.
    :return: one attempt per pod, in the same order as ``pod_names``; ``[]`` when
        ``pod_names`` is empty.

    NOTE(review): the per-pod helper runs ``extract_xcom`` through ``run_in_executor``.
    When ``asyncio.wait_for`` cancels that await on timeout, the worker thread
    keeps running. The semaphore slot is still released at that point, so the
    concurrency cap does not bound threads that are still busy. Confirm this is
    acceptable.
    """
    if not pod_names:
        return []

    per_pod_timeout = max(poll_interval, 0.1)
    max_concurrency = min(5, len(pod_names))
    slots = asyncio.Semaphore(max_concurrency)

    self.log.info(
        "Job is done; collecting XCom best-effort for %d pod(s) with per-pod timeout %.2f seconds and max concurrency %d.",
        len(pod_names),
        per_pod_timeout,
        max_concurrency,
    )

    async def attempt(name: str) -> PodXComAttempt:
        # The timeout is applied inside the slot, so queueing time is not counted against a pod.
        async with slots:
            try:
                return await asyncio.wait_for(
                    self._extract_xcom_for_pod_best_effort(pod_name=name),
                    timeout=per_pod_timeout,
                )
            except asyncio.TimeoutError:
                self.log.warning(
                    "Timed out extracting XCom from pod '%s' after job completion; skipping.",
                    name,
                )
                return PodXComAttempt(pod_name=name, outcome="timeout")

    return await asyncio.gather(*map(attempt, pod_names))
+
+ async def _extract_xcom_for_pod_best_effort(self, pod_name: str) ->
PodXComAttempt:
+ pod = await self._get_pod_or_none_for_xcom(pod_name=pod_name)
+ if pod is None:
+ return PodXComAttempt(pod_name=pod_name, outcome="missing")
+
+ self.log.info("Extracting result from xcom sidecar container
(best-effort).")
+ loop = asyncio.get_running_loop()
+ try:
+ result = await loop.run_in_executor(None,
self.pod_manager.extract_xcom, pod)
Review Comment:
`self.pod_manager.extract_xcom` is a **synchronous** network call that would
block the asyncio event loop. To prevent that, we run it in the executor.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]