This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 610747d25a Add timeout when watching pod events in k8s executor (#39551)
610747d25a is described below

commit 610747d25a6153574c07624afaadcbf575aa2960
Author: Daniel Standish <[email protected]>
AuthorDate: Wed May 15 09:23:23 2024 -0700

    Add timeout when watching pod events in k8s executor (#39551)
    
    If we don't set a timeout, the pod event watch may hang indefinitely if there's a network issue.
    
    ---------
    
    Co-authored-by: Ryan Hatter <[email protected]>
---
 .../cncf/kubernetes/executors/kubernetes_executor_utils.py | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
index d26df876ef..b8235bb5ac 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
@@ -98,9 +98,7 @@ class KubernetesJobWatcher(multiprocessing.Process, 
LoggingMixin):
                     kube_client, self.resource_version, self.scheduler_job_id, 
self.kube_config
                 )
             except ReadTimeoutError:
-                self.log.warning(
-                    "There was a timeout error accessing the Kube API. 
Retrying request.", exc_info=True
-                )
+                self.log.info("Kubernetes watch timed out waiting for events. 
Restarting watch.")
                 time.sleep(1)
             except Exception:
                 self.log.exception("Unknown error in KubernetesJobWatcher. 
Failing")
@@ -141,7 +139,7 @@ class KubernetesJobWatcher(multiprocessing.Process, 
LoggingMixin):
     ) -> str | None:
         self.log.info("Event: and now my watch begins starting at 
resource_version: %s", resource_version)
 
-        kwargs = {"label_selector": f"airflow-worker={scheduler_job_id}"}
+        kwargs: dict[str, Any] = {"label_selector": 
f"airflow-worker={scheduler_job_id}"}
         if resource_version:
             kwargs["resource_version"] = resource_version
         if kube_config.kube_client_request_args:
@@ -150,6 +148,14 @@ class KubernetesJobWatcher(multiprocessing.Process, 
LoggingMixin):
 
         last_resource_version: str | None = None
 
+        # For info about k8s timeout settings see
+        # https://github.com/kubernetes-client/python/blob/v29.0.0/examples/watch/timeout-settings.md
+        # and https://github.com/kubernetes-client/python/blob/v29.0.0/kubernetes/client/api_client.py#L336-L339
+        client_timeout = 30
+        server_conn_timeout = 3600
+        kwargs["_request_timeout"] = client_timeout
+        kwargs["timeout_seconds"] = server_conn_timeout
+
         for event in self._pod_events(kube_client=kube_client, 
query_kwargs=kwargs):
             task = event["object"]
             self.log.debug("Event: %s had an event of type %s", 
task.metadata.name, event["type"])

Reply via email to