This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fcc6f284c7 Update the watcher resource version in SparkK8SOp when it's 
too old (#32768)
fcc6f284c7 is described below

commit fcc6f284c742bdc554edecc5a83d9eaa7d9d7ba4
Author: Hussein Awala <[email protected]>
AuthorDate: Sat Jul 22 13:32:53 2023 +0200

    Update the watcher resource version in SparkK8SOp when it's too old (#32768)
---
 .../cncf/kubernetes/operators/spark_kubernetes.py  | 30 +++++++++++++++++++---
 1 file changed, 26 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py 
b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
index ccc13469fa..42913a23de 100644
--- a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
@@ -20,6 +20,7 @@ from __future__ import annotations
 import datetime
 from typing import TYPE_CHECKING, Sequence
 
+from kubernetes.client import ApiException
 from kubernetes.watch import Watch
 
 from airflow import AirflowException
@@ -27,6 +28,8 @@ from airflow.models import BaseOperator
 from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook, 
_load_body_to_dict
 
 if TYPE_CHECKING:
+    from kubernetes.client.models import CoreV1EventList
+
     from airflow.utils.context import Context
 
 
@@ -85,6 +88,25 @@ class SparkKubernetesOperator(BaseOperator):
             cluster_context=self.cluster_context,
         )
 
+    def _get_namespace_event_stream(self, namespace, query_kwargs=None):
+        try:
+            return Watch().stream(
+                self.hook.core_v1_client.list_namespaced_event,
+                namespace=namespace,
+                watch=True,
+                **(query_kwargs or {}),
+            )
+        except ApiException as e:
+            if e.status == 410:  # Resource version is too old
+                events: CoreV1EventList = 
self.hook.core_v1_client.list_namespaced_event(
+                    namespace=namespace, watch=False
+                )
+                resource_version = events.metadata.resource_version
+                query_kwargs["resource_version"] = resource_version
+                return self._get_namespace_event_stream(namespace, 
query_kwargs)
+            else:
+                raise
+
     def execute(self, context: Context):
         body = _load_body_to_dict(self.application_file)
         name = body["metadata"]["name"]
@@ -94,11 +116,11 @@ class SparkKubernetesOperator(BaseOperator):
         is_job_created = False
         if self.watch:
             try:
-                namespace_event_stream = Watch().stream(
-                    self.hook.core_v1_client.list_namespaced_event,
+                namespace_event_stream = self._get_namespace_event_stream(
                     namespace=namespace,
-                    watch=True,
-                    
field_selector=f"involvedObject.kind=SparkApplication,involvedObject.name={name}",
+                    query_kwargs={
+                        "field_selector": 
f"involvedObject.kind=SparkApplication,involvedObject.name={name}"
+                    },
                 )
 
                 response = self.hook.create_custom_object(

Reply via email to