This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fcc6f284c7 Update the watcher resource version in SparkK8SOp when it's too old (#32768)
fcc6f284c7 is described below
commit fcc6f284c742bdc554edecc5a83d9eaa7d9d7ba4
Author: Hussein Awala <[email protected]>
AuthorDate: Sat Jul 22 13:32:53 2023 +0200
Update the watcher resource version in SparkK8SOp when it's too old (#32768)
---
.../cncf/kubernetes/operators/spark_kubernetes.py | 30 +++++++++++++++++++---
1 file changed, 26 insertions(+), 4 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
index ccc13469fa..42913a23de 100644
--- a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
@@ -20,6 +20,7 @@ from __future__ import annotations
import datetime
from typing import TYPE_CHECKING, Sequence
+from kubernetes.client import ApiException
from kubernetes.watch import Watch
from airflow import AirflowException
@@ -27,6 +28,8 @@ from airflow.models import BaseOperator
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook, _load_body_to_dict
if TYPE_CHECKING:
+ from kubernetes.client.models import CoreV1EventList
+
from airflow.utils.context import Context
@@ -85,6 +88,25 @@ class SparkKubernetesOperator(BaseOperator):
cluster_context=self.cluster_context,
)
+ def _get_namespace_event_stream(self, namespace, query_kwargs=None):
+ try:
+ return Watch().stream(
+ self.hook.core_v1_client.list_namespaced_event,
+ namespace=namespace,
+ watch=True,
+ **(query_kwargs or {}),
+ )
+ except ApiException as e:
+ if e.status == 410: # Resource version is too old
+ events: CoreV1EventList = self.hook.core_v1_client.list_namespaced_event(
+ namespace=namespace, watch=False
+ )
+ resource_version = events.metadata.resource_version
+ query_kwargs["resource_version"] = resource_version
+ return self._get_namespace_event_stream(namespace, query_kwargs)
+ else:
+ raise
+
def execute(self, context: Context):
body = _load_body_to_dict(self.application_file)
name = body["metadata"]["name"]
@@ -94,11 +116,11 @@ class SparkKubernetesOperator(BaseOperator):
is_job_created = False
if self.watch:
try:
- namespace_event_stream = Watch().stream(
- self.hook.core_v1_client.list_namespaced_event,
+ namespace_event_stream = self._get_namespace_event_stream(
namespace=namespace,
- watch=True,
- field_selector=f"involvedObject.kind=SparkApplication,involvedObject.name={name}",
+ query_kwargs={
+ "field_selector": f"involvedObject.kind=SparkApplication,involvedObject.name={name}"
+ },
)
response = self.hook.create_custom_object(