This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 610747d25a Add timeout when watching pod events in k8s executor (#39551)
610747d25a is described below
commit 610747d25a6153574c07624afaadcbf575aa2960
Author: Daniel Standish <[email protected]>
AuthorDate: Wed May 15 09:23:23 2024 -0700
Add timeout when watching pod events in k8s executor (#39551)
If we don't set a timeout, it may hang indefinitely if there's a network
issue.
---------
Co-authored-by: Ryan Hatter <[email protected]>
---
.../cncf/kubernetes/executors/kubernetes_executor_utils.py | 14 ++++++++++----
1 file changed, 10 insertions(+), 4 deletions(-)
diff --git
a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
index d26df876ef..b8235bb5ac 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
@@ -98,9 +98,7 @@ class KubernetesJobWatcher(multiprocessing.Process,
LoggingMixin):
kube_client, self.resource_version, self.scheduler_job_id,
self.kube_config
)
except ReadTimeoutError:
- self.log.warning(
- "There was a timeout error accessing the Kube API.
Retrying request.", exc_info=True
- )
+ self.log.info("Kubernetes watch timed out waiting for events.
Restarting watch.")
time.sleep(1)
except Exception:
self.log.exception("Unknown error in KubernetesJobWatcher.
Failing")
@@ -141,7 +139,7 @@ class KubernetesJobWatcher(multiprocessing.Process,
LoggingMixin):
) -> str | None:
self.log.info("Event: and now my watch begins starting at
resource_version: %s", resource_version)
- kwargs = {"label_selector": f"airflow-worker={scheduler_job_id}"}
+ kwargs: dict[str, Any] = {"label_selector":
f"airflow-worker={scheduler_job_id}"}
if resource_version:
kwargs["resource_version"] = resource_version
if kube_config.kube_client_request_args:
@@ -150,6 +148,14 @@ class KubernetesJobWatcher(multiprocessing.Process,
LoggingMixin):
last_resource_version: str | None = None
+ # For info about k8s timeout settings see
+ #
https://github.com/kubernetes-client/python/blob/v29.0.0/examples/watch/timeout-settings.md
+ # and
https://github.com/kubernetes-client/python/blob/v29.0.0/kubernetes/client/api_client.py#L336-L339
+ client_timeout = 30
+ server_conn_timeout = 3600
+ kwargs["_request_timeout"] = client_timeout
+ kwargs["timeout_seconds"] = server_conn_timeout
+
for event in self._pod_events(kube_client=kube_client,
query_kwargs=kwargs):
task = event["object"]
self.log.debug("Event: %s had an event of type %s",
task.metadata.name, event["type"])