SameerMesiah97 opened a new issue, #60495:
URL: https://github.com/apache/airflow/issues/60495
### Apache Airflow Provider(s)
cncf-kubernetes
### Versions of Apache Airflow Providers
`apache-airflow-providers-cncf-kubernetes==10.12.0`
### Apache Airflow version
main
### Operating System
Debian GNU/Linux 12 (bookworm)
### Deployment
Other
### Deployment details
_No response_
### What happened
When using `watch_pod_events` from `AsyncKubernetesHook`, the event
watch terminates silently after `timeout_seconds`, even if the pod is still
running and continuing to emit events.
The Kubernetes Watch API closes watch streams once `timeout_seconds`
has elapsed, without signaling an error. The current implementation treats this
normal stream termination as completion and exits the generator instead of
reconnecting. As a result, events emitted after the first watch timeout are
dropped, and the Airflow task completes successfully without having consumed
all pod events.
This behavior is silent and surprising: no error is raised, no warning is
logged, and users receive incomplete event logs despite the pod still being
alive.
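To make the failure mode concrete, the shape of the problem looks roughly like
the following single-pass loop (a simplified sketch assuming the
`kubernetes_asyncio` client the async hook is built on, not the provider's
actual code):
```
from kubernetes_asyncio import client, watch


async def _watch_once(v1: client.CoreV1Api, name: str, namespace: str, timeout_seconds: int):
    # Simplified sketch, not the provider's actual implementation.
    # One pass over the watch stream: when the API server closes the stream
    # after timeout_seconds (a normal, errorless close), the async-for simply
    # ends and the generator returns.
    async for raw in watch.Watch().stream(
        v1.list_namespaced_event,
        namespace=namespace,
        field_selector=f"involvedObject.name={name}",
        timeout_seconds=timeout_seconds,
    ):
        yield raw["object"]
    # Reaching this point only means the stream closed, not that the pod
    # finished; the caller sees it as completion, so later events are dropped.
```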
### What you think should happen instead
`watch_pod_events` should treat watch stream termination as a reconnect
signal, not as task completion.
The method should:
- Reconnect when the watch stream ends normally
- Resume watching from the last observed `resourceVersion`
- Continue yielding events for as long as the pod exists
- Terminate only when:
- The pod reaches a terminal phase (Succeeded / Failed)
- The pod is deleted
- The task is cancelled
- An irrecoverable Kubernetes API error occurs (e.g. authorization failure)
This matches Kubernetes watch semantics and user expectations for a
long-running event stream.
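For illustration, a minimal sketch of such a reconnect loop (assuming the
`kubernetes_asyncio` watch API; the function name and the simplistic
`resourceVersion` handling are illustrative only, and 410 Gone / expiry
handling is omitted for brevity):
```
from kubernetes_asyncio import client, watch


async def watch_pod_events_reconnecting(
    v1: client.CoreV1Api, name: str, namespace: str, timeout_seconds: int = 30
):
    # Illustrative sketch only: reconnect on normal stream close, resume from
    # the last observed resourceVersion, and stop only on the terminal
    # conditions listed above.
    resource_version = None
    while True:
        kwargs = dict(
            namespace=namespace,
            field_selector=f"involvedObject.name={name}",
            timeout_seconds=timeout_seconds,
        )
        if resource_version:
            kwargs["resource_version"] = resource_version
        async for raw in watch.Watch().stream(v1.list_namespaced_event, **kwargs):
            event = raw["object"]
            resource_version = event.metadata.resource_version
            yield event
        # The stream closed normally; decide whether to reconnect or stop.
        try:
            pod = await v1.read_namespaced_pod(name=name, namespace=namespace)
        except client.ApiException as exc:
            if exc.status == 404:
                return  # pod deleted
            raise  # irrecoverable API error (e.g. 401/403)
        if pod.status.phase in ("Succeeded", "Failed"):
            return  # pod reached a terminal phase
        # Pod is still alive: loop and reconnect.
```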
### How to reproduce
1. Create a pod (for this reproduction, named `event-spammer` in the
`default` namespace) that continuously emits events for a duration longer than
the watch timeout (for example, a pod that runs for several minutes); one way
to create such a pod is sketched after this list.
2. Run the following DAG, ensuring that the pod name and namespace in the
DAG match the pod you created.
```
from datetime import datetime
import asyncio

from airflow.decorators import dag, task
from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook


@task
def watch_pod_events():
    async def _watch():
        hook = AsyncKubernetesHook(in_cluster=True)
        async for event in hook.watch_pod_events(
            name="event-spammer",
            namespace="default",
            timeout_seconds=30,  # triggers premature termination
        ):
            print(f"[EVENT] {event.message}")

    asyncio.run(_watch())


@dag(
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
)
def k8s_watch_timeout_repro():
    watch_pod_events()


dag = k8s_watch_timeout_repro()
```
3. Trigger the DAG and observe the task logs.
4. The task stops logging events after approximately `timeout_seconds`,
even though the pod is still running and emitting events. No error or warning
indicates that events may have been missed or that the watch was terminated
prematurely.
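For step 1, one way to create such a pod is a container that exits every ~20
seconds, so the kubelet keeps emitting `Pulled`/`Created`/`Started` (and
eventually `BackOff`) events for as long as the pod exists. A sketch using
`kubernetes_asyncio` (the image and exit trick are arbitrary choices, not part
of the report):
```
import asyncio

from kubernetes_asyncio import client, config


async def create_event_spammer():
    # Illustrative helper, not part of the report: creates a pod whose
    # container fails every ~20s so it keeps generating events.
    await config.load_kube_config()  # async in kubernetes_asyncio
    async with client.ApiClient() as api:
        v1 = client.CoreV1Api(api)
        pod = client.V1Pod(
            metadata=client.V1ObjectMeta(name="event-spammer", namespace="default"),
            spec=client.V1PodSpec(
                restart_policy="Always",
                containers=[
                    client.V1Container(
                        name="spam",
                        image="busybox",
                        command=["sh", "-c", "sleep 20; exit 1"],
                    )
                ],
            ),
        )
        await v1.create_namespaced_pod(namespace="default", body=pod)


asyncio.run(create_event_spammer())
```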
### Anything else
**This is unlikely to be a documentation-only issue.**
The current behavior violates the principle of least surprise. A function
named `watch_pod_events`, whose documentation (see `async watch_pod_events` on
this
[page](https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/_api/airflow/providers/cncf/kubernetes/hooks/kubernetes/index.html))
states that it “watches pod events,” reasonably implies a continuous event
stream for the lifetime of the pod, not a best-effort stream that silently
stops after a fixed timeout.
While it is possible that some existing deployments have come to rely on the
current behavior, this is unlikely to be intentional. In practice, users who
require reliable event streaming already need to implement their own
workarounds (e.g. manual polling loops, external controllers, or repeated watch
restarts) to compensate for this limitation.
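For example, a hedged sketch of the restart-loop workaround users end up
writing today (it assumes `AsyncKubernetesHook.get_pod`; the 30s timeout and
names are illustrative, and restarting without a `resourceVersion` can also
replay already-seen events):
```
from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook


async def watch_until_pod_done(name: str, namespace: str) -> None:
    # Workaround sketch: restart the watch every time it ends, stopping only
    # once the pod reaches a terminal phase.
    hook = AsyncKubernetesHook(in_cluster=True)
    while True:
        async for event in hook.watch_pod_events(
            name=name, namespace=namespace, timeout_seconds=30
        ):
            print(f"[EVENT] {event.message}")
        pod = await hook.get_pod(name=name, namespace=namespace)
        if pod.status.phase in ("Succeeded", "Failed"):
            break
        # The watch timed out while the pod was still alive: restart it
        # (possibly replaying earlier events).
```
This restores continuity at the cost of duplicate events and extra API calls,
which is exactly the boilerplate the proposed fix would remove.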
The proposed change does alter behavior, but it does not break existing
deployments:
- Consumers that stop iteration early will continue to work unchanged
- No new exceptions are introduced
- The watch still terminates on pod completion, deletion, or cancellation
- The API surface remains identical
Instead, the change aligns runtime behavior with what users can reasonably
expect based on the function name, docstring, and public API contract. It makes
the default behavior safer, more intuitive, and more consistent with Kubernetes
watch semantics, without removing any existing escape hatches.
### Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)