This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 52867d9cc4b AsyncKubernetesHook.watch_pod_events could silently stop emitting events when (#60532)
52867d9cc4b is described below

commit 52867d9cc4bbda2223162e01e7dd818327a69efc
Author: SameerMesiah97 <[email protected]>
AuthorDate: Mon Jan 26 19:30:34 2026 +0000

    AsyncKubernetesHook.watch_pod_events could silently stop emitting events when (#60532)
    
    the Kubernetes watch stream ended (e.g. due to timeout_seconds), even while
    the pod was still running.
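    
    To illustrate the failure mode, a minimal sketch (not the hook's exact
    code; v1_api, name, and namespace are assumed to be in scope):
    
        w = async_watch.Watch()
        async for event_watched in w.stream(
            v1_api.list_namespaced_event,
            namespace=namespace,
            field_selector=f"involvedObject.name={name}",
            timeout_seconds=30,
        ):
            yield event_watched.get("object")
        # The stream ends quietly once the server-side timeout elapses, so
        # control falls through here even while the pod is still running.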
    
    This change reconnects on watch termination, resumes from the last observed
    resourceVersion, restarts on stale resourceVersion errors (410), and stops
    only when the pod completes or is deleted. Permission-denied watches still fall
    back to polling.
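    
    The reconnect-and-resume loop, reduced to its core (a hedged sketch
    mirroring the diff below; open_event_stream is a hypothetical helper
    standing in for the Watch/stream plumbing):
    
        last_rv = resource_version
        while True:
            pod = await self.get_pod(name=name, namespace=namespace)
            if pod.status and pod.status.phase in ("Succeeded", "Failed"):
                return  # pod is terminal; stop watching
            try:
                async for event in open_event_stream(name, namespace, last_rv):
                    if event.metadata and event.metadata.resource_version:
                        last_rv = event.metadata.resource_version
                    yield event
            except async_client.exceptions.ApiException as e:
                if e.status == 410:  # stale resourceVersion
                    last_rv = None  # restart from the server's current state
                    continue
                raise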
    
    As part of this fix, kubeconfig loading is cached and _load_config no longer
    returns an API client, clarifying its responsibility and avoiding repeated
    config loading during watch reconnects.
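    
    The cached load is a plain load-once guard (sketch of the shape used in
    the diff below):
    
        async def _load_config(self):
            if self._config_loaded:
                return
            # ... resolve and load exactly one configuration source ...
            self._config_loaded = True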
    
    Tests cover reconnection behavior, stale resourceVersion recovery, and clean
    termination on pod completion or deletion.
    
    Co-authored-by: Sameer Mesiah <[email protected]>
---
 .../providers/cncf/kubernetes/hooks/kubernetes.py  | 168 +++++++++++-----
 .../unit/cncf/kubernetes/hooks/test_kubernetes.py  | 217 ++++++++++++++++++++-
 2 files changed, 331 insertions(+), 54 deletions(-)

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index a76e9390db6..fa33fe44951 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -816,48 +816,54 @@ class AsyncKubernetesHook(KubernetesHook):
         self.config_dict = config_dict
         self._extras: dict | None = connection_extras
         self._event_polling_fallback = False
+        self._config_loaded = False
 
     async def _load_config(self):
-        """Return Kubernetes API session for use with requests."""
+        """Load Kubernetes configuration once per hook instance."""
+        if self._config_loaded:
+            return
+
         in_cluster = self._coalesce_param(self.in_cluster, await self._get_field("in_cluster"))
         cluster_context = self._coalesce_param(self.cluster_context, await self._get_field("cluster_context"))
         kubeconfig_path = await self._get_field("kube_config_path")
         kubeconfig = await self._get_field("kube_config")
+
         num_selected_configuration = sum(
             1 for o in [in_cluster, kubeconfig, kubeconfig_path, self.config_dict] if o
         )
 
-        async def api_client_from_kubeconfig_file(_kubeconfig_path: str | None):
-            await async_config.load_kube_config(
-                config_file=_kubeconfig_path,
-                client_configuration=self.client_configuration,
-                context=cluster_context,
-            )
-            return _TimeoutAsyncK8sApiClient()
-
         if num_selected_configuration > 1:
             raise AirflowException(
                 "Invalid connection configuration. Options kube_config_path, "
-                "kube_config, in_cluster are mutually exclusive. "
+                "kube_config, in_cluster, and config_dict are mutually 
exclusive. "
                 "You can only use one option at a time."
             )
 
         if in_cluster:
             self.log.debug(LOADING_KUBE_CONFIG_FILE_RESOURCE.format("within a pod"))
-            self._is_in_cluster = True
             async_config.load_incluster_config()
-            return _TimeoutAsyncK8sApiClient()
+            self._is_in_cluster = True
+            self._config_loaded = True
+            return
+
+        # If the block above did not return, we are not running inside a cluster.
+        self._is_in_cluster = False
 
         if self.config_dict:
             self.log.debug(LOADING_KUBE_CONFIG_FILE_RESOURCE.format("config dictionary"))
-            self._is_in_cluster = False
             await async_config.load_kube_config_from_dict(self.config_dict, context=cluster_context)
-            return _TimeoutAsyncK8sApiClient()
+            self._config_loaded = True
+            return
 
         if kubeconfig_path is not None:
             self.log.debug("loading kube_config from: %s", kubeconfig_path)
-            self._is_in_cluster = False
-            return await api_client_from_kubeconfig_file(kubeconfig_path)
+            await async_config.load_kube_config(
+                config_file=kubeconfig_path,
+                client_configuration=self.client_configuration,
+                context=cluster_context,
+            )
+            self._config_loaded = True
+            return
 
         if kubeconfig is not None:
             async with aiofiles.tempfile.NamedTemporaryFile() as temp_config:
@@ -874,13 +880,21 @@ class AsyncKubernetesHook(KubernetesHook):
                     kubeconfig = json.dumps(kubeconfig)
                 await temp_config.write(kubeconfig.encode())
                 await temp_config.flush()
-                self._is_in_cluster = False
-                return await api_client_from_kubeconfig_file(temp_config.name)
+
+                await async_config.load_kube_config(
+                    config_file=temp_config.name,
+                    client_configuration=self.client_configuration,
+                    context=cluster_context,
+                )
+                self._config_loaded = True
+                return
+
         self.log.debug(LOADING_KUBE_CONFIG_FILE_RESOURCE.format("default configuration file"))
         await async_config.load_kube_config(
             client_configuration=self.client_configuration,
             context=cluster_context,
         )
+        self._config_loaded = True
 
     async def get_conn_extras(self) -> dict:
         if self._extras is None:
@@ -907,7 +921,8 @@ class AsyncKubernetesHook(KubernetesHook):
     async def get_conn(self) -> AsyncGenerator[async_client.ApiClient, None]:
         kube_client = None
         try:
-            kube_client = await self._load_config() or _TimeoutAsyncK8sApiClient()
+            await self._load_config()
+            kube_client = _TimeoutAsyncK8sApiClient()
             yield kube_client
         finally:
             if kube_client is not None:
@@ -1021,48 +1036,101 @@ class AsyncKubernetesHook(KubernetesHook):
         timeout_seconds: int = 30,
     ) -> AsyncGenerator[CoreV1Event]:
         """
-        Watch pod events using Kubernetes Watch API.
+        Watch Kubernetes events for a pod.
+
+        Reconnects on watch termination and resumes from the last observed
+        resourceVersion. The watch stops when the pod is terminal or deleted,
+        and falls back to polling if watch access is denied.
 
         :param name: Pod name to watch events for
         :param namespace: Kubernetes namespace
         :param resource_version: Only return events not older than this resource version
         :param timeout_seconds: Timeout in seconds for the watch stream. A small additional buffer may be applied internally.
+                                This does not limit the total duration of event streaming.
         """
-        if self._event_polling_fallback:
-            async for event_polled in self.watch_pod_events_polling_fallback(
-                name, namespace, resource_version, timeout_seconds
-            ):
-                yield event_polled
-
-        try:
-            w = async_watch.Watch()
-            async with self.get_conn() as connection:
-                v1_api = async_client.CoreV1Api(connection)
+        last_rv = resource_version
 
-                async for event_watched in w.stream(
-                    v1_api.list_namespaced_event,
-                    namespace=namespace,
-                    field_selector=f"involvedObject.name={name}",
-                    resource_version=resource_version,
-                    timeout_seconds=timeout_seconds,
-                ):
-                    event: CoreV1Event = event_watched.get("object")
-                    yield event
-
-        except async_client.exceptions.ApiException as e:
-            if hasattr(e, "status") and e.status == 403:
-                self.log.warning(
-                    "Triggerer does not have Kubernetes API permission to 
'watch' events: %s Falling back to polling.",
-                    str(e),
-                )
-                self._event_polling_fallback = True
+        while True:
+            # If watch is known to be unavailable, use polling fallback
+            if self._event_polling_fallback:
                 async for event_polled in self.watch_pod_events_polling_fallback(
-                    name, namespace, resource_version, timeout_seconds
+                    name, namespace, last_rv, timeout_seconds
                 ):
                     yield event_polled
+                return
 
-        finally:
-            w.stop()
+            # The watch may not be created if pod inspection triggers an early return.
+            w = None
+
+            try:
+                # Pod lifecycle is authoritative; events alone are not.
+                pod = await self.get_pod(name=name, namespace=namespace)
+                if pod.status and pod.status.phase in ("Succeeded", "Failed"):
+                    self.log.info(
+                        "Pod '%s' reached terminal phase '%s'; stopping event 
watch",
+                        name,
+                        pod.status.phase,
+                    )
+                    return
+
+                w = async_watch.Watch()
+                async with self.get_conn() as connection:
+                    v1_api = async_client.CoreV1Api(connection)
+
+                    async for event_watched in w.stream(
+                        v1_api.list_namespaced_event,
+                        namespace=namespace,
+                        field_selector=f"involvedObject.name={name}",
+                        resource_version=last_rv,
+                        timeout_seconds=timeout_seconds,
+                    ):
+                        event = event_watched.get("object")
+                        if not event or not event.metadata:
+                            continue
+
+                        if event.metadata.resource_version:
+                            last_rv = event.metadata.resource_version
+
+                        yield event
+
+            # Never swallow cancellation.
+            except asyncio.CancelledError:
+                raise
+
+            except async_client.exceptions.ApiException as e:
+                status = getattr(e, "status", None)
+
+                if status == 403:
+                    # Permanently fall back to polling when watch is not permitted.
+                    self.log.warning(
+                        "Kubernetes API does not permit watching events; 
falling back to polling: %s",
+                        str(e),
+                    )
+                    self._event_polling_fallback = True
+                    continue
+
+                if status == 404:
+                    # Terminate the watch if pod no longer exists.
+                    self.log.info("Pod '%s' no longer exists; stopping event 
watch", name)
+                    return
+
+                if status == 410:
+                    # Restart watch from current state if resourceVersion is too old.
+                    self.log.info(
+                        "resourceVersion too old while watching pod '%s'; 
restarting watch",
+                        name,
+                    )
+                    last_rv = None
+                    continue
+
+                # Other API errors are either transient or configuration/programming errors.
+                # Re-raise so generic_api_retry can apply centralized retry/backoff for
+                # transient failures, and fail fast for non-retryable ones.
+                raise
+
+            finally:
+                if w is not None:
+                    w.stop()
 
     async def watch_pod_events_polling_fallback(
         self,
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py
index d1fac787049..0f51a6b6170 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py
@@ -1149,10 +1149,11 @@ class TestAsyncKubernetesHook:
         assert result == mock_events
 
     @pytest.mark.asyncio
+    @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod")
     @mock.patch("kubernetes_asyncio.watch.Watch")
     @mock.patch(KUBE_API.format("list_namespaced_event"))
     async def test_async_watch_pod_events(
-        self, mock_list_namespaced_event, mock_watch_class, kube_config_loader
+        self, mock_list_namespaced_event, mock_watch_class, mock_get_pod, kube_config_loader
     ):
         """Test watching pod events using Watch API."""
         mock_event1 = mock.Mock()
@@ -1168,6 +1169,10 @@ class TestAsyncKubernetesHook:
         mock_watch_class.return_value = mock_watch
         mock_watch.stream = mock.Mock(side_effect=async_generator)
 
+        mock_pod = mock.MagicMock()
+        mock_pod.status.phase = "Running"
+        mock_get_pod.return_value = mock_pod
+
         hook = AsyncKubernetesHook(
             conn_id=None,
             in_cluster=False,
@@ -1176,10 +1181,16 @@ class TestAsyncKubernetesHook:
         )
 
         events = []
-        async for event in hook.watch_pod_events(
+        async_event_generator = hook.watch_pod_events(
             name=POD_NAME, namespace=NAMESPACE, resource_version="12345", 
timeout_seconds=30
-        ):
+        )
+
+        async for event in async_event_generator:
             events.append(event)
+            if len(events) == 2:
+                break
+
+        await async_event_generator.aclose()
 
         assert len(events) == 2
         assert events[0] == mock_event1
@@ -1187,10 +1198,11 @@ class TestAsyncKubernetesHook:
         mock_watch.stop.assert_called_once()
 
     @pytest.mark.asyncio
+    @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod")
     @mock.patch("kubernetes_asyncio.watch.Watch")
     @mock.patch(KUBE_API.format("list_namespaced_event"))
     async def test_async_watch_pod_events_permission_error_fallback(
-        self, mock_list_namespaced_event, mock_watch_class, kube_config_loader
+        self, mock_list_namespaced_event, mock_watch_class, mock_get_pod, kube_config_loader
     ):
         """Test fallback to polling when watch permission is denied."""
 
@@ -1210,6 +1222,10 @@ class TestAsyncKubernetesHook:
         mock_events.items = [mock_event]
         mock_list_namespaced_event.return_value = self.mock_await_result(mock_events)
 
+        mock_pod = mock.MagicMock()
+        mock_pod.status.phase = "Running"
+        mock_get_pod.return_value = mock_pod
+
         hook = AsyncKubernetesHook(
             conn_id=None,
             in_cluster=False,
@@ -1300,6 +1316,199 @@ class TestAsyncKubernetesHook:
         assert len(events) == 1
         assert events[0] == mock_event
 
+    @pytest.mark.asyncio
+    @mock.patch("kubernetes_asyncio.watch.Watch")
+    @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod")
+    async def test_watch_pod_events_reconnects_after_stream_timeout(
+        self,
+        mock_get_pod,
+        mock_watch_class,
+        kube_config_loader,
+    ):
+        """
+        The watch should reconnect when the watch stream ends (e.g. timeout)
+        and continue yielding events until the pod terminates.
+        """
+
+        mock_get_pod.side_effect = [
+            mock.MagicMock(status=mock.MagicMock(phase="Running")),
+            mock.MagicMock(status=mock.MagicMock(phase="Running")),
+        ]
+
+        mock_event1 = mock.Mock()
+        mock_event1.metadata.uid = "event-1"
+        mock_event2 = mock.Mock()
+        mock_event2.metadata.uid = "event-2"
+
+        # Simulate a watch stream ending naturally (e.g. server-side timeout).
+        # The hook should reconnect and continue watching.
+        async def timed_out_stream(*_, **__):
+            yield {"object": mock_event1}
+            return
+
+        async def fresh_stream(*_, **__):
+            yield {"object": mock_event2}
+
+        watch_instance1 = mock.Mock()
+        watch_instance1.stream = mock.Mock(side_effect=timed_out_stream)
+
+        watch_instance2 = mock.Mock()
+        watch_instance2.stream = mock.Mock(side_effect=fresh_stream)
+
+        mock_watch_class.side_effect = [watch_instance1, watch_instance2]
+
+        hook = AsyncKubernetesHook(
+            conn_id=None,
+            in_cluster=False,
+            config_file=None,
+            cluster_context=None,
+        )
+
+        events = []
+
+        async for event in hook.watch_pod_events(
+            name=POD_NAME, namespace=NAMESPACE, resource_version="12345", 
timeout_seconds=1
+        ):
+            events.append(event)
+            if len(events) == 2:
+                break
+
+        assert events == [mock_event1, mock_event2]
+        assert mock_watch_class.call_count == 2
+
+    @pytest.mark.asyncio
+    @mock.patch("kubernetes_asyncio.watch.Watch")
+    @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod")
+    async def test_watch_pod_events_restarts_on_stale_resource_version(
+        self,
+        mock_get_pod,
+        mock_watch_class,
+        kube_config_loader,
+    ):
+        """
+        When the Kubernetes API reports resourceVersion too old (410),
+        the watch should restart from the current state instead of failing.
+        """
+
+        mock_get_pod.side_effect = [
+            mock.MagicMock(status=mock.MagicMock(phase="Running")),
+            mock.MagicMock(status=mock.MagicMock(phase="Running")),
+        ]
+
+        mock_event = mock.Mock()
+        mock_event.metadata.uid = "event"
+        mock_event.metadata.resource_version = "2"
+
+        # Kubernetes signals a stale resourceVersion with HTTP 410.
+        # This should trigger a watch restart instead of failing the generator.
+        async def async_generator_with_error(*_, **__):
+            raise async_client.exceptions.ApiException(status=410)
+            yield
+
+        async def fresh_stream(*_, **__):
+            yield {"object": mock_event}
+
+        watch_instance1 = mock.Mock()
+        watch_instance1.stream = mock.Mock(side_effect=async_generator_with_error)
+
+        watch_instance2 = mock.Mock()
+        watch_instance2.stream = mock.Mock(side_effect=fresh_stream)
+
+        mock_watch_class.side_effect = [watch_instance1, watch_instance2]
+
+        hook = AsyncKubernetesHook(
+            conn_id=None,
+            in_cluster=False,
+            config_file=None,
+            cluster_context=None,
+        )
+
+        events = []
+        async for event in hook.watch_pod_events(
+            name=POD_NAME, namespace=NAMESPACE, resource_version="1", 
timeout_seconds=1
+        ):
+            events.append(event)
+            break
+
+        assert events == [mock_event]
+        assert mock_watch_class.call_count == 2
+
+    @pytest.mark.asyncio
+    @mock.patch("kubernetes_asyncio.watch.Watch")
+    @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod")
+    async def test_watch_pod_events_stops_on_pod_not_found(
+        self,
+        mock_get_pod,
+        mock_watch_class,
+        kube_config_loader,
+    ):
+        """
+        Verify that watch_pod_events stops cleanly when the pod no longer exists (404).
+        """
+
+        # Pod lifecycle is authoritative; a 404 means the watch must terminate cleanly.
+        mock_get_pod.side_effect = async_client.exceptions.ApiException(status=404)
+
+        mock_watch = mock.Mock()
+        mock_watch_class.return_value = mock_watch
+
+        hook = AsyncKubernetesHook(
+            conn_id=None,
+            in_cluster=False,
+            config_file=None,
+            cluster_context=None,
+        )
+
+        events = []
+        async for event in hook.watch_pod_events(
+            name=POD_NAME,
+            namespace=NAMESPACE,
+        ):
+            events.append(event)
+
+        # No events should be yielded.
+        assert events == []
+
+    @pytest.mark.parametrize("pod_status", ("Succeeded", "Failed"))
+    @pytest.mark.asyncio
+    @mock.patch("kubernetes_asyncio.watch.Watch")
+    @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod")
+    async def test_watch_pod_events_stops_on_pod_completion(
+        self,
+        mock_get_pod,
+        mock_watch_class,
+        pod_status,
+        kube_config_loader,
+    ):
+        """
+        Verify that watch_pod_events stops immediately when the pod
+        is already in a terminal phase.
+        """
+
+        mock_watch = mock.Mock()
+        mock_watch_class.return_value = mock_watch
+
+        mock_pod = mock.MagicMock()
+        mock_pod.status.phase = pod_status
+        mock_get_pod.return_value = mock_pod
+
+        hook = AsyncKubernetesHook(
+            conn_id=None,
+            in_cluster=False,
+            config_file=None,
+            cluster_context=None,
+        )
+
+        events = []
+        async for event in hook.watch_pod_events(
+            name=POD_NAME,
+            namespace=NAMESPACE,
+        ):
+            events.append(event)
+
+        # No events should be yielded.
+        assert events == []
+
     @pytest.mark.asyncio
     @mock.patch(KUBE_API.format("read_namespaced_pod"))
     async def test_get_pod(self, lib_method, kube_config_loader):
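
For reference, a minimal consumer of the updated generator (a hypothetical
snippet, not part of the commit; hook, POD_NAME, NAMESPACE, and log are
assumed to exist):

    agen = hook.watch_pod_events(name=POD_NAME, namespace=NAMESPACE, timeout_seconds=30)
    try:
        async for event in agen:
            # Events keep flowing across watch reconnects until the pod
            # completes or is deleted; the stream no longer ends silently.
            log.info("Pod event: %s", event.message)
    finally:
        await agen.aclose()  # stops the underlying watch cleanly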
