This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 81dee2f4fe1 Optimize K8s API usage for watching events (#59080)
81dee2f4fe1 is described below
commit 81dee2f4fe14ea8471b4d2047d94973364dd1d75
Author: Daniel Wolf <[email protected]>
AuthorDate: Fri Dec 12 00:04:32 2025 +0100
Optimize K8s API usage for watching events (#59080)
* Optimize K8s API usage for watching events
* Fix mypy errors
* Update providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
Co-authored-by: Jens Scheffler <[email protected]>
* Fix hanging API communication during pod event watching
---------
Co-authored-by: Jens Scheffler <[email protected]>
Co-authored-by: AutomationDev85 <AutomationDev85>
---
chart/templates/rbac/pod-launcher-role.yaml | 1 +
.../providers/cncf/kubernetes/hooks/kubernetes.py | 96 +++++-
.../providers/cncf/kubernetes/operators/pod.py | 26 +-
.../providers/cncf/kubernetes/triggers/pod.py | 27 +-
.../providers/cncf/kubernetes/utils/pod_manager.py | 87 +++--
.../unit/cncf/kubernetes/hooks/test_kubernetes.py | 211 ++++++++++++
.../unit/cncf/kubernetes/triggers/test_pod.py | 24 +-
.../unit/cncf/kubernetes/utils/test_pod_manager.py | 374 ++++++++++++++++++++-
8 files changed, 781 insertions(+), 65 deletions(-)
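For context before the per-file diffs: this change replaces the previous "re-list all pod events on every poll" behaviour with the Kubernetes Watch API on the async path (plus a polling fallback when the RBAC 'watch' verb is missing), and with resourceVersion-scoped listing and UID-based de-duplication on the sync path. The following standalone sketch is not taken from this commit; it only illustrates the watch pattern with kubernetes_asyncio, assuming a reachable cluster, a local kubeconfig, and placeholder pod/namespace names.

import asyncio

from kubernetes_asyncio import client, config, watch


async def stream_pod_events(pod_name: str, namespace: str) -> None:
    # Assumes a local kubeconfig; an in-cluster setup would use load_incluster_config().
    await config.load_kube_config()
    async with client.ApiClient() as api:
        v1 = client.CoreV1Api(api)
        w = watch.Watch()
        try:
            # The API server pushes new events; the client no longer re-lists everything.
            async for change in w.stream(
                v1.list_namespaced_event,
                namespace=namespace,
                field_selector=f"involvedObject.name={pod_name}",
                timeout_seconds=30,
            ):
                event = change["object"]
                print(event.message)
        finally:
            w.stop()


if __name__ == "__main__":
    asyncio.run(stream_pod_events("my-pod", "default"))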
diff --git a/chart/templates/rbac/pod-launcher-role.yaml b/chart/templates/rbac/pod-launcher-role.yaml
index 454c1d5f31b..c6f3a54d19f 100644
--- a/chart/templates/rbac/pod-launcher-role.yaml
+++ b/chart/templates/rbac/pod-launcher-role.yaml
@@ -76,4 +76,5 @@ rules:
- "events"
verbs:
- "list"
+ - "watch"
{{- end }}
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index 44231631583..1f1ab896a09 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -20,7 +20,6 @@ import asyncio
import contextlib
import json
import tempfile
-from collections.abc import Generator
from functools import cached_property
from time import sleep
from typing import TYPE_CHECKING, Any, Protocol
@@ -31,7 +30,7 @@ from asgiref.sync import sync_to_async
from kubernetes import client, config, utils, watch
from kubernetes.client.models import V1Deployment
from kubernetes.config import ConfigException
-from kubernetes_asyncio import client as async_client, config as async_config
+from kubernetes_asyncio import client as async_client, config as async_config, watch as async_watch
from urllib3.exceptions import HTTPError
from airflow.models import Connection
@@ -46,8 +45,10 @@ from airflow.providers.common.compat.sdk import AirflowException, AirflowNotFoun
from airflow.utils import yaml
if TYPE_CHECKING:
+ from collections.abc import AsyncGenerator, Generator
+
from kubernetes.client import V1JobList
- from kubernetes.client.models import CoreV1EventList, V1Job, V1Pod
+ from kubernetes.client.models import CoreV1Event, CoreV1EventList, V1Job, V1Pod
LOADING_KUBE_CONFIG_FILE_RESOURCE = "Loading Kubernetes configuration file kube_config from {}..."
@@ -782,6 +783,7 @@ class AsyncKubernetesHook(KubernetesHook):
self.config_dict = config_dict
self._extras: dict | None = None
+ self._event_polling_fallback = False
async def _load_config(self):
"""Return Kubernetes API session for use with requests."""
@@ -953,14 +955,24 @@ class AsyncKubernetesHook(KubernetesHook):
raise KubernetesApiError from e
@generic_api_retry
- async def get_pod_events(self, name: str, namespace: str) -> CoreV1EventList:
- """Get pod's events."""
+ async def get_pod_events(
+ self, name: str, namespace: str, resource_version: str | None = None
+ ) -> CoreV1EventList:
+ """
+ Get pod events.
+
+ :param name: Pod name to get events for
+ :param namespace: Kubernetes namespace
+ :param resource_version: Only return events not older than this resource version
+ """
async with self.get_conn() as connection:
try:
v1_api = async_client.CoreV1Api(connection)
events: CoreV1EventList = await v1_api.list_namespaced_event(
field_selector=f"involvedObject.name={name}",
namespace=namespace,
+ resource_version=resource_version,
+ resource_version_match="NotOlderThan" if resource_version else None,
)
return events
except HTTPError as e:
@@ -968,6 +980,80 @@ class AsyncKubernetesHook(KubernetesHook):
raise KubernetesApiPermissionError("Permission denied (403) from Kubernetes API.") from e
raise KubernetesApiError from e
+ @generic_api_retry
+ async def watch_pod_events(
+ self,
+ name: str,
+ namespace: str,
+ resource_version: str | None = None,
+ timeout_seconds: int = 30,
+ ) -> AsyncGenerator[CoreV1Event]:
+ """
+ Watch pod events using Kubernetes Watch API.
+
+ :param name: Pod name to watch events for
+ :param namespace: Kubernetes namespace
+ :param resource_version: Only return events not older than this resource version
+ :param timeout_seconds: Timeout in seconds for the watch stream
+ """
+ if self._event_polling_fallback:
+ async for event_polled in self.watch_pod_events_polling_fallback(
+ name, namespace, resource_version, timeout_seconds
+ ):
+ yield event_polled
+
+ try:
+ w = async_watch.Watch()
+ async with self.get_conn() as connection:
+ v1_api = async_client.CoreV1Api(connection)
+
+ async for event_watched in w.stream(
+ v1_api.list_namespaced_event,
+ namespace=namespace,
+ field_selector=f"involvedObject.name={name}",
+ resource_version=resource_version,
+ timeout_seconds=timeout_seconds,
+ ):
+ event: CoreV1Event = event_watched.get("object")
+ yield event
+
+ except async_client.exceptions.ApiException as e:
+ if hasattr(e, "status") and e.status == 403:
+ self.log.warning(
+ "Triggerer does not have Kubernetes API permission to 'watch' events: %s Falling back to polling.",
+ str(e),
+ )
+ self._event_polling_fallback = True
+ async for event_polled in self.watch_pod_events_polling_fallback(
+ name, namespace, resource_version, timeout_seconds
+ ):
+ yield event_polled
+
+ finally:
+ w.stop()
+
+ async def watch_pod_events_polling_fallback(
+ self,
+ name: str,
+ namespace: str,
+ resource_version: str | None = None,
+ interval: int = 30,
+ ) -> AsyncGenerator[CoreV1Event]:
+ """
+ Fallback method to poll pod events at regular intervals.
+
+ This is required when the Airflow triggerer does not have permission to watch events.
+
+ :param name: Pod name to watch events for
+ :param namespace: Kubernetes namespace
+ :param resource_version: Only return events not older than this resource version
+ :param interval: Polling interval in seconds
+ """
+ events: CoreV1EventList = await self.get_pod_events(name, namespace, resource_version)
+ for event in events.items:
+ yield event
+ await asyncio.sleep(interval)
+
@generic_api_retry
async def get_job_status(self, name: str, namespace: str) -> V1Job:
"""
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index aac466ae59e..3337fc77490 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -630,14 +630,26 @@ class KubernetesPodOperator(BaseOperator):
try:
async def _await_pod_start():
- events_task = self.pod_manager.watch_pod_events(pod, self.startup_check_interval_seconds)
- pod_start_task = self.pod_manager.await_pod_start(
- pod=pod,
- schedule_timeout=self.schedule_timeout_seconds,
- startup_timeout=self.startup_timeout_seconds,
- check_interval=self.startup_check_interval_seconds,
+ # Start event stream in background
+ events_task = asyncio.create_task(
+ self.pod_manager.watch_pod_events(pod, self.startup_check_interval_seconds)
)
- await asyncio.gather(pod_start_task, events_task)
+
+ # Await pod start completion
+ try:
+ await self.pod_manager.await_pod_start(
+ pod=pod,
+ schedule_timeout=self.schedule_timeout_seconds,
+ startup_timeout=self.startup_timeout_seconds,
+ check_interval=self.startup_check_interval_seconds,
+ )
+ finally:
+ # Stop watching events
+ events_task.cancel()
+ try:
+ await events_task
+ except asyncio.CancelledError:
+ pass
asyncio.run(_await_pod_start())
except PodLaunchFailedException:
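The operator change above swaps asyncio.gather() for an explicitly cancelled background task, so a pod-start failure no longer leaves the event watcher running. A minimal standalone sketch of that pattern, using illustrative coroutines rather than Airflow code:

import asyncio


async def watch_events() -> None:
    # Stand-in for the event watcher; runs until cancelled.
    while True:
        print("checking pod events...")
        await asyncio.sleep(1)


async def await_pod_start() -> None:
    # Stand-in for waiting on the pod to leave the Pending phase.
    await asyncio.sleep(3)


async def main() -> None:
    events_task = asyncio.create_task(watch_events())
    try:
        await await_pod_start()
    finally:
        # Always stop the watcher, even if awaiting the pod start raised.
        events_task.cancel()
        try:
            await events_task
        except asyncio.CancelledError:
            pass


if __name__ == "__main__":
    asyncio.run(main())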
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
index 29c1a382a0b..5aa496a772a 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -241,14 +241,25 @@ class KubernetesPodTrigger(BaseTrigger):
async def _wait_for_pod_start(self) -> ContainerState:
"""Loops until pod phase leaves ``PENDING`` If timeout is reached,
throws error."""
pod = await self._get_pod()
- events_task = self.pod_manager.watch_pod_events(pod, self.startup_check_interval)
- pod_start_task = self.pod_manager.await_pod_start(
- pod=pod,
- schedule_timeout=self.schedule_timeout,
- startup_timeout=self.startup_timeout,
- check_interval=self.startup_check_interval,
- )
- await asyncio.gather(pod_start_task, events_task)
+ # Start event stream in background
+ events_task = asyncio.create_task(self.pod_manager.watch_pod_events(pod, self.startup_check_interval))
+
+ # Await pod start completion
+ try:
+ await self.pod_manager.await_pod_start(
+ pod=pod,
+ schedule_timeout=self.schedule_timeout,
+ startup_timeout=self.startup_timeout,
+ check_interval=self.startup_check_interval,
+ )
+ finally:
+ # Stop watching events
+ events_task.cancel()
+ try:
+ await events_task
+ except asyncio.CancelledError:
+ pass
+
return self.define_container_state(await self._get_pod())
async def _wait_for_container_completion(self) -> TriggerEvent:
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 667dfa02fcd..7fddb37bf34 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -56,6 +56,7 @@ from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.timezone import utcnow
if TYPE_CHECKING:
+ from kubernetes.client.models.core_v1_event import CoreV1Event
from kubernetes.client.models.core_v1_event_list import CoreV1EventList
from kubernetes.client.models.v1_container_state import V1ContainerState
from kubernetes.client.models.v1_container_state_waiting import V1ContainerStateWaiting
@@ -94,34 +95,21 @@ def check_exception_is_kubernetes_api_unauthorized(exc: BaseException):
return isinstance(exc, ApiException) and exc.status and str(exc.status) == "401"
-async def watch_pod_events(
- pod_manager: PodManager | AsyncPodManager,
- pod: V1Pod,
- check_interval: float = 1,
+def log_pod_event(
+ pod_manager: PodManager | AsyncPodManager, event: CoreV1Event, seen_events: set[str]
) -> None:
"""
- Read pod events and write them to the log.
+ Log a pod event if not already seen.
- This function supports both asynchronous and synchronous pod managers.
-
- :param pod_manager: The pod manager instance (PodManager or AsyncPodManager).
- :param pod: The pod object to monitor.
- :param check_interval: Interval (in seconds) between checks.
+ :param pod_manager: The pod manager instance for logging
+ :param event: Kubernetes event
+ :param seen_events: Set of event UIDs already logged to avoid duplicates
"""
- num_events = 0
- is_async = isinstance(pod_manager, AsyncPodManager)
- while not pod_manager.stop_watching_events:
- if is_async:
- events = await pod_manager.read_pod_events(pod)
- else:
- events = pod_manager.read_pod_events(pod)
- for new_event in events.items[num_events:]:
- involved_object: V1ObjectReference = new_event.involved_object
- pod_manager.log.info(
- "The Pod has an Event: %s from %s", new_event.message, involved_object.field_path
- )
- num_events = len(events.items)
- await asyncio.sleep(check_interval)
+ event_uid = event.metadata.uid
+ if event_uid not in seen_events:
+ seen_events.add(event_uid)
+ involved_object: V1ObjectReference = event.involved_object
+ pod_manager.log.info("The Pod has an Event: %s from %s", event.message, involved_object.field_path)
async def await_pod_start(
@@ -170,12 +158,14 @@ async def await_pod_start(
pod_manager.log.info("Waiting %ss to get the POD running...", startup_timeout)
if time.time() - start_check_time >= startup_timeout:
+ pod_manager.stop_watching_events = True
pod_manager.log.info("::endgroup::")
raise PodLaunchTimeoutException(
f"Pod took too long to start. More than
{startup_timeout}s. Check the pod events in kubernetes."
)
else:
if time.time() - start_check_time >= schedule_timeout:
+ pod_manager.stop_watching_events = True
pod_manager.log.info("::endgroup::")
raise PodLaunchTimeoutException(
f"Pod took too long to be scheduled on the cluster, giving
up. More than {schedule_timeout}s. Check the pod events in kubernetes."
@@ -188,6 +178,7 @@ async def await_pod_start(
container_waiting: V1ContainerStateWaiting | None = container_state.waiting
if container_waiting:
if container_waiting.reason in ["ErrImagePull", "InvalidImageName"]:
+ pod_manager.stop_watching_events = True
pod_manager.log.info("::endgroup::")
raise PodLaunchFailedException(
f"Pod docker image cannot be pulled, unable to
start: {container_waiting.reason}"
@@ -354,9 +345,16 @@ class PodManager(LoggingMixin):
"""Launch the pod asynchronously."""
return self.run_pod_async(pod)
- async def watch_pod_events(self, pod: V1Pod, check_interval: int = 1) -> None:
- """Read pod events and writes into log."""
- await watch_pod_events(pod_manager=self, pod=pod, check_interval=check_interval)
+ async def watch_pod_events(self, pod: V1Pod, check_interval: float = 10) -> None:
+ """Read pod events and write into log."""
+ resource_version = None
+ seen_events: set[str] = set()
+ while not self.stop_watching_events:
+ events = self.read_pod_events(pod, resource_version)
+ for event in events.items:
+ log_pod_event(self, event, seen_events)
+ resource_version = event.metadata.resource_version
+ await asyncio.sleep(check_interval)
async def await_pod_start(
self, pod: V1Pod, schedule_timeout: int = 120, startup_timeout: int = 120, check_interval: int = 1
@@ -772,11 +770,20 @@ class PodManager(LoggingMixin):
]
@generic_api_retry
- def read_pod_events(self, pod: V1Pod) -> CoreV1EventList:
- """Read events from the POD."""
+ def read_pod_events(self, pod: V1Pod, resource_version: str | None = None) -> CoreV1EventList:
+ """
+ Read events from the POD with optimization parameters to reduce API load.
+
+ :param pod: The pod to get events for
+ :param resource_version: Only return events newer than this resource version
+ """
try:
return self._client.list_namespaced_event(
- namespace=pod.metadata.namespace, field_selector=f"involvedObject.name={pod.metadata.name}"
+ namespace=pod.metadata.namespace,
+ field_selector=f"involvedObject.name={pod.metadata.name}",
+ resource_version=resource_version,
+ resource_version_match="NotOlderThan" if resource_version else None,
)
except HTTPError as e:
raise KubernetesApiException(f"There was an error reading the kubernetes API: {e}")
@@ -978,16 +985,28 @@ class AsyncPodManager(LoggingMixin):
pod.metadata.namespace,
)
- async def read_pod_events(self, pod: V1Pod) -> CoreV1EventList:
+ async def read_pod_events(self, pod: V1Pod, resource_version: str | None = None) -> CoreV1EventList:
"""Get pod's events."""
return await self._hook.get_pod_events(
pod.metadata.name,
pod.metadata.namespace,
+ resource_version=resource_version,
)
- async def watch_pod_events(self, pod: V1Pod, check_interval: float = 1) -> None:
- """Read pod events and writes into log."""
- await watch_pod_events(pod_manager=self, pod=pod, check_interval=check_interval)
+ async def watch_pod_events(self, pod: V1Pod, startup_check_interval: float = 30) -> None:
+ """Watch pod events and write to log."""
+ seen_events: set[str] = set()
+ resource_version = None
+ while not self.stop_watching_events:
+ async for event in self._hook.watch_pod_events(
+ name=pod.metadata.name,
+ namespace=pod.metadata.namespace,
+ resource_version=resource_version,
+ timeout_seconds=startup_check_interval,
+ ):
+ if event:
+ log_pod_event(self, event, seen_events)
+ resource_version = event.metadata.resource_version
async def await_pod_start(
self, pod: V1Pod, schedule_timeout: int = 120, startup_timeout: int = 120, check_interval: float = 1
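Both pod managers above now de-duplicate events by metadata.uid and remember the last seen resourceVersion so the next list/watch call only has to return newer events. A small self-contained sketch of that bookkeeping, using a simplified Event stand-in rather than the provider's classes:

from dataclasses import dataclass


@dataclass
class Event:
    uid: str
    resource_version: str
    message: str


def log_new_events(events: list[Event], seen: set[str], last_rv: str | None) -> str | None:
    """Log each event at most once and return the newest resourceVersion seen."""
    for event in events:
        if event.uid not in seen:
            seen.add(event.uid)
            print("The Pod has an Event:", event.message)
        last_rv = event.resource_version
    return last_rv


seen: set[str] = set()
rv = log_new_events([Event("a", "100", "Scheduled")], seen, None)
# A later poll returning the same event logs nothing, but the version cursor is kept.
rv = log_new_events([Event("a", "100", "Scheduled")], seen, rv)
assert rv == "100" and len(seen) == 1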
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py
index 737c8c10672..ea0a64938c6 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py
@@ -32,6 +32,7 @@ import yaml
from kubernetes.client import V1Deployment, V1DeploymentStatus
from kubernetes.client.rest import ApiException
from kubernetes.config import ConfigException
+from kubernetes_asyncio import client as async_client
from airflow.models import Connection
from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook, KubernetesHook
@@ -1011,6 +1012,216 @@ class TestAsyncKubernetesHook:
with pytest.raises(AirflowException):
await hook._load_config()
+ @pytest.mark.asyncio
+ @mock.patch(KUBE_API.format("list_namespaced_event"))
+ async def test_async_get_pod_events_with_resource_version(
+ self, mock_list_namespaced_event, kube_config_loader
+ ):
+ """Test getting pod events with resource_version parameter."""
+ mock_event = mock.Mock()
+ mock_event.metadata.name = "test-event"
+ mock_events = mock.Mock()
+ mock_events.items = [mock_event]
+ mock_list_namespaced_event.return_value = self.mock_await_result(mock_events)
+
+ hook = AsyncKubernetesHook(
+ conn_id=None,
+ in_cluster=False,
+ config_file=None,
+ cluster_context=None,
+ )
+
+ result = await hook.get_pod_events(name=POD_NAME, namespace=NAMESPACE, resource_version="12345")
+
+ mock_list_namespaced_event.assert_called_once_with(
+ field_selector=f"involvedObject.name={POD_NAME}",
+ namespace=NAMESPACE,
+ resource_version="12345",
+ resource_version_match="NotOlderThan",
+ )
+ assert result == mock_events
+
+ @pytest.mark.asyncio
+ @mock.patch(KUBE_API.format("list_namespaced_event"))
+ async def test_async_get_pod_events_without_resource_version(
+ self, mock_list_namespaced_event, kube_config_loader
+ ):
+ """Test getting pod events without resource_version parameter."""
+ mock_event = mock.Mock()
+ mock_event.metadata.name = "test-event"
+ mock_events = mock.Mock()
+ mock_events.items = [mock_event]
+ mock_list_namespaced_event.return_value = self.mock_await_result(mock_events)
+
+ hook = AsyncKubernetesHook(
+ conn_id=None,
+ in_cluster=False,
+ config_file=None,
+ cluster_context=None,
+ )
+
+ result = await hook.get_pod_events(name=POD_NAME, namespace=NAMESPACE)
+
+ mock_list_namespaced_event.assert_called_once_with(
+ field_selector=f"involvedObject.name={POD_NAME}",
+ namespace=NAMESPACE,
+ resource_version=None,
+ resource_version_match=None,
+ )
+ assert result == mock_events
+
+ @pytest.mark.asyncio
+ @mock.patch("kubernetes_asyncio.watch.Watch")
+ @mock.patch(KUBE_API.format("list_namespaced_event"))
+ async def test_async_watch_pod_events(
+ self, mock_list_namespaced_event, mock_watch_class, kube_config_loader
+ ):
+ """Test watching pod events using Watch API."""
+ mock_event1 = mock.Mock()
+ mock_event1.metadata.uid = "event-1"
+ mock_event2 = mock.Mock()
+ mock_event2.metadata.uid = "event-2"
+
+ async def async_generator(*_, **__):
+ yield {"object": mock_event1}
+ yield {"object": mock_event2}
+
+ mock_watch = mock.Mock()
+ mock_watch_class.return_value = mock_watch
+ mock_watch.stream = mock.Mock(side_effect=async_generator)
+
+ hook = AsyncKubernetesHook(
+ conn_id=None,
+ in_cluster=False,
+ config_file=None,
+ cluster_context=None,
+ )
+
+ events = []
+ async for event in hook.watch_pod_events(
+ name=POD_NAME, namespace=NAMESPACE, resource_version="12345", timeout_seconds=30
+ ):
+ events.append(event)
+
+ assert len(events) == 2
+ assert events[0] == mock_event1
+ assert events[1] == mock_event2
+ mock_watch.stop.assert_called_once()
+
+ @pytest.mark.asyncio
+ @mock.patch("kubernetes_asyncio.watch.Watch")
+ @mock.patch(KUBE_API.format("list_namespaced_event"))
+ async def test_async_watch_pod_events_permission_error_fallback(
+ self, mock_list_namespaced_event, mock_watch_class, kube_config_loader
+ ):
+ """Test fallback to polling when watch permission is denied."""
+
+ # Simulate permission error on watch
+ async def async_generator_with_error(*_, **__):
+ raise async_client.exceptions.ApiException(status=403)
+ yield
+
+ mock_watch = mock.Mock()
+ mock_watch_class.return_value = mock_watch
+ mock_watch.stream = mock.Mock(side_effect=async_generator_with_error)
+
+ # Setup fallback polling
+ mock_event = mock.Mock()
+ mock_event.metadata.uid = "event-1"
+ mock_events = mock.Mock()
+ mock_events.items = [mock_event]
+ mock_list_namespaced_event.return_value = self.mock_await_result(mock_events)
+
+ hook = AsyncKubernetesHook(
+ conn_id=None,
+ in_cluster=False,
+ config_file=None,
+ cluster_context=None,
+ )
+
+ events = []
+ async for event in hook.watch_pod_events(
+ name=POD_NAME, namespace=NAMESPACE, resource_version="12345", timeout_seconds=30
+ ):
+ events.append(event)
+ break
+
+ assert len(events) == 1
+ assert events[0] == mock_event
+ assert hook._event_polling_fallback is True
+
+ @pytest.mark.asyncio
+ @mock.patch(KUBE_API.format("list_namespaced_event"))
+ @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock)
+ async def test_async_watch_pod_events_polling_fallback(
+ self, mock_sleep, mock_list_namespaced_event, kube_config_loader
+ ):
+ """Test polling fallback method."""
+ mock_event1 = mock.Mock()
+ mock_event1.metadata.uid = "event-1"
+ mock_event2 = mock.Mock()
+ mock_event2.metadata.uid = "event-2"
+ mock_events = mock.Mock()
+ mock_events.items = [mock_event1, mock_event2]
+ mock_list_namespaced_event.return_value = self.mock_await_result(mock_events)
+
+ hook = AsyncKubernetesHook(
+ conn_id=None,
+ in_cluster=False,
+ config_file=None,
+ cluster_context=None,
+ )
+
+ events = []
+ async for event in hook.watch_pod_events_polling_fallback(
+ name=POD_NAME, namespace=NAMESPACE, resource_version="12345", interval=10
+ ):
+ events.append(event)
+
+ assert len(events) == 2
+ assert events[0] == mock_event1
+ assert events[1] == mock_event2
+ mock_list_namespaced_event.assert_called_once_with(
+ field_selector=f"involvedObject.name={POD_NAME}",
+ namespace=NAMESPACE,
+ resource_version="12345",
+ resource_version_match="NotOlderThan",
+ )
+ mock_sleep.assert_called_once_with(10)
+
+ @pytest.mark.asyncio
+ @mock.patch("kubernetes_asyncio.watch.Watch")
+ @mock.patch(KUBE_API.format("list_namespaced_event"))
+ async def test_async_watch_pod_events_uses_fallback_if_already_set(
+ self, mock_list_namespaced_event, mock_watch_class, kube_config_loader
+ ):
+ """Test that watch uses polling fallback if flag is already set."""
+ hook = AsyncKubernetesHook(
+ conn_id=None,
+ in_cluster=False,
+ config_file=None,
+ cluster_context=None,
+ )
+
+ hook._event_polling_fallback = True
+
+ mock_event = mock.Mock()
+ mock_event.metadata.uid = "event-1"
+ mock_events = mock.Mock()
+ mock_events.items = [mock_event]
+ mock_list_namespaced_event.return_value = self.mock_await_result(mock_events)
+
+ events = []
+ async for event in hook.watch_pod_events(name=POD_NAME, namespace=NAMESPACE, timeout_seconds=30):
+ events.append(event)
+ break
+
+ # Watch API should not be called
+ mock_watch_class.assert_not_called()
+ # Polling should be used
+ assert len(events) == 1
+ assert events[0] == mock_event
+
@pytest.mark.asyncio
@mock.patch(KUBE_API.format("read_namespaced_pod"))
async def test_get_pod(self, lib_method, kube_config_loader):
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
index 406d7f9d02c..f2ce3b00dc3 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
@@ -358,16 +358,30 @@ class TestKubernetesPodTrigger:
@mock.patch(f"{TRIGGER_PATH}.define_container_state")
@mock.patch(f"{TRIGGER_PATH}.hook")
async def test_run_loop_read_events_during_start(self, mock_hook, mock_method, trigger):
- event1 = mock.AsyncMock()
+ event1 = mock.Mock()
+ event1.metadata.uid = "event-uid-1"
+ event1.metadata.resource_version = "100"
event1.message = "event 1"
event1.involved_object.field_path = "object 1"
- event2 = mock.AsyncMock()
+ event2 = mock.Mock()
+ event2.metadata.uid = "event-uid-2"
+ event2.metadata.resource_version = "101"
event2.message = "event 2"
event2.involved_object.field_path = "object 2"
- events_list = mock.AsyncMock()
- events_list.items = [event1, event2]
- mock_hook.get_pod_events = mock.AsyncMock(return_value=events_list)
+ call_count = 0
+
+ async def async_event_generator(*_, **__):
+ nonlocal call_count
+ call_count += 1
+ if call_count == 1:
+ # First call: return events
+ yield event1
+ yield event2
+ # Subsequent calls: return nothing and stop watching
+ trigger.pod_manager.stop_watching_events = True
+
+ mock_hook.watch_pod_events = mock.Mock(side_effect=async_event_generator)
pod_pending = mock.MagicMock()
pod_pending.status.phase = PodPhase.PENDING
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
index 5bbc08643a3..6fb8b1e10e7 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
@@ -35,6 +35,7 @@ from airflow.providers.cncf.kubernetes.utils.pod_manager import (
PodLogsConsumer,
PodManager,
PodPhase,
+ log_pod_event,
parse_log_line,
)
from airflow.providers.common.compat.sdk import AirflowException
@@ -58,6 +59,65 @@ def test_parse_log_line():
assert line == log_message
+def test_log_pod_event():
+ """Test logging a pod event."""
+ mock_pod_manager = mock.Mock()
+ mock_event = mock.Mock()
+ mock_event.metadata.uid = "event-uid-1"
+ mock_event.message = "Test event message"
+ mock_event.involved_object.field_path = "Test field path"
+
+ seen_events = set()
+
+ log_pod_event(mock_pod_manager, mock_event, seen_events)
+
+ assert "event-uid-1" in seen_events
+ mock_pod_manager.log.info.assert_called_once_with(
+ "The Pod has an Event: %s from %s", "Test event message", "Test field path"
+ )
+
+
+def test_log_pod_event_skips_duplicate():
+ """Test that duplicate events are skipped."""
+ mock_pod_manager = mock.Mock()
+ mock_event = mock.Mock()
+ mock_event.metadata.uid = "event-uid-1"
+ mock_event.message = "Test event message"
+
+ seen_events = {"event-uid-1"} # Event already seen
+
+ log_pod_event(mock_pod_manager, mock_event, seen_events)
+
+ assert "event-uid-1" in seen_events
+ mock_pod_manager.log.info.assert_not_called()
+
+
+def test_log_pod_event_multiple_events():
+ """Test logging multiple different events."""
+ mock_pod_manager = mock.Mock()
+ seen_events = set()
+
+ # First event
+ mock_event1 = mock.Mock()
+ mock_event1.metadata.uid = "event-uid-1"
+ mock_event1.message = "First message"
+ mock_event1.involved_object.field_path = "Test field path 1"
+
+ log_pod_event(mock_pod_manager, mock_event1, seen_events)
+ assert "event-uid-1" in seen_events
+
+ # Second event
+ mock_event2 = mock.Mock()
+ mock_event2.metadata.uid = "event-uid-2"
+ mock_event2.message = "Second message"
+ mock_event2.involved_object.field_path = "Test field path 2"
+
+ log_pod_event(mock_pod_manager, mock_event2, seen_events)
+ assert "event-uid-2" in seen_events
+ assert len(seen_events) == 2
+ assert mock_pod_manager.log.info.call_count == 2
+
+
class TestPodManager:
def setup_method(self):
self.mock_kube_client = mock.Mock()
@@ -183,7 +243,7 @@ class TestPodManager:
events.items.append(event)
startup_check_interval = 10
- def mock_read_pod_events(pod):
+ def mock_read_pod_events(*_, **__):
self.pod_manager.stop_watching_events = True
return events
@@ -210,6 +270,130 @@ class TestPodManager:
events = self.pod_manager.read_pod_events(mock.sentinel)
assert mock.sentinel.events == events
+ def test_read_pod_events_with_resource_version(self):
+ """Test reading pod events with resource_version parameter."""
+ mock_pod = mock.Mock()
+ mock_pod.metadata.namespace = "test-namespace"
+ mock_pod.metadata.name = "test-pod"
+ mock_events = mock.Mock()
+ self.mock_kube_client.list_namespaced_event.return_value = mock_events
+
+ events = self.pod_manager.read_pod_events(mock_pod, resource_version="12345")
+
+ assert events == mock_events
+ self.mock_kube_client.list_namespaced_event.assert_called_once_with(
+ namespace="test-namespace",
+ field_selector="involvedObject.name=test-pod",
+ resource_version="12345",
+ resource_version_match="NotOlderThan",
+ )
+
+ def test_read_pod_events_without_resource_version(self):
+ """Test reading pod events without resource_version parameter."""
+ mock_pod = mock.Mock()
+ mock_pod.metadata.namespace = "test-namespace"
+ mock_pod.metadata.name = "test-pod"
+ mock_events = mock.Mock()
+ self.mock_kube_client.list_namespaced_event.return_value = mock_events
+
+ events = self.pod_manager.read_pod_events(mock_pod)
+
+ assert events == mock_events
+ self.mock_kube_client.list_namespaced_event.assert_called_once_with(
+ namespace="test-namespace",
+ field_selector="involvedObject.name=test-pod",
+ resource_version=None,
+ resource_version_match=None,
+ )
+
+ @pytest.mark.asyncio
+ @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock)
+ async def test_watch_pod_events_tracks_resource_version(self, mock_sleep):
+ """Test that watch_pod_events tracks resource version."""
+ mock_pod = mock.Mock()
+ mock_pod.metadata.namespace = "test-namespace"
+ mock_pod.metadata.name = "test-pod"
+
+ mock_event_1 = mock.Mock()
+ mock_event_1.metadata.uid = "event-uid-1"
+ mock_event_1.metadata.resource_version = "100"
+ mock_event_1.message = "Event 1"
+ mock_event_1.involved_object.field_path = "spec"
+
+ mock_events_1 = mock.Mock()
+ mock_events_1.items = [mock_event_1]
+
+ mock_event_2 = mock.Mock()
+ mock_event_2.metadata.uid = "event-uid-2"
+ mock_event_2.metadata.resource_version = "101"
+ mock_event_2.message = "Event 2"
+ mock_event_2.involved_object.field_path = "spec"
+
+ mock_events_2 = mock.Mock()
+ mock_events_2.items = [mock_event_2]
+
+ self.mock_kube_client.list_namespaced_event.side_effect = [mock_events_1, mock_events_2]
+ self.pod_manager.stop_watching_events = False
+
+ call_count = 0
+
+ async def side_effect_sleep(*_, **__):
+ nonlocal call_count
+ call_count += 1
+ if call_count >= 2:
+ self.pod_manager.stop_watching_events = True
+
+ mock_sleep.side_effect = side_effect_sleep
+
+ await self.pod_manager.watch_pod_events(mock_pod, check_interval=1)
+
+ # Check that resource_version was passed in second call
+ calls = self.mock_kube_client.list_namespaced_event.call_args_list
+ assert len(calls) == 2
+ # First call should have no resource_version
+ assert calls[0][1]["resource_version"] is None
+ # Second call should use resource_version from first event
+ assert calls[1][1]["resource_version"] == "100"
+
+ @pytest.mark.asyncio
+ @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock)
+ async def test_watch_pod_events_deduplicates_events(self, mock_sleep):
+ """Test that watch_pod_events deduplicates events."""
+ mock_pod = mock.Mock()
+ mock_pod.metadata.namespace = "test-namespace"
+ mock_pod.metadata.name = "test-pod"
+
+ mock_event = mock.Mock()
+ mock_event.metadata.uid = "event-uid-1"
+ mock_event.metadata.resource_version = "100"
+ mock_event.message = "Duplicate event"
+ mock_event.involved_object.field_path = "spec"
+
+ mock_events = mock.Mock()
+ mock_events.items = [mock_event]
+
+ # Will return the same event on each invocation
+ self.mock_kube_client.list_namespaced_event.return_value = mock_events
+ self.pod_manager.stop_watching_events = False
+
+ call_count = 0
+
+ async def side_effect_sleep(*_, **__):
+ nonlocal call_count
+ call_count += 1
+ if call_count >= 2:
+ # Stop after 2 iterations -> same event is returned 2 times
+ self.pod_manager.stop_watching_events = True
+
+ mock_sleep.side_effect = side_effect_sleep
+
+ with mock.patch.object(self.pod_manager.log, "info") as mock_log_info:
+ await self.pod_manager.watch_pod_events(mock_pod, check_interval=1)
+
+ # Event should only be logged once despite being returned twice
+ assert mock_log_info.call_count == 1
+ mock_log_info.assert_called_with("The Pod has an Event: %s from %s", "Duplicate event", "spec")
+
def test_read_pod_events_retries_successfully(self):
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.list_namespaced_event.side_effect = [
@@ -223,10 +407,14 @@ class TestPodManager:
mock.call(
namespace=mock.sentinel.metadata.namespace,
field_selector=f"involvedObject.name={mock.sentinel.metadata.name}",
+ resource_version=None,
+ resource_version_match=None,
),
mock.call(
namespace=mock.sentinel.metadata.namespace,
field_selector=f"involvedObject.name={mock.sentinel.metadata.name}",
+ resource_version=None,
+ resource_version_match=None,
),
]
)
@@ -730,6 +918,178 @@ class TestAsyncPodManager:
callbacks=[],
)
+ @pytest.mark.asyncio
+ async def test_read_pod_events_with_resource_version(self):
+ """Test async read_pod_events with resource_version parameter."""
+ mock_pod = mock.Mock()
+ mock_pod.metadata.namespace = "test-namespace"
+ mock_pod.metadata.name = "test-pod"
+ mock_events = mock.Mock()
+
+ self.mock_async_hook.get_pod_events.return_value = mock_events
+
+ result = await self.async_pod_manager.read_pod_events(mock_pod, resource_version="12345")
+
+ assert result == mock_events
+ self.mock_async_hook.get_pod_events.assert_called_once_with(
+ "test-pod", "test-namespace", resource_version="12345"
+ )
+
+ @pytest.mark.asyncio
+ async def test_read_pod_events_without_resource_version(self):
+ """Test async read_pod_events without resource_version parameter."""
+ mock_pod = mock.Mock()
+ mock_pod.metadata.namespace = "test-namespace"
+ mock_pod.metadata.name = "test-pod"
+ mock_events = mock.Mock()
+
+ self.mock_async_hook.get_pod_events.return_value = mock_events
+
+ result = await self.async_pod_manager.read_pod_events(mock_pod)
+
+ assert result == mock_events
+ self.mock_async_hook.get_pod_events.assert_called_once_with(
+ "test-pod", "test-namespace", resource_version=None
+ )
+
+ @pytest.mark.asyncio
+ async def test_watch_pod_events_uses_hook_watch(self):
+ """Test that watch_pod_events uses hook's watch_pod_events method."""
+ mock_pod = mock.Mock()
+ mock_pod.metadata.namespace = "test-namespace"
+ mock_pod.metadata.name = "test-pod"
+
+ mock_event1 = mock.Mock()
+ mock_event1.metadata.uid = "event-uid-1"
+ mock_event1.metadata.resource_version = "100"
+ mock_event1.message = "Event 1"
+ mock_event1.involved_object.field_path = "spec"
+
+ mock_event2 = mock.Mock()
+ mock_event2.metadata.uid = "event-uid-2"
+ mock_event2.metadata.resource_version = "101"
+ mock_event2.message = "Event 2"
+ mock_event2.involved_object.field_path = "spec"
+
+ async def async_event_generator(*_, **__):
+ yield mock_event1
+ yield mock_event2
+ self.async_pod_manager.stop_watching_events = True
+
+ self.mock_async_hook.watch_pod_events = mock.Mock(side_effect=async_event_generator)
+
+ with mock.patch.object(self.async_pod_manager.log, "info") as mock_log_info:
+ await self.async_pod_manager.watch_pod_events(mock_pod, startup_check_interval=30)
+
+ # Both events should be logged
+ assert mock_log_info.call_count == 2
+ calls = mock_log_info.call_args_list
+ assert calls[0][0] == ("The Pod has an Event: %s from %s", "Event 1", "spec")
+ assert calls[1][0] == ("The Pod has an Event: %s from %s", "Event 2", "spec")
+
+ # Verify hook was called
+ self.mock_async_hook.watch_pod_events.assert_called()
+
+ @pytest.mark.asyncio
+ async def test_watch_pod_events_tracks_resource_version(self):
+ """Test that watch_pod_events tracks and updates resource version."""
+ mock_pod = mock.Mock()
+ mock_pod.metadata.namespace = "test-namespace"
+ mock_pod.metadata.name = "test-pod"
+
+ # Create events for two iterations
+ mock_event1 = mock.Mock()
+ mock_event1.metadata.uid = "event-uid-1"
+ mock_event1.metadata.resource_version = "100"
+ mock_event1.message = "Event 1"
+ mock_event1.involved_object.field_path = "spec"
+
+ mock_event2 = mock.Mock()
+ mock_event2.metadata.uid = "event-uid-2"
+ mock_event2.metadata.resource_version = "101"
+ mock_event2.message = "Event 2"
+ mock_event2.involved_object.field_path = "spec"
+
+ call_count = 0
+
+ async def async_event_generator(*_, **__):
+ nonlocal call_count
+ call_count += 1
+ if call_count == 1:
+ # First iteration
+ yield mock_event1
+ else:
+ # Second iteration
+ yield mock_event2
+ self.async_pod_manager.stop_watching_events = True
+
+ self.mock_async_hook.watch_pod_events = mock.Mock(side_effect=async_event_generator)
+ self.async_pod_manager.stop_watching_events = False
+
+ await self.async_pod_manager.watch_pod_events(mock_pod, startup_check_interval=30)
+
+ # Verify hook was called twice with updated resource_version
+ assert self.mock_async_hook.watch_pod_events.call_count == 2
+ calls = self.mock_async_hook.watch_pod_events.call_args_list
+
+ # First call should have no resource_version
+ assert calls[0][1]["resource_version"] is None
+ # Second call should use resource_version from first event
+ assert calls[1][1]["resource_version"] == "100"
+
+ @pytest.mark.asyncio
+ async def test_watch_pod_events_deduplicates_events(self):
+ """Test that watch_pod_events deduplicates events across iterations."""
+ mock_pod = mock.Mock()
+ mock_pod.metadata.namespace = "test-namespace"
+ mock_pod.metadata.name = "test-pod"
+
+ # Same event returned in two iterations
+ mock_event = mock.Mock()
+ mock_event.metadata.uid = "event-uid-1"
+ mock_event.metadata.resource_version = "100"
+ mock_event.message = "Duplicate event"
+ mock_event.involved_object.field_path = "spec"
+
+ call_count = 0
+
+ async def async_event_generator(*_, **__):
+ nonlocal call_count
+ call_count += 1
+ yield mock_event # Return same event
+ if call_count >= 2:
+ self.async_pod_manager.stop_watching_events = True
+
+ self.mock_async_hook.watch_pod_events = mock.Mock(side_effect=async_event_generator)
+ self.async_pod_manager.stop_watching_events = False
+
+ with mock.patch.object(self.async_pod_manager.log, "info") as mock_log_info:
+ await self.async_pod_manager.watch_pod_events(mock_pod, startup_check_interval=30)
+
+ # Event should only be logged once despite being returned twice
+ assert mock_log_info.call_count == 1
+ mock_log_info.assert_called_with("The Pod has an Event: %s from %s", "Duplicate event", "spec")
+
+ @pytest.mark.asyncio
+ async def test_watch_pod_events_handles_none_event(self):
+ """Test that watch_pod_events handles None events gracefully."""
+ mock_pod = mock.Mock()
+ mock_pod.metadata.namespace = "test-namespace"
+ mock_pod.metadata.name = "test-pod"
+
+ async def async_event_generator(*_, **__):
+ yield None # None event should be skipped
+ self.async_pod_manager.stop_watching_events = True
+
+ self.mock_async_hook.watch_pod_events = mock.Mock(side_effect=async_event_generator)
+ self.async_pod_manager.stop_watching_events = False
+
+ with mock.patch.object(self.async_pod_manager.log, "info") as mock_log_info:
+ await self.async_pod_manager.watch_pod_events(mock_pod, startup_check_interval=30)
+
+ # No events should be logged for None
+ mock_log_info.assert_not_called()
+
@pytest.mark.asyncio
async def test_start_pod_raises_informative_error_on_scheduled_timeout(self):
pod_response = mock.MagicMock()
@@ -844,16 +1204,18 @@ class TestAsyncPodManager:
events.items.append(event)
startup_check_interval = 10
- def get_pod_events_side_effect(name, namespace):
+ async def watch_events_generator(*_, **__):
+ for event in events.items:
+ yield event
self.async_pod_manager.stop_watching_events = True
- return events
- self.mock_async_hook.get_pod_events.side_effect = get_pod_events_side_effect
+ self.mock_async_hook.watch_pod_events = mock.Mock(side_effect=watch_events_generator)
- await self.async_pod_manager.watch_pod_events(pod=mock_pod, check_interval=startup_check_interval)
+ await self.async_pod_manager.watch_pod_events(
+ pod=mock_pod, startup_check_interval=startup_check_interval
+ )
mock_log_info.assert_any_call("The Pod has an Event: %s from %s", "test event 1", "object event 1")
mock_log_info.assert_any_call("The Pod has an Event: %s from %s", "test event 2", "object event 2")
- mock_time_sleep.assert_called_once_with(startup_check_interval)
@pytest.mark.asyncio
@pytest.mark.parametrize(