This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new 52867d9cc4b  AsyncKubernetesHook.watch_pod_events could silently stop emitting events when (#60532)
52867d9cc4b is described below
commit 52867d9cc4bbda2223162e01e7dd818327a69efc
Author: SameerMesiah97 <[email protected]>
AuthorDate: Mon Jan 26 19:30:34 2026 +0000
AsyncKubernetesHook.watch_pod_events could silently stop emitting events when (#60532)
the Kubernetes watch stream ended (e.g. due to timeout_seconds), even while
the pod was still running.
This change reconnects on watch termination, resumes from the last observed
resourceVersion, restarts on stale resourceVersion errors (410), and stops
only when the pod completes or is deleted. Permission-denied watches still
fall back to polling.
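
For context, the core of the fix reduces to the following standalone sketch
(an illustration only, not the hook code itself; it assumes a
kubernetes_asyncio CoreV1Api instance v1_api and elides the 403/404 handling
visible in the diff below):

    import asyncio

    from kubernetes_asyncio import client, watch

    async def watch_events(v1_api, name, namespace, resource_version=None, timeout_seconds=30):
        last_rv = resource_version
        while True:
            # The pod lifecycle, not the event stream, decides when to stop.
            pod = await v1_api.read_namespaced_pod(name=name, namespace=namespace)
            if pod.status and pod.status.phase in ("Succeeded", "Failed"):
                return
            w = watch.Watch()
            try:
                async for watched in w.stream(
                    v1_api.list_namespaced_event,
                    namespace=namespace,
                    field_selector=f"involvedObject.name={name}",
                    resource_version=last_rv,
                    timeout_seconds=timeout_seconds,
                ):
                    event = watched.get("object")
                    if not event or not event.metadata:
                        continue
                    if event.metadata.resource_version:
                        last_rv = event.metadata.resource_version  # resume point
                    yield event
                # Stream ended normally (e.g. timeout_seconds elapsed):
                # fall through and reconnect from last_rv.
            except asyncio.CancelledError:
                raise
            except client.exceptions.ApiException as e:
                if e.status == 410:
                    last_rv = None  # stale resourceVersion: restart from current state
                    continue
                raise
            finally:
                w.stop()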
As part of this fix, kubeconfig loading is cached and _load_config no longer
returns an API client, clarifying its responsibility and avoiding repeated
config loading during watch reconnects.
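
The caching itself is a plain idempotence guard; schematically (hook
internals simplified, loading details elided):

    class AsyncKubernetesHook:
        def __init__(self):
            self._config_loaded = False

        async def _load_config(self) -> None:
            if self._config_loaded:
                return  # watch reconnects skip repeated kubeconfig loading
            # ... resolve in_cluster / config_dict / kubeconfig and load it ...
            self._config_loaded = True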
Tests cover reconnection behavior, stale resourceVersion recovery, and clean
termination on pod completion or deletion.
Co-authored-by: Sameer Mesiah <[email protected]>
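
From the caller's side nothing changes; a hypothetical consumer (pod name
and namespace invented here) simply iterates until the generator finishes
on its own:

    from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook

    async def tail_pod_events():
        hook = AsyncKubernetesHook(conn_id=None, in_cluster=False, config_file=None, cluster_context=None)
        # The generator ends once the pod completes or is deleted.
        async for event in hook.watch_pod_events(name="my-pod", namespace="default"):
            print(event.reason, event.message)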
---
.../providers/cncf/kubernetes/hooks/kubernetes.py | 168 +++++++++++-----
.../unit/cncf/kubernetes/hooks/test_kubernetes.py | 217 ++++++++++++++++++++-
2 files changed, 331 insertions(+), 54 deletions(-)
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index a76e9390db6..fa33fe44951 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -816,48 +816,54 @@ class AsyncKubernetesHook(KubernetesHook):
self.config_dict = config_dict
self._extras: dict | None = connection_extras
self._event_polling_fallback = False
+ self._config_loaded = False
async def _load_config(self):
- """Return Kubernetes API session for use with requests."""
+ """Load Kubernetes configuration once per hook instance."""
+ if self._config_loaded:
+ return
+
        in_cluster = self._coalesce_param(self.in_cluster, await self._get_field("in_cluster"))
        cluster_context = self._coalesce_param(self.cluster_context, await self._get_field("cluster_context"))
kubeconfig_path = await self._get_field("kube_config_path")
kubeconfig = await self._get_field("kube_config")
+
        num_selected_configuration = sum(
            1 for o in [in_cluster, kubeconfig, kubeconfig_path, self.config_dict] if o
        )
-        async def api_client_from_kubeconfig_file(_kubeconfig_path: str | None):
- await async_config.load_kube_config(
- config_file=_kubeconfig_path,
- client_configuration=self.client_configuration,
- context=cluster_context,
- )
- return _TimeoutAsyncK8sApiClient()
-
if num_selected_configuration > 1:
raise AirflowException(
"Invalid connection configuration. Options kube_config_path, "
- "kube_config, in_cluster are mutually exclusive. "
+                "kube_config, in_cluster, and config_dict are mutually exclusive. "
"You can only use one option at a time."
)
if in_cluster:
            self.log.debug(LOADING_KUBE_CONFIG_FILE_RESOURCE.format("within a pod"))
- self._is_in_cluster = True
async_config.load_incluster_config()
- return _TimeoutAsyncK8sApiClient()
+ self._is_in_cluster = True
+ self._config_loaded = True
+ return
+
+        # If the above block did not return, we are not in a cluster.
+ self._is_in_cluster = False
if self.config_dict:
            self.log.debug(LOADING_KUBE_CONFIG_FILE_RESOURCE.format("config dictionary"))
- self._is_in_cluster = False
            await async_config.load_kube_config_from_dict(self.config_dict, context=cluster_context)
- return _TimeoutAsyncK8sApiClient()
+ self._config_loaded = True
+ return
if kubeconfig_path is not None:
self.log.debug("loading kube_config from: %s", kubeconfig_path)
- self._is_in_cluster = False
- return await api_client_from_kubeconfig_file(kubeconfig_path)
+ await async_config.load_kube_config(
+ config_file=kubeconfig_path,
+ client_configuration=self.client_configuration,
+ context=cluster_context,
+ )
+ self._config_loaded = True
+ return
if kubeconfig is not None:
async with aiofiles.tempfile.NamedTemporaryFile() as temp_config:
@@ -874,13 +880,21 @@ class AsyncKubernetesHook(KubernetesHook):
kubeconfig = json.dumps(kubeconfig)
await temp_config.write(kubeconfig.encode())
await temp_config.flush()
- self._is_in_cluster = False
- return await api_client_from_kubeconfig_file(temp_config.name)
+
+ await async_config.load_kube_config(
+ config_file=temp_config.name,
+ client_configuration=self.client_configuration,
+ context=cluster_context,
+ )
+ self._config_loaded = True
+ return
+
        self.log.debug(LOADING_KUBE_CONFIG_FILE_RESOURCE.format("default configuration file"))
await async_config.load_kube_config(
client_configuration=self.client_configuration,
context=cluster_context,
)
+ self._config_loaded = True
async def get_conn_extras(self) -> dict:
if self._extras is None:
@@ -907,7 +921,8 @@ class AsyncKubernetesHook(KubernetesHook):
async def get_conn(self) -> AsyncGenerator[async_client.ApiClient, None]:
kube_client = None
try:
-            kube_client = await self._load_config() or _TimeoutAsyncK8sApiClient()
+ await self._load_config()
+ kube_client = _TimeoutAsyncK8sApiClient()
yield kube_client
finally:
if kube_client is not None:
@@ -1021,48 +1036,101 @@ class AsyncKubernetesHook(KubernetesHook):
timeout_seconds: int = 30,
) -> AsyncGenerator[CoreV1Event]:
"""
- Watch pod events using Kubernetes Watch API.
+ Watch Kubernetes events for a pod.
+
+ Reconnects on watch termination and resumes from the last observed
+ resourceVersion. The watch stops when the pod is terminal or deleted,
+ and falls back to polling if watch access is denied.
:param name: Pod name to watch events for
:param namespace: Kubernetes namespace
        :param resource_version: Only return events not older than this resource version
        :param timeout_seconds: Timeout in seconds for the watch stream. A small additional buffer may be applied internally.
+            This does not limit the total duration of event streaming.
"""
- if self._event_polling_fallback:
- async for event_polled in self.watch_pod_events_polling_fallback(
- name, namespace, resource_version, timeout_seconds
- ):
- yield event_polled
-
- try:
- w = async_watch.Watch()
- async with self.get_conn() as connection:
- v1_api = async_client.CoreV1Api(connection)
+ last_rv = resource_version
- async for event_watched in w.stream(
- v1_api.list_namespaced_event,
- namespace=namespace,
- field_selector=f"involvedObject.name={name}",
- resource_version=resource_version,
- timeout_seconds=timeout_seconds,
- ):
- event: CoreV1Event = event_watched.get("object")
- yield event
-
- except async_client.exceptions.ApiException as e:
- if hasattr(e, "status") and e.status == 403:
- self.log.warning(
-                    "Triggerer does not have Kubernetes API permission to 'watch' events: %s Falling back to polling.",
- str(e),
- )
- self._event_polling_fallback = True
+ while True:
+ # If watch is known to be unavailable, use polling fallback
+ if self._event_polling_fallback:
                async for event_polled in self.watch_pod_events_polling_fallback(
- name, namespace, resource_version, timeout_seconds
+ name, namespace, last_rv, timeout_seconds
):
yield event_polled
+ return
- finally:
- w.stop()
+            # The watch may not be created if pod inspection returns early.
+ w = None
+
+ try:
+ # Pod lifecycle is authoritative; events alone are not.
+ pod = await self.get_pod(name=name, namespace=namespace)
+ if pod.status and pod.status.phase in ("Succeeded", "Failed"):
+ self.log.info(
+                        "Pod '%s' reached terminal phase '%s'; stopping event watch",
+ name,
+ pod.status.phase,
+ )
+ return
+
+ w = async_watch.Watch()
+ async with self.get_conn() as connection:
+ v1_api = async_client.CoreV1Api(connection)
+
+ async for event_watched in w.stream(
+ v1_api.list_namespaced_event,
+ namespace=namespace,
+ field_selector=f"involvedObject.name={name}",
+ resource_version=last_rv,
+ timeout_seconds=timeout_seconds,
+ ):
+ event = event_watched.get("object")
+ if not event or not event.metadata:
+ continue
+
+ if event.metadata.resource_version:
+ last_rv = event.metadata.resource_version
+
+ yield event
+
+ # Never swallow cancellation.
+ except asyncio.CancelledError:
+ raise
+
+ except async_client.exceptions.ApiException as e:
+ status = getattr(e, "status", None)
+
+ if status == 403:
+                    # Permanently fall back to polling when watch is not permitted.
+ self.log.warning(
+                        "Kubernetes API does not permit watching events; falling back to polling: %s",
+ str(e),
+ )
+ self._event_polling_fallback = True
+ continue
+
+ if status == 404:
+ # Terminate the watch if pod no longer exists.
+                    self.log.info("Pod '%s' no longer exists; stopping event watch", name)
+ return
+
+ if status == 410:
+                    # Restart watch from current state if resourceVersion is too old.
+ self.log.info(
+                        "resourceVersion too old while watching pod '%s'; restarting watch",
+ name,
+ )
+ last_rv = None
+ continue
+
+                # Other API errors are either transient or configuration/programming errors.
+                # Re-raise so generic_api_retry can apply centralized retry/backoff for
+ # transient failures, and fail fast for non-retryable ones.
+ raise
+
+ finally:
+ if w is not None:
+ w.stop()
async def watch_pod_events_polling_fallback(
self,
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py
index d1fac787049..0f51a6b6170 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py
@@ -1149,10 +1149,11 @@ class TestAsyncKubernetesHook:
assert result == mock_events
@pytest.mark.asyncio
+    @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod")
@mock.patch("kubernetes_asyncio.watch.Watch")
@mock.patch(KUBE_API.format("list_namespaced_event"))
async def test_async_watch_pod_events(
- self, mock_list_namespaced_event, mock_watch_class, kube_config_loader
+        self, mock_list_namespaced_event, mock_watch_class, mock_get_pod, kube_config_loader
):
"""Test watching pod events using Watch API."""
mock_event1 = mock.Mock()
@@ -1168,6 +1169,10 @@ class TestAsyncKubernetesHook:
mock_watch_class.return_value = mock_watch
mock_watch.stream = mock.Mock(side_effect=async_generator)
+ mock_pod = mock.MagicMock()
+ mock_pod.status.phase = "Running"
+ mock_get_pod.return_value = mock_pod
+
hook = AsyncKubernetesHook(
conn_id=None,
in_cluster=False,
@@ -1176,10 +1181,16 @@ class TestAsyncKubernetesHook:
)
events = []
- async for event in hook.watch_pod_events(
+ async_event_generator = hook.watch_pod_events(
            name=POD_NAME, namespace=NAMESPACE, resource_version="12345", timeout_seconds=30
- ):
+ )
+
+ async for event in async_event_generator:
events.append(event)
+ if len(events) == 2:
+ break
+
+ await async_event_generator.aclose()
assert len(events) == 2
assert events[0] == mock_event1
@@ -1187,10 +1198,11 @@ class TestAsyncKubernetesHook:
mock_watch.stop.assert_called_once()
@pytest.mark.asyncio
+    @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod")
@mock.patch("kubernetes_asyncio.watch.Watch")
@mock.patch(KUBE_API.format("list_namespaced_event"))
async def test_async_watch_pod_events_permission_error_fallback(
- self, mock_list_namespaced_event, mock_watch_class, kube_config_loader
+        self, mock_list_namespaced_event, mock_watch_class, mock_get_pod, kube_config_loader
):
"""Test fallback to polling when watch permission is denied."""
@@ -1210,6 +1222,10 @@ class TestAsyncKubernetesHook:
mock_events.items = [mock_event]
        mock_list_namespaced_event.return_value = self.mock_await_result(mock_events)
+ mock_pod = mock.MagicMock()
+ mock_pod.status.phase = "Running"
+ mock_get_pod.return_value = mock_pod
+
hook = AsyncKubernetesHook(
conn_id=None,
in_cluster=False,
@@ -1300,6 +1316,199 @@ class TestAsyncKubernetesHook:
assert len(events) == 1
assert events[0] == mock_event
+ @pytest.mark.asyncio
+ @mock.patch("kubernetes_asyncio.watch.Watch")
+    @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod")
+ async def test_watch_pod_events_reconnects_after_stream_timeout(
+ self,
+ mock_get_pod,
+ mock_watch_class,
+ kube_config_loader,
+ ):
+ """
+ The watch should reconnect when the watch stream ends (e.g. timeout)
+ and continue yielding events until the pod terminates.
+ """
+
+ mock_get_pod.side_effect = [
+ mock.MagicMock(status=mock.MagicMock(phase="Running")),
+ mock.MagicMock(status=mock.MagicMock(phase="Running")),
+ ]
+
+ mock_event1 = mock.Mock()
+ mock_event1.metadata.uid = "event-1"
+ mock_event2 = mock.Mock()
+ mock_event2.metadata.uid = "event-2"
+
+ # Simulate a watch stream ending naturally (e.g. server-side timeout).
+ # The hook should reconnect and continue watching.
+ async def timed_out_stream(*_, **__):
+ yield {"object": mock_event1}
+ return
+
+ async def fresh_stream(*_, **__):
+ yield {"object": mock_event2}
+
+ watch_instance1 = mock.Mock()
+ watch_instance1.stream = mock.Mock(side_effect=timed_out_stream)
+
+ watch_instance2 = mock.Mock()
+ watch_instance2.stream = mock.Mock(side_effect=fresh_stream)
+
+ mock_watch_class.side_effect = [watch_instance1, watch_instance2]
+
+ hook = AsyncKubernetesHook(
+ conn_id=None,
+ in_cluster=False,
+ config_file=None,
+ cluster_context=None,
+ )
+
+ events = []
+
+ async for event in hook.watch_pod_events(
+            name=POD_NAME, namespace=NAMESPACE, resource_version="12345", timeout_seconds=1
+ ):
+ events.append(event)
+ if len(events) == 2:
+ break
+
+ assert events == [mock_event1, mock_event2]
+ assert mock_watch_class.call_count == 2
+
+ @pytest.mark.asyncio
+ @mock.patch("kubernetes_asyncio.watch.Watch")
+    @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod")
+ async def test_watch_pod_events_restarts_on_stale_resource_version(
+ self,
+ mock_get_pod,
+ mock_watch_class,
+ kube_config_loader,
+ ):
+ """
+ When the Kubernetes API reports resourceVersion too old (410),
+ the watch should restart from the current state instead of failing.
+ """
+
+ mock_get_pod.side_effect = [
+ mock.MagicMock(status=mock.MagicMock(phase="Running")),
+ mock.MagicMock(status=mock.MagicMock(phase="Running")),
+ ]
+
+ mock_event = mock.Mock()
+ mock_event.metadata.uid = "event"
+ mock_event.metadata.resource_version = "2"
+
+ # Kubernetes signals a stale resourceVersion with HTTP 410.
+ # This should trigger a watch restart instead of failing the generator.
+ async def async_generator_with_error(*_, **__):
+ raise async_client.exceptions.ApiException(status=410)
+ yield
+
+ async def fresh_stream(*_, **__):
+ yield {"object": mock_event}
+
+ watch_instance1 = mock.Mock()
+        watch_instance1.stream = mock.Mock(side_effect=async_generator_with_error)
+
+ watch_instance2 = mock.Mock()
+ watch_instance2.stream = mock.Mock(side_effect=fresh_stream)
+
+ mock_watch_class.side_effect = [watch_instance1, watch_instance2]
+
+ hook = AsyncKubernetesHook(
+ conn_id=None,
+ in_cluster=False,
+ config_file=None,
+ cluster_context=None,
+ )
+
+ events = []
+ async for event in hook.watch_pod_events(
+            name=POD_NAME, namespace=NAMESPACE, resource_version="1", timeout_seconds=1
+ ):
+ events.append(event)
+ break
+
+ assert events == [mock_event]
+ assert mock_watch_class.call_count == 2
+
+ @pytest.mark.asyncio
+ @mock.patch("kubernetes_asyncio.watch.Watch")
+    @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod")
+ async def test_watch_pod_events_stops_on_pod_not_found(
+ self,
+ mock_get_pod,
+ mock_watch_class,
+ kube_config_loader,
+ ):
+ """
+        Verify that watch_pod_events stops cleanly when the pod no longer exists (404).
+ """
+
+        # Pod lifecycle is authoritative; a 404 means the watch must terminate cleanly.
+        mock_get_pod.side_effect = async_client.exceptions.ApiException(status=404)
+
+ mock_watch = mock.Mock()
+ mock_watch_class.return_value = mock_watch
+
+ hook = AsyncKubernetesHook(
+ conn_id=None,
+ in_cluster=False,
+ config_file=None,
+ cluster_context=None,
+ )
+
+ events = []
+ async for event in hook.watch_pod_events(
+ name=POD_NAME,
+ namespace=NAMESPACE,
+ ):
+ events.append(event)
+
+ # No events should be yielded.
+ assert events == []
+
+ @pytest.mark.parametrize("pod_status", ("Succeeded", "Failed"))
+ @pytest.mark.asyncio
+ @mock.patch("kubernetes_asyncio.watch.Watch")
+    @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod")
+ async def test_watch_pod_events_stops_on_pod_completion(
+ self,
+ mock_get_pod,
+ mock_watch_class,
+ pod_status,
+ kube_config_loader,
+ ):
+ """
+ Verify that watch_pod_events stops immediately when the pod
+ is already in a terminal phase.
+ """
+
+ mock_watch = mock.Mock()
+ mock_watch_class.return_value = mock_watch
+
+ mock_pod = mock.MagicMock()
+ mock_pod.status.phase = pod_status
+ mock_get_pod.return_value = mock_pod
+
+ hook = AsyncKubernetesHook(
+ conn_id=None,
+ in_cluster=False,
+ config_file=None,
+ cluster_context=None,
+ )
+
+ events = []
+ async for event in hook.watch_pod_events(
+ name=POD_NAME,
+ namespace=NAMESPACE,
+ ):
+ events.append(event)
+
+ # No events should be yielded.
+ assert events == []
+
@pytest.mark.asyncio
@mock.patch(KUBE_API.format("read_namespaced_pod"))
async def test_get_pod(self, lib_method, kube_config_loader):