This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a563fb0f39a Unify Pod Startup Tracking: KubernetesPodTriggerer and 
KubernetesPodOperator Now Share Common Startup Logic (#56875)
a563fb0f39a is described below

commit a563fb0f39af8efb78ec93759f6271fc691358c7
Author: AutomationDev85 <[email protected]>
AuthorDate: Thu Oct 30 09:04:55 2025 +0100

    Unify Pod Startup Tracking: KubernetesPodTriggerer and 
KubernetesPodOperator Now Share Common Startup Logic (#56875)
    
    * Move container-related functions from PodManager to a separate file
    
    * Moved unit tests
    
    * Sync and async workflow use the same code to track Pod startup
    
    * Reworked unit tests and pod startup logic
    
    * Add api permission error detection for triggerer
    
    * Fix pytest fixture
    
    * Removed not requried code
    
    ---------
    
    Co-authored-by: AutomationDev85 <AutomationDev85>
---
 .../providers/cncf/kubernetes/exceptions.py        |   8 +
 .../providers/cncf/kubernetes/hooks/kubernetes.py  |  35 ++-
 .../providers/cncf/kubernetes/operators/pod.py     |   1 +
 .../providers/cncf/kubernetes/triggers/pod.py      |  50 +++--
 .../providers/cncf/kubernetes/utils/pod_manager.py | 235 +++++++++++++++------
 .../unit/cncf/kubernetes/triggers/test_pod.py      |  73 +++++--
 .../unit/cncf/kubernetes/utils/test_pod_manager.py | 168 +++++++++++++--
 7 files changed, 463 insertions(+), 107 deletions(-)

diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py
index c0b6ad83a3f..5503c743797 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py
@@ -27,3 +27,11 @@ class PodMutationHookException(AirflowException):
 
 class PodReconciliationError(AirflowException):
     """Raised when an error is encountered while trying to merge pod 
configs."""
+
+
+class KubernetesApiError(AirflowException):
+    """Raised when an error is encountered while trying access Kubernetes 
API."""
+
+
+class KubernetesApiPermissionError(AirflowException):
+    """Raised when an error is encountered while trying access Kubernetes 
API."""
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index d99314efa1c..d59c6527d6b 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -37,6 +37,7 @@ from urllib3.exceptions import HTTPError
 
 from airflow.exceptions import AirflowException, AirflowNotFoundException
 from airflow.models import Connection
+from airflow.providers.cncf.kubernetes.exceptions import KubernetesApiError, 
KubernetesApiPermissionError
 from airflow.providers.cncf.kubernetes.kube_client import _disable_verify_ssl, 
_enable_tcp_keepalive
 from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import 
should_retry_creation
 from airflow.providers.cncf.kubernetes.utils.container import (
@@ -48,7 +49,7 @@ from airflow.utils import yaml
 
 if TYPE_CHECKING:
     from kubernetes.client import V1JobList
-    from kubernetes.client.models import V1Job, V1Pod
+    from kubernetes.client.models import CoreV1EventList, V1Job, V1Pod
 
 LOADING_KUBE_CONFIG_FILE_RESOURCE = "Loading Kubernetes configuration file 
kube_config from {}..."
 
@@ -879,12 +880,17 @@ class AsyncKubernetesHook(KubernetesHook):
         :param namespace: Name of the pod's namespace.
         """
         async with self.get_conn() as connection:
-            v1_api = async_client.CoreV1Api(connection)
-            pod: V1Pod = await v1_api.read_namespaced_pod(
-                name=name,
-                namespace=namespace,
-            )
-        return pod
+            try:
+                v1_api = async_client.CoreV1Api(connection)
+                pod: V1Pod = await v1_api.read_namespaced_pod(
+                    name=name,
+                    namespace=namespace,
+                )
+                return pod
+            except HTTPError as e:
+                if hasattr(e, "status") and e.status == 403:
+                    raise KubernetesApiPermissionError("Permission denied 
(403) from Kubernetes API.") from e
+                raise KubernetesApiError from e
 
     async def delete_pod(self, name: str, namespace: str):
         """
@@ -933,6 +939,21 @@ class AsyncKubernetesHook(KubernetesHook):
                 self.log.exception("There was an error reading the kubernetes 
API.")
                 raise
 
+    async def get_pod_events(self, name: str, namespace: str) -> 
CoreV1EventList:
+        """Get pod's events."""
+        async with self.get_conn() as connection:
+            try:
+                v1_api = async_client.CoreV1Api(connection)
+                events: CoreV1EventList = await v1_api.list_namespaced_event(
+                    field_selector=f"involvedObject.name={name}",
+                    namespace=namespace,
+                )
+                return events
+            except HTTPError as e:
+                if hasattr(e, "status") and e.status == 403:
+                    raise KubernetesApiPermissionError("Permission denied 
(403) from Kubernetes API.") from e
+                raise KubernetesApiError from e
+
     async def get_job_status(self, name: str, namespace: str) -> V1Job:
         """
         Get job's status object.
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index 1df9a1be8ff..895ee8ddab0 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -868,6 +868,7 @@ class KubernetesPodOperator(BaseOperator):
                 get_logs=self.get_logs,
                 startup_timeout=self.startup_timeout_seconds,
                 startup_check_interval=self.startup_check_interval_seconds,
+                schedule_timeout=self.schedule_timeout_seconds,
                 base_container_name=self.base_container_name,
                 on_finish_action=self.on_finish_action.value,
                 last_log_time=last_log_time,
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
index 9008e0573ec..78e760df018 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -26,8 +26,10 @@ from typing import TYPE_CHECKING, Any, cast
 
 import tenacity
 
+from airflow.providers.cncf.kubernetes.exceptions import 
KubernetesApiPermissionError
 from airflow.providers.cncf.kubernetes.hooks.kubernetes import 
AsyncKubernetesHook
 from airflow.providers.cncf.kubernetes.utils.pod_manager import (
+    AsyncPodManager,
     OnFinishAction,
     PodLaunchTimeoutException,
     PodPhase,
@@ -69,6 +71,7 @@ class KubernetesPodTrigger(BaseTrigger):
     :param get_logs: get the stdout of the container as logs of the tasks.
     :param startup_timeout: timeout in seconds to start up the pod.
     :param startup_check_interval: interval in seconds to check if the pod has 
already started.
+    :param schedule_timeout: timeout in seconds to schedule pod in cluster.
     :param on_finish_action: What to do when the pod reaches its final state, 
or the execution is interrupted.
         If "delete_pod", the pod will be deleted regardless its state; if 
"delete_succeeded_pod",
         only succeeded pod will be deleted. You can set to "keep_pod" to keep 
the pod.
@@ -91,7 +94,8 @@ class KubernetesPodTrigger(BaseTrigger):
         in_cluster: bool | None = None,
         get_logs: bool = True,
         startup_timeout: int = 120,
-        startup_check_interval: int = 5,
+        startup_check_interval: float = 5,
+        schedule_timeout: int = 120,
         on_finish_action: str = "delete_pod",
         last_log_time: DateTime | None = None,
         logging_interval: int | None = None,
@@ -110,11 +114,11 @@ class KubernetesPodTrigger(BaseTrigger):
         self.get_logs = get_logs
         self.startup_timeout = startup_timeout
         self.startup_check_interval = startup_check_interval
+        self.schedule_timeout = schedule_timeout
         self.last_log_time = last_log_time
         self.logging_interval = logging_interval
         self.on_finish_action = OnFinishAction(on_finish_action)
         self.trigger_kwargs = trigger_kwargs or {}
-
         self._since_time = None
 
     def serialize(self) -> tuple[str, dict[str, Any]]:
@@ -133,6 +137,7 @@ class KubernetesPodTrigger(BaseTrigger):
                 "get_logs": self.get_logs,
                 "startup_timeout": self.startup_timeout,
                 "startup_check_interval": self.startup_check_interval,
+                "schedule_timeout": self.schedule_timeout,
                 "trigger_start_time": self.trigger_start_time,
                 "on_finish_action": self.on_finish_action.value,
                 "last_log_time": self.last_log_time,
@@ -187,6 +192,22 @@ class KubernetesPodTrigger(BaseTrigger):
                 }
             )
             return
+        except KubernetesApiPermissionError as e:
+            message = (
+                "Kubernetes API permission error: The triggerer may not have 
sufficient permissions to monitor or delete pods. "
+                "Please ensure the triggerer's service account is included in 
the 'pod-launcher-role' as defined in the latest Airflow Helm chart. "
+                f"Original error: {e}"
+            )
+            yield TriggerEvent(
+                {
+                    "name": self.pod_name,
+                    "namespace": self.pod_namespace,
+                    "status": "error",
+                    "message": message,
+                    **self.trigger_kwargs,
+                }
+            )
+            return
         except Exception as e:
             self.log.exception(
                 "Unexpected error while waiting for pod %s in namespace %s",
@@ -219,17 +240,16 @@ class KubernetesPodTrigger(BaseTrigger):
 
     async def _wait_for_pod_start(self) -> ContainerState:
         """Loops until pod phase leaves ``PENDING`` If timeout is reached, 
throws error."""
-        while True:
-            pod = await self._get_pod()
-            if not pod.status.phase == "Pending":
-                return self.define_container_state(pod)
-
-            delta = datetime.datetime.now(tz=datetime.timezone.utc) - 
self.trigger_start_time
-            if self.startup_timeout < delta.total_seconds():
-                raise PodLaunchTimeoutException("Pod did not leave 'Pending' 
phase within specified timeout")
-
-            self.log.info("Still waiting for pod to start. The pod state is 
%s", pod.status.phase)
-            await asyncio.sleep(self.startup_check_interval)
+        pod = await self._get_pod()
+        events_task = self.pod_manager.watch_pod_events(pod, 
self.startup_check_interval)
+        pod_start_task = self.pod_manager.await_pod_start(
+            pod=pod,
+            schedule_timeout=self.schedule_timeout,
+            startup_timeout=self.startup_timeout,
+            check_interval=self.startup_check_interval,
+        )
+        await asyncio.gather(pod_start_task, events_task)
+        return self.define_container_state(await self._get_pod())
 
     async def _wait_for_container_completion(self) -> TriggerEvent:
         """
@@ -297,6 +317,10 @@ class KubernetesPodTrigger(BaseTrigger):
             cluster_context=self.cluster_context,
         )
 
+    @cached_property
+    def pod_manager(self) -> AsyncPodManager:
+        return AsyncPodManager(async_hook=self.hook)
+
     def define_container_state(self, pod: V1Pod) -> ContainerState:
         pod_containers = pod.status.container_statuses
 
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index c9dc45e57f4..789ad4db79e 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -60,6 +60,8 @@ if TYPE_CHECKING:
     from kubernetes.client.models.v1_pod_condition import V1PodCondition
     from urllib3.response import HTTPResponse
 
+    from airflow.providers.cncf.kubernetes.hooks.kubernetes import 
AsyncKubernetesHook
+
 
 EMPTY_XCOM_RESULT = "__airflow_xcom_result_empty__"
 """
@@ -99,6 +101,109 @@ def check_exception_is_kubernetes_api_unauthorized(exc: 
BaseException):
     return isinstance(exc, ApiException) and exc.status and str(exc.status) == 
"401"
 
 
+async def watch_pod_events(
+    pod_manager: PodManager | AsyncPodManager,
+    pod: V1Pod,
+    check_interval: float = 1,
+) -> None:
+    """
+    Read pod events and write them to the log.
+
+    This function supports both asynchronous and synchronous pod managers.
+
+    :param pod_manager: The pod manager instance (PodManager or 
AsyncPodManager).
+    :param pod: The pod object to monitor.
+    :param check_interval: Interval (in seconds) between checks.
+    """
+    num_events = 0
+    is_async = isinstance(pod_manager, AsyncPodManager)
+    while not pod_manager.stop_watching_events:
+        if is_async:
+            events = await pod_manager.read_pod_events(pod)
+        else:
+            events = pod_manager.read_pod_events(pod)
+        for new_event in events.items[num_events:]:
+            involved_object: V1ObjectReference = new_event.involved_object
+            pod_manager.log.info(
+                "The Pod has an Event: %s from %s", new_event.message, 
involved_object.field_path
+            )
+        num_events = len(events.items)
+        await asyncio.sleep(check_interval)
+
+
+async def await_pod_start(
+    pod_manager: PodManager | AsyncPodManager,
+    pod: V1Pod,
+    schedule_timeout: int = 120,
+    startup_timeout: int = 120,
+    check_interval: float = 1,
+):
+    """
+    Monitor the startup phase of a Kubernetes pod, waiting for it to leave the 
``Pending`` state.
+
+    This function is shared by both PodManager and AsyncPodManager to provide 
consistent pod startup tracking.
+
+    :param pod_manager: The pod manager instance (PodManager or 
AsyncPodManager).
+    :param pod: The pod object to monitor.
+    :param schedule_timeout: Maximum time (in seconds) to wait for the pod to 
be scheduled.
+    :param startup_timeout: Maximum time (in seconds) to wait for the pod to 
start running after being scheduled.
+    :param check_interval: Interval (in seconds) between status checks.
+    :param is_async: Set to True if called in an async context; otherwise, 
False.
+    """
+    pod_manager.log.info("::group::Waiting until %ss to get the POD 
scheduled...", schedule_timeout)
+    pod_was_scheduled = False
+    start_check_time = time.time()
+    is_async = isinstance(pod_manager, AsyncPodManager)
+    while True:
+        if is_async:
+            remote_pod = await pod_manager.read_pod(pod)
+        else:
+            remote_pod = pod_manager.read_pod(pod)
+        pod_status = remote_pod.status
+        if pod_status.phase != PodPhase.PENDING:
+            pod_manager.stop_watching_events = True
+            pod_manager.log.info("::endgroup::")
+            break
+
+        # Check for timeout
+        pod_conditions: list[V1PodCondition] = pod_status.conditions
+        if pod_conditions and any(
+            (condition.type == "PodScheduled" and condition.status == "True") 
for condition in pod_conditions
+        ):
+            if not pod_was_scheduled:
+                # POD was initially scheduled update timeout for getting POD 
launched
+                pod_was_scheduled = True
+                start_check_time = time.time()
+                pod_manager.log.info("Waiting %ss to get the POD running...", 
startup_timeout)
+
+            if time.time() - start_check_time >= startup_timeout:
+                pod_manager.log.info("::endgroup::")
+                raise PodLaunchTimeoutException(
+                    f"Pod took too long to start. More than 
{startup_timeout}s. Check the pod events in kubernetes."
+                )
+        else:
+            if time.time() - start_check_time >= schedule_timeout:
+                pod_manager.log.info("::endgroup::")
+                raise PodLaunchTimeoutException(
+                    f"Pod took too long to be scheduled on the cluster, giving 
up. More than {schedule_timeout}s. Check the pod events in kubernetes."
+                )
+
+        # Check for general problems to terminate early - ErrImagePull
+        if pod_status.container_statuses:
+            for container_status in pod_status.container_statuses:
+                container_state: V1ContainerState = container_status.state
+                container_waiting: V1ContainerStateWaiting | None = 
container_state.waiting
+                if container_waiting:
+                    if container_waiting.reason in ["ErrImagePull", 
"InvalidImageName"]:
+                        pod_manager.log.info("::endgroup::")
+                        raise PodLaunchFailedException(
+                            f"Pod docker image cannot be pulled, unable to 
start: {container_waiting.reason}"
+                            f"\n{container_waiting.message}"
+                        )
+
+        await asyncio.sleep(check_interval)
+
+
 class PodLaunchTimeoutException(AirflowException):
     """When pod does not leave the ``Pending`` phase within specified 
timeout."""
 
@@ -262,16 +367,7 @@ class PodManager(LoggingMixin):
 
     async def watch_pod_events(self, pod: V1Pod, check_interval: int = 1) -> 
None:
         """Read pod events and writes into log."""
-        num_events = 0
-        while not self.stop_watching_events:
-            events = self.read_pod_events(pod)
-            for new_event in events.items[num_events:]:
-                involved_object: V1ObjectReference = new_event.involved_object
-                self.log.info(
-                    "The Pod has an Event: %s from %s", new_event.message, 
involved_object.field_path
-                )
-            num_events = len(events.items)
-            await asyncio.sleep(check_interval)
+        await watch_pod_events(pod_manager=self, pod=pod, 
check_interval=check_interval)
 
     async def await_pod_start(
         self, pod: V1Pod, schedule_timeout: int = 120, startup_timeout: int = 
120, check_interval: int = 1
@@ -287,55 +383,13 @@ class PodManager(LoggingMixin):
         :param check_interval: Interval (in seconds) between checks
         :return:
         """
-        self.log.info("::group::Waiting until %ss to get the POD 
scheduled...", schedule_timeout)
-        pod_was_scheduled = False
-        start_check_time = time.time()
-        while True:
-            remote_pod = self.read_pod(pod)
-            pod_status = remote_pod.status
-            if pod_status.phase != PodPhase.PENDING:
-                self.stop_watching_events = True
-                self.log.info("::endgroup::")
-                break
-
-            # Check for timeout
-            pod_conditions: list[V1PodCondition] = pod_status.conditions
-            if pod_conditions and any(
-                (condition.type == "PodScheduled" and condition.status == 
"True")
-                for condition in pod_conditions
-            ):
-                if not pod_was_scheduled:
-                    # POD was initially scheduled update timeout for getting 
POD launched
-                    pod_was_scheduled = True
-                    start_check_time = time.time()
-                    self.log.info("Waiting %ss to get the POD running...", 
startup_timeout)
-
-                if time.time() - start_check_time >= startup_timeout:
-                    self.log.info("::endgroup::")
-                    raise PodLaunchFailedException(
-                        f"Pod took too long to start. More than 
{startup_timeout}s. Check the pod events in kubernetes."
-                    )
-            else:
-                if time.time() - start_check_time >= schedule_timeout:
-                    self.log.info("::endgroup::")
-                    raise PodLaunchFailedException(
-                        f"Pod took too long to be scheduled on the cluster, 
giving up. More than {schedule_timeout}s. Check the pod events in kubernetes."
-                    )
-
-            # Check for general problems to terminate early - ErrImagePull
-            if pod_status.container_statuses:
-                for container_status in pod_status.container_statuses:
-                    container_state: V1ContainerState = container_status.state
-                    container_waiting: V1ContainerStateWaiting | None = 
container_state.waiting
-                    if container_waiting:
-                        if container_waiting.reason in ["ErrImagePull", 
"InvalidImageName"]:
-                            self.log.info("::endgroup::")
-                            raise PodLaunchFailedException(
-                                f"Pod docker image cannot be pulled, unable to 
start: {container_waiting.reason}"
-                                f"\n{container_waiting.message}"
-                            )
-
-            await asyncio.sleep(check_interval)
+        await await_pod_start(
+            pod_manager=self,
+            pod=pod,
+            schedule_timeout=schedule_timeout,
+            startup_timeout=startup_timeout,
+            check_interval=check_interval,
+        )
 
     def _log_message(
         self,
@@ -915,3 +969,66 @@ class OnFinishAction(str, enum.Enum):
 def is_log_group_marker(line: str) -> bool:
     """Check if the line is a log group marker like `::group::` or 
`::endgroup::`."""
     return line.startswith("::group::") or line.startswith("::endgroup::")
+
+
+class AsyncPodManager(LoggingMixin):
+    """Create, monitor, and otherwise interact with Kubernetes pods for use 
with the KubernetesPodTriggerer."""
+
+    def __init__(
+        self,
+        async_hook: AsyncKubernetesHook,
+        callbacks: list[type[KubernetesPodOperatorCallback]] | None = None,
+    ):
+        """
+        Create the launcher.
+
+        :param kube_client: kubernetes client
+        :param callbacks:
+        """
+        super().__init__()
+        self._hook = async_hook
+        self._watch = watch.Watch()
+        self._callbacks = callbacks or []
+        self.stop_watching_events = False
+
+    @tenacity.retry(stop=tenacity.stop_after_attempt(5), 
wait=tenacity.wait_exponential(), reraise=True)
+    async def read_pod(self, pod: V1Pod) -> V1Pod:
+        """Read POD information."""
+        return await self._hook.get_pod(
+            pod.metadata.name,
+            pod.metadata.namespace,
+        )
+
+    @tenacity.retry(stop=tenacity.stop_after_attempt(5), 
wait=tenacity.wait_exponential(), reraise=True)
+    async def read_pod_events(self, pod: V1Pod) -> CoreV1EventList:
+        """Get pod's events."""
+        return await self._hook.get_pod_events(
+            pod.metadata.name,
+            pod.metadata.namespace,
+        )
+
+    async def watch_pod_events(self, pod: V1Pod, check_interval: float = 1) -> 
None:
+        """Read pod events and writes into log."""
+        await watch_pod_events(pod_manager=self, pod=pod, 
check_interval=check_interval)
+
+    async def await_pod_start(
+        self, pod: V1Pod, schedule_timeout: int = 120, startup_timeout: int = 
120, check_interval: float = 1
+    ) -> None:
+        """
+        Wait for the pod to reach phase other than ``Pending``.
+
+        :param pod:
+        :param schedule_timeout: Timeout (in seconds) for pod stay in schedule 
state
+            (if pod is taking to long in schedule state, fails task)
+        :param startup_timeout: Timeout (in seconds) for startup of the pod
+            (if pod is pending for too long after being scheduled, fails task)
+        :param check_interval: Interval (in seconds) between checks
+        :return:
+        """
+        await await_pod_start(
+            pod_manager=self,
+            pod=pod,
+            schedule_timeout=schedule_timeout,
+            startup_timeout=startup_timeout,
+            check_interval=check_interval,
+        )
diff --git 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
index 66fae2524d6..fc477fe0cf7 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
@@ -44,6 +44,7 @@ CONFIG_DICT = {"a": "b"}
 IN_CLUSTER = False
 GET_LOGS = True
 STARTUP_TIMEOUT_SECS = 120
+STARTUP_CHECK_INTERVAL_SECS = 0.1
 TRIGGER_START_TIME = datetime.datetime.now(tz=datetime.timezone.utc)
 FAILED_RESULT_MSG = "Test message that appears when trigger have failed event."
 BASE_CONTAINER_NAME = "base"
@@ -63,11 +64,24 @@ def trigger():
         in_cluster=IN_CLUSTER,
         get_logs=GET_LOGS,
         startup_timeout=STARTUP_TIMEOUT_SECS,
+        startup_check_interval=STARTUP_CHECK_INTERVAL_SECS,
+        schedule_timeout=STARTUP_TIMEOUT_SECS,
         trigger_start_time=TRIGGER_START_TIME,
         on_finish_action=ON_FINISH_ACTION,
     )
 
 
[email protected]
+def mock_time_fixture():
+    """Fixture to simulate time passage beyond startup timeout."""
+    with mock.patch("time.time") as mock_time:
+        start_time = 1000
+        mock_time.side_effect = [
+            *(start_time + STARTUP_TIMEOUT_SECS * n for n in range(5)),
+        ]
+        yield mock_time
+
+
 def get_read_pod_mock_containers(statuses_to_emit=None):
     """
     Emit pods with given phases sequentially.
@@ -106,7 +120,8 @@ class TestKubernetesPodTrigger:
             "in_cluster": IN_CLUSTER,
             "get_logs": GET_LOGS,
             "startup_timeout": STARTUP_TIMEOUT_SECS,
-            "startup_check_interval": 5,
+            "startup_check_interval": STARTUP_CHECK_INTERVAL_SECS,
+            "schedule_timeout": STARTUP_TIMEOUT_SECS,
             "trigger_start_time": TRIGGER_START_TIME,
             "on_finish_action": ON_FINISH_ACTION,
             "last_log_time": None,
@@ -138,7 +153,7 @@ class TestKubernetesPodTrigger:
     async def test_run_loop_return_waiting_event(
         self, mock_hook, mock_method, mock_wait_pod, trigger, caplog
     ):
-        mock_hook.get_pod.return_value = 
self._mock_pod_result(mock.MagicMock())
+        mock_hook.get_pod.return_value = 
self._mock_pod_result(mock.AsyncMock())
         mock_method.return_value = ContainerState.WAITING
 
         caplog.set_level(logging.INFO)
@@ -157,7 +172,7 @@ class TestKubernetesPodTrigger:
     async def test_run_loop_return_running_event(
         self, mock_hook, mock_method, mock_wait_pod, trigger, caplog
     ):
-        mock_hook.get_pod.return_value = 
self._mock_pod_result(mock.MagicMock())
+        mock_hook.get_pod.return_value = 
self._mock_pod_result(mock.AsyncMock())
         mock_method.return_value = ContainerState.RUNNING
 
         caplog.set_level(logging.INFO)
@@ -229,7 +244,7 @@ class TestKubernetesPodTrigger:
         Test that KubernetesPodTrigger fires the correct event in case of fail.
         """
 
-        mock_hook.get_pod.return_value = 
self._mock_pod_result(mock.MagicMock())
+        mock_hook.get_pod.return_value = 
self._mock_pod_result(mock.AsyncMock())
         mock_method.return_value = ContainerState.FAILED
         caplog.set_level(logging.INFO)
 
@@ -319,14 +334,46 @@ class TestKubernetesPodTrigger:
 
         assert expected_state == trigger.define_container_state(pod)
 
+    @pytest.mark.asyncio
+    @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+    @mock.patch(f"{TRIGGER_PATH}.hook")
+    async def test_run_loop_read_events_during_start(self, mock_hook, 
mock_method, trigger):
+        event1 = mock.AsyncMock()
+        event1.message = "event 1"
+        event1.involved_object.field_path = "object 1"
+        event2 = mock.AsyncMock()
+        event2.message = "event 2"
+        event2.involved_object.field_path = "object 2"
+        events_list = mock.AsyncMock()
+        events_list.items = [event1, event2]
+
+        mock_hook.get_pod_events = mock.AsyncMock(return_value=events_list)
+
+        pod_pending = mock.MagicMock()
+        pod_pending.status.phase = PodPhase.PENDING
+        pod_succeeded = mock.MagicMock()
+        pod_succeeded.status.phase = PodPhase.SUCCEEDED
+
+        mock_hook.get_pod = mock.AsyncMock(
+            side_effect=[pod_pending, pod_pending, pod_succeeded, 
pod_succeeded]
+        )
+
+        mock_method.return_value = ContainerState.TERMINATED
+
+        with mock.patch.object(trigger.pod_manager.log, "info") as 
mock_log_info:
+            generator = trigger.run()
+            await generator.asend(None)
+
+            mock_log_info.assert_any_call("The Pod has an Event: %s from %s", 
"event 1", "object 1")
+            mock_log_info.assert_any_call("The Pod has an Event: %s from %s", 
"event 2", "object 2")
+
     @pytest.mark.asyncio
     @pytest.mark.parametrize("container_state", [ContainerState.WAITING, 
ContainerState.UNDEFINED])
     @mock.patch(f"{TRIGGER_PATH}.define_container_state")
     @mock.patch(f"{TRIGGER_PATH}.hook")
     async def test_run_loop_return_timeout_event(
-        self, mock_hook, mock_method, trigger, caplog, container_state
+        self, mock_hook, mock_method, trigger, container_state, 
mock_time_fixture
     ):
-        trigger.trigger_start_time = TRIGGER_START_TIME - 
datetime.timedelta(minutes=2)
         mock_hook.get_pod.return_value = self._mock_pod_result(
             mock.MagicMock(
                 status=mock.MagicMock(
@@ -335,9 +382,6 @@ class TestKubernetesPodTrigger:
             )
         )
         mock_method.return_value = container_state
-
-        caplog.set_level(logging.INFO)
-
         generator = trigger.run()
         actual = await generator.asend(None)
         assert (
@@ -346,7 +390,7 @@ class TestKubernetesPodTrigger:
                     "name": POD_NAME,
                     "namespace": NAMESPACE,
                     "status": "timeout",
-                    "message": "Pod did not leave 'Pending' phase within 
specified timeout",
+                    "message": "Pod took too long to be scheduled on the 
cluster, giving up. More than 120s. Check the pod events in kubernetes.",
                 }
             )
             == actual
@@ -356,14 +400,13 @@ class TestKubernetesPodTrigger:
     @mock.patch(f"{TRIGGER_PATH}.define_container_state")
     @mock.patch(f"{TRIGGER_PATH}.hook")
     async def test_run_loop_return_success_for_completed_pod_after_timeout(
-        self, mock_hook, mock_method, trigger, caplog
+        self, mock_hook, mock_method, trigger, mock_time_fixture
     ):
         """
         Test that the trigger correctly recognizes the pod is not pending even 
after the timeout has been
         reached. This may happen when a new triggerer process takes over the 
trigger, the pod already left
         pending state and the timeout has been reached.
         """
-        trigger.trigger_start_time = TRIGGER_START_TIME - 
datetime.timedelta(minutes=2)
         mock_hook.get_pod.return_value = self._mock_pod_result(
             mock.MagicMock(
                 status=mock.MagicMock(
@@ -373,8 +416,6 @@ class TestKubernetesPodTrigger:
         )
         mock_method.return_value = ContainerState.TERMINATED
 
-        caplog.set_level(logging.INFO)
-
         generator = trigger.run()
         actual = await generator.asend(None)
         assert (
@@ -396,7 +437,7 @@ class TestKubernetesPodTrigger:
         Test that KubernetesPodTrigger _get_pod is called with the correct 
arguments.
         """
 
-        mock_hook.get_pod.return_value = 
self._mock_pod_result(mock.MagicMock())
+        mock_hook.get_pod.return_value = 
self._mock_pod_result(mock.AsyncMock())
 
         await trigger._get_pod()
         mock_hook.get_pod.assert_called_with(name=POD_NAME, 
namespace=NAMESPACE)
@@ -423,7 +464,7 @@ class TestKubernetesPodTrigger:
         the hook.get_pod call.
         """
 
-        side_effects = [Exception("Test exception") for _ in range(exc_count)] 
+ [MagicMock()]
+        side_effects = [Exception("Test exception") for _ in range(exc_count)] 
+ [mock.AsyncMock()]
 
         mock_hook.get_pod.side_effect = 
mock.AsyncMock(side_effect=side_effects)
         # We expect the exception to be raised only if the number of retries 
is exceeded
diff --git 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
index 2634119fc29..485bdbd769e 100644
--- 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
+++ 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
@@ -31,6 +31,7 @@ from urllib3.exceptions import HTTPError as BaseHTTPError
 
 from airflow.exceptions import AirflowException
 from airflow.providers.cncf.kubernetes.utils.pod_manager import (
+    AsyncPodManager,
     PodLogsConsumer,
     PodManager,
     PodPhase,
@@ -477,7 +478,7 @@ class TestPodManager:
 
     @pytest.mark.asyncio
     @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock)
-    async def test_start_pod_startup_interval_seconds(self, mock_time_sleep, 
caplog):
+    async def test_start_pod_startup_interval_seconds(self, mock_time_sleep):
         condition_scheduled = mock.MagicMock()
         condition_scheduled.type = "PodScheduled"
         condition_scheduled.status = "True"
@@ -500,17 +501,21 @@ class TestPodManager:
         schedule_timeout = 30
         startup_timeout = 60
         mock_pod = MagicMock()
-        await self.pod_manager.await_pod_start(
-            pod=mock_pod,
-            schedule_timeout=schedule_timeout,  # Never hit, any value is 
fine, as time.sleep is mocked to do nothing
-            startup_timeout=startup_timeout,  # Never hit, any value is fine, 
as time.sleep is mocked to do nothing
-            check_interval=startup_check_interval,
-        )
-        mock_time_sleep.assert_called_with(startup_check_interval)
-        assert mock_time_sleep.call_count == 3
-        assert f"::group::Waiting until {schedule_timeout}s to get the POD 
scheduled..." in caplog.text
-        assert f"Waiting {startup_timeout}s to get the POD running..." in 
caplog.text
-        assert self.pod_manager.stop_watching_events is True
+
+        with mock.patch.object(self.pod_manager.log, "info") as mock_log_info:
+            await self.pod_manager.await_pod_start(
+                pod=mock_pod,
+                schedule_timeout=schedule_timeout,  # Never hit, any value is 
fine, as time.sleep is mocked to do nothing
+                startup_timeout=startup_timeout,  # Never hit, any value is 
fine, as time.sleep is mocked to do nothing
+                check_interval=startup_check_interval,
+            )
+            mock_time_sleep.assert_called_with(startup_check_interval)
+            assert self.pod_manager.stop_watching_events is True
+            assert mock_time_sleep.call_count == 3
+            mock_log_info.assert_any_call(
+                "::group::Waiting until %ss to get the POD scheduled...", 
schedule_timeout
+            )
+            mock_log_info.assert_any_call("Waiting %ss to get the POD 
running...", startup_timeout)
 
     
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running")
     def test_container_is_running(self, container_is_running_mock):
@@ -700,6 +705,145 @@ class TestPodManager:
         mock_container_is_running.assert_any_call(mock_pod, 
"airflow-xcom-sidecar")
 
 
+class TestAsyncPodManager:
+    def setup_method(self):
+        self.mock_async_hook = mock.AsyncMock()
+        self.async_pod_manager = AsyncPodManager(
+            async_hook=self.mock_async_hook,
+            callbacks=[],
+        )
+
+    @pytest.mark.asyncio
+    async def 
test_start_pod_raises_informative_error_on_scheduled_timeout(self):
+        pod_response = mock.MagicMock()
+        pod_response.status.phase = "Pending"
+        self.mock_async_hook.get_pod.return_value = pod_response
+        expected_msg = "Pod took too long to be scheduled on the cluster, 
giving up. More than 0s. Check the pod events in kubernetes."
+        mock_pod = mock.MagicMock()
+        with pytest.raises(AirflowException, match=expected_msg):
+            await self.async_pod_manager.await_pod_start(
+                pod=mock_pod,
+                schedule_timeout=0,
+                startup_timeout=0,
+            )
+        self.mock_async_hook.get_pod.assert_called()
+
+    @pytest.mark.asyncio
+    async def test_start_pod_raises_informative_error_on_startup_timeout(self):
+        pod_response = mock.MagicMock()
+        pod_response.status.phase = "Pending"
+        condition = mock.MagicMock()
+        condition.type = "PodScheduled"
+        condition.status = "True"
+        pod_response.status.conditions = [condition]
+        self.mock_async_hook.get_pod.return_value = pod_response
+        expected_msg = "Pod took too long to start. More than 0s. Check the 
pod events in kubernetes."
+        mock_pod = mock.MagicMock()
+        with pytest.raises(AirflowException, match=expected_msg):
+            await self.async_pod_manager.await_pod_start(
+                pod=mock_pod,
+                schedule_timeout=0,
+                startup_timeout=0,
+            )
+        self.mock_async_hook.get_pod.assert_called()
+
+    @pytest.mark.asyncio
+    async def test_start_pod_raises_fast_error_on_image_error(self):
+        pod_response = mock.MagicMock()
+        pod_response.status.phase = "Pending"
+        container_status = mock.MagicMock()
+        waiting_state = mock.MagicMock()
+        waiting_state.reason = "ErrImagePull"
+        waiting_state.message = "Test error"
+        container_status.state.waiting = waiting_state
+        pod_response.status.container_statuses = [container_status]
+        self.mock_async_hook.get_pod.return_value = pod_response
+        expected_msg = f"Pod docker image cannot be pulled, unable to start: 
{waiting_state.reason}\n{waiting_state.message}"
+        mock_pod = mock.MagicMock()
+        with pytest.raises(AirflowException, match=expected_msg):
+            await self.async_pod_manager.await_pod_start(
+                pod=mock_pod,
+                schedule_timeout=60,
+                startup_timeout=60,
+            )
+        self.mock_async_hook.get_pod.assert_called()
+
+    @pytest.mark.asyncio
+    @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock)
+    async def test_start_pod_startup_interval_seconds(self, mock_time_sleep):
+        condition_scheduled = mock.MagicMock()
+        condition_scheduled.type = "PodScheduled"
+        condition_scheduled.status = "True"
+
+        pod_info_pending = mock.MagicMock()
+        pod_info_pending.status.phase = PodPhase.PENDING
+        pod_info_pending.status.conditions = []
+
+        pod_info_pending_scheduled = mock.MagicMock()
+        pod_info_pending_scheduled.status.phase = PodPhase.PENDING
+        pod_info_pending_scheduled.status.conditions = [condition_scheduled]
+
+        pod_info_succeeded = mock.MagicMock()
+        pod_info_succeeded.status.phase = PodPhase.SUCCEEDED
+
+        # Simulate sequence of pod states
+        self.mock_async_hook.get_pod.side_effect = [
+            pod_info_pending,
+            pod_info_pending_scheduled,
+            pod_info_pending_scheduled,
+            pod_info_succeeded,
+        ]
+        startup_check_interval = 10
+        schedule_timeout = 30
+        startup_timeout = 60
+        mock_pod = mock.MagicMock()
+        with mock.patch.object(self.async_pod_manager.log, "info") as 
mock_log_info:
+            await self.async_pod_manager.await_pod_start(
+                pod=mock_pod,
+                schedule_timeout=schedule_timeout,
+                startup_timeout=startup_timeout,
+                check_interval=startup_check_interval,
+            )
+            assert mock_time_sleep.call_count == 3
+            mock_log_info.assert_any_call(
+                "::group::Waiting until %ss to get the POD scheduled...", 
schedule_timeout
+            )
+            mock_log_info.assert_any_call("Waiting %ss to get the POD 
running...", startup_timeout)
+            assert self.async_pod_manager.stop_watching_events is True
+
+    @pytest.mark.asyncio
+    @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock)
+    async def test_watch_pod_events(self, mock_time_sleep):
+        mock_pod = mock.MagicMock()
+        mock_pod.metadata.name = "test-pod"
+        mock_pod.metadata.namespace = "default"
+
+        events = mock.MagicMock()
+        events.items = []
+        for id in ["event 1", "event 2"]:
+            event = mock.MagicMock()
+            event.message = f"test {id}"
+            event.involved_object.field_path = f"object {id}"
+            events.items.append(event)
+        startup_check_interval = 10
+
+        def get_pod_events_side_effect(name, namespace):
+            self.async_pod_manager.stop_watching_events = True
+            return events
+
+        self.mock_async_hook.get_pod_events.side_effect = 
get_pod_events_side_effect
+
+        with mock.patch.object(self.async_pod_manager.log, "info") as 
mock_log_info:
+            await self.async_pod_manager.watch_pod_events(pod=mock_pod, 
check_interval=startup_check_interval)
+            mock_log_info.assert_any_call(
+                "The Pod has an Event: %s from %s", "test event 1", "object 
event 1"
+            )
+            mock_log_info.assert_any_call(
+                "The Pod has an Event: %s from %s", "test event 2", "object 
event 2"
+            )
+            mock_time_sleep.assert_called_once_with(startup_check_interval)
+
+
 class TestPodLogsConsumer:
     @pytest.mark.parametrize(
         "chunks, expected_logs",


Reply via email to