This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9e5fabecb0 Recover from `too old resource version exception` by
retrieving the latest `resource_version` (#30425)
9e5fabecb0 is described below
commit 9e5fabecb05e83700688d940d31a0fbb49000d64
Author: Hussein Awala <[email protected]>
AuthorDate: Thu Apr 13 15:56:03 2023 +0200
Recover from `too old resource version exception` by retrieving the latest
`resource_version` (#30425)
* Recover from `too old resource version exception` by retrieving the
latest `resource_version`
* Update airflow/executors/kubernetes_executor.py
Co-authored-by: Tzu-ping Chung <[email protected]>
---------
Co-authored-by: Tzu-ping Chung <[email protected]>
---
airflow/executors/kubernetes_executor.py | 21 ++++++++++++++++-----
1 file changed, 16 insertions(+), 5 deletions(-)
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index bed56856c6..30d242dfd5 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -129,11 +129,22 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
 
     def _pod_events(self, kube_client: client.CoreV1Api, query_kwargs: dict):
         watcher = watch.Watch()
-
-        if self.namespace == ALL_NAMESPACES:
-            return watcher.stream(kube_client.list_pod_for_all_namespaces, **query_kwargs)
-        else:
-            return watcher.stream(kube_client.list_namespaced_pod, self.namespace, **query_kwargs)
+        try:
+            if self.namespace == ALL_NAMESPACES:
+                return watcher.stream(kube_client.list_pod_for_all_namespaces, **query_kwargs)
+            else:
+                return watcher.stream(kube_client.list_namespaced_pod, self.namespace, **query_kwargs)
+        except ApiException as e:
+            if e.status == 410: # Resource version is too old
+                if self.namespace == ALL_NAMESPACES:
+                    pods = kube_client.list_pod_for_all_namespaces(watch=False)
+                else:
+                    pods = kube_client.list_namespaced_pod(namespace=self.namespace, watch=False)
+                resource_version = pods.metadata.resource_version
+                query_kwargs["resource_version"] = resource_version
+                return self._pod_events(kube_client=kube_client, query_kwargs=query_kwargs)
+            else:
+                raise
 
     def _run(
         self,