This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9e5fabecb0 Recover from `too old resource version exception` by retrieving the latest `resource_version` (#30425)
9e5fabecb0 is described below

commit 9e5fabecb05e83700688d940d31a0fbb49000d64
Author: Hussein Awala <[email protected]>
AuthorDate: Thu Apr 13 15:56:03 2023 +0200

    Recover from `too old resource version exception` by retrieving the latest `resource_version` (#30425)
    
    * Recover from `too old resource version exception` by retrieving the latest `resource_version`
    
    * Update airflow/executors/kubernetes_executor.py
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
    
    ---------
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
---
 airflow/executors/kubernetes_executor.py | 21 ++++++++++++++++-----
 1 file changed, 16 insertions(+), 5 deletions(-)

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index bed56856c6..30d242dfd5 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -129,11 +129,22 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
 
     def _pod_events(self, kube_client: client.CoreV1Api, query_kwargs: dict):
         watcher = watch.Watch()
-
-        if self.namespace == ALL_NAMESPACES:
-            return watcher.stream(kube_client.list_pod_for_all_namespaces, **query_kwargs)
-        else:
-            return watcher.stream(kube_client.list_namespaced_pod, self.namespace, **query_kwargs)
+        try:
+            if self.namespace == ALL_NAMESPACES:
+                return watcher.stream(kube_client.list_pod_for_all_namespaces, **query_kwargs)
+            else:
+                return watcher.stream(kube_client.list_namespaced_pod, self.namespace, **query_kwargs)
+        except ApiException as e:
+            if e.status == 410:  # Resource version is too old
+                if self.namespace == ALL_NAMESPACES:
+                    pods = kube_client.list_pod_for_all_namespaces(watch=False)
+                else:
+                    pods = kube_client.list_namespaced_pod(namespace=self.namespace, watch=False)
+                resource_version = pods.metadata.resource_version
+                query_kwargs["resource_version"] = resource_version
+                return self._pod_events(kube_client=kube_client, query_kwargs=query_kwargs)
+            else:
+                raise
 
     def _run(
         self,

Reply via email to