This is an automated email from the ASF dual-hosted git repository.

rom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 223acdb13f Fix SparkKubernetesOperator spark name. (#42427)
223acdb13f is described below

commit 223acdb13fd479f4a0aede9f866bcae8d918b91f
Author: GPK <[email protected]>
AuthorDate: Fri Sep 27 13:55:24 2024 +0100

    Fix SparkKubernetesOperator spark name. (#42427)
    
    * use the name parameter from the Spark YAML config or from the operator argument
    
    * update tests and name usage condition check
    
    * add a test to check that the Spark name starts with task_id
    
    * use set_name function in create_job
    
    * remove the lower() call
---
 .../cncf/kubernetes/operators/spark_kubernetes.py  |  15 ++-
 .../application_test_with_no_name_from_config.json |  57 ++++++++
 .../application_test_with_no_name_from_config.yaml |  55 ++++++++
 .../kubernetes/operators/test_spark_kubernetes.py  | 143 +++++++++++++++++++++
 4 files changed, 265 insertions(+), 5 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py 
b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
index 39fadae90e..9bcf46d0d4 100644
--- a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
@@ -17,7 +17,6 @@
 # under the License.
 from __future__ import annotations
 
-import re
 from functools import cached_property
 from pathlib import Path
 from typing import TYPE_CHECKING, Any
@@ -83,7 +82,7 @@ class SparkKubernetesOperator(KubernetesPodOperator):
         image: str | None = None,
         code_path: str | None = None,
         namespace: str = "default",
-        name: str = "default",
+        name: str | None = None,
         application_file: str | None = None,
         template_spec=None,
         get_logs: bool = True,
@@ -103,7 +102,6 @@ class SparkKubernetesOperator(KubernetesPodOperator):
         self.code_path = code_path
         self.application_file = application_file
         self.template_spec = template_spec
-        self.name = self.create_job_name()
         self.kubernetes_conn_id = kubernetes_conn_id
         self.startup_timeout_seconds = startup_timeout_seconds
         self.reattach_on_restart = reattach_on_restart
@@ -161,8 +159,13 @@ class SparkKubernetesOperator(KubernetesPodOperator):
         return template_body
 
     def create_job_name(self):
-        initial_name = add_unique_suffix(name=self.task_id, 
max_len=MAX_LABEL_LEN)
-        return re.sub(r"[^a-z0-9-]+", "-", initial_name.lower())
+        name = (
+            self.name or self.template_body.get("spark", {}).get("metadata", 
{}).get("name") or self.task_id
+        )
+
+        updated_name = add_unique_suffix(name=name, max_len=MAX_LABEL_LEN)
+
+        return self._set_name(updated_name)
 
     @staticmethod
     def _get_pod_identifying_label_string(labels) -> str:
@@ -282,6 +285,8 @@ class SparkKubernetesOperator(KubernetesPodOperator):
         return CustomObjectsApi()
 
     def execute(self, context: Context):
+        self.name = self.create_job_name()
+
         self.log.info("Creating sparkApplication.")
         self.launcher = CustomObjectLauncher(
             name=self.name,
diff --git 
a/tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.json
 
b/tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.json
new file mode 100644
index 0000000000..1504c40fbd
--- /dev/null
+++ 
b/tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.json
@@ -0,0 +1,57 @@
+{
+   "apiVersion":"sparkoperator.k8s.io/v1beta2",
+   "kind":"SparkApplication",
+   "metadata":{
+      "namespace":"default"
+   },
+   "spec":{
+      "type":"Scala",
+      "mode":"cluster",
+      "image":"gcr.io/spark-operator/spark:v2.4.5",
+      "imagePullPolicy":"Always",
+      "mainClass":"org.apache.spark.examples.SparkPi",
+      
"mainApplicationFile":"local:///opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar",
+      "sparkVersion":"2.4.5",
+      "restartPolicy":{
+         "type":"Never"
+      },
+      "volumes":[
+         {
+            "name":"test-volume",
+            "hostPath":{
+               "path":"/tmp",
+               "type":"Directory"
+            }
+         }
+      ],
+      "driver":{
+         "cores":1,
+         "coreLimit":"1200m",
+         "memory":"512m",
+         "labels":{
+            "version":"2.4.5"
+         },
+         "serviceAccount":"spark",
+         "volumeMounts":[
+            {
+               "name":"test-volume",
+               "mountPath":"/tmp"
+            }
+         ]
+      },
+      "executor":{
+         "cores":1,
+         "instances":1,
+         "memory":"512m",
+         "labels":{
+            "version":"2.4.5"
+         },
+         "volumeMounts":[
+            {
+               "name":"test-volume",
+               "mountPath":"/tmp"
+            }
+         ]
+      }
+   }
+}
diff --git 
a/tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.yaml
 
b/tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.yaml
new file mode 100644
index 0000000000..9172398095
--- /dev/null
+++ 
b/tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.yaml
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+apiVersion: "sparkoperator.k8s.io/v1beta2"
+kind: SparkApplication
+metadata:
+  namespace: default
+spec:
+  type: Scala
+  mode: cluster
+  image: "gcr.io/spark-operator/spark:v2.4.5"
+  imagePullPolicy: Always
+  mainClass: org.apache.spark.examples.SparkPi
+  mainApplicationFile: 
"local:///opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar"
+  sparkVersion: "2.4.5"
+  restartPolicy:
+    type: Never
+  volumes:
+    - name: "test-volume"
+      hostPath:
+        path: "/tmp"
+        type: Directory
+  driver:
+    cores: 1
+    coreLimit: "1200m"
+    memory: "512m"
+    labels:
+      version: 2.4.5
+    serviceAccount: spark
+    volumeMounts:
+      - name: "test-volume"
+        mountPath: "/tmp"
+  executor:
+    cores: 1
+    instances: 1
+    memory: "512m"
+    labels:
+      version: 2.4.5
+    volumeMounts:
+      - name: "test-volume"
+        mountPath: "/tmp"
diff --git a/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py 
b/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py
index bc8404aa85..9c8c40de65 100644
--- a/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py
+++ b/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py
@@ -273,6 +273,149 @@ class TestSparkKubernetesOperator:
             version="v1beta2",
         )
 
+    def test_create_application_from_yaml_json_and_use_name_from_metadata(
+        self,
+        mock_create_namespaced_crd,
+        mock_get_namespaced_custom_object_status,
+        mock_cleanup,
+        mock_create_job_name,
+        mock_get_kube_client,
+        mock_create_pod,
+        mock_await_pod_start,
+        mock_await_pod_completion,
+        mock_fetch_requested_container_logs,
+        data_file,
+    ):
+        op = SparkKubernetesOperator(
+            
application_file=data_file("spark/application_test.yaml").as_posix(),
+            kubernetes_conn_id="kubernetes_default_kube_config",
+            task_id="create_app_and_use_name_from_metadata",
+        )
+        context = create_context(op)
+        op.execute(context)
+        TEST_APPLICATION_DICT["metadata"]["name"] = op.name
+        mock_create_namespaced_crd.assert_called_with(
+            body=TEST_APPLICATION_DICT,
+            group="sparkoperator.k8s.io",
+            namespace="default",
+            plural="sparkapplications",
+            version="v1beta2",
+        )
+        assert op.name.startswith("default_yaml")
+
+        op = SparkKubernetesOperator(
+            
application_file=data_file("spark/application_test.json").as_posix(),
+            kubernetes_conn_id="kubernetes_default_kube_config",
+            task_id="create_app_and_use_name_from_metadata",
+        )
+        context = create_context(op)
+        op.execute(context)
+        TEST_APPLICATION_DICT["metadata"]["name"] = op.name
+        mock_create_namespaced_crd.assert_called_with(
+            body=TEST_APPLICATION_DICT,
+            group="sparkoperator.k8s.io",
+            namespace="default",
+            plural="sparkapplications",
+            version="v1beta2",
+        )
+        assert op.name.startswith("default_json")
+
+    def test_create_application_from_yaml_json_and_use_name_from_operator_args(
+        self,
+        mock_create_namespaced_crd,
+        mock_get_namespaced_custom_object_status,
+        mock_cleanup,
+        mock_create_job_name,
+        mock_get_kube_client,
+        mock_create_pod,
+        mock_await_pod_start,
+        mock_await_pod_completion,
+        mock_fetch_requested_container_logs,
+        data_file,
+    ):
+        op = SparkKubernetesOperator(
+            
application_file=data_file("spark/application_test.yaml").as_posix(),
+            kubernetes_conn_id="kubernetes_default_kube_config",
+            task_id="default_yaml",
+            name="test-spark",
+        )
+        context = create_context(op)
+        op.execute(context)
+        TEST_APPLICATION_DICT["metadata"]["name"] = op.name
+        mock_create_namespaced_crd.assert_called_with(
+            body=TEST_APPLICATION_DICT,
+            group="sparkoperator.k8s.io",
+            namespace="default",
+            plural="sparkapplications",
+            version="v1beta2",
+        )
+        assert op.name.startswith("test-spark")
+
+        op = SparkKubernetesOperator(
+            
application_file=data_file("spark/application_test.json").as_posix(),
+            kubernetes_conn_id="kubernetes_default_kube_config",
+            task_id="default_json",
+            name="test-spark",
+        )
+        context = create_context(op)
+        op.execute(context)
+        TEST_APPLICATION_DICT["metadata"]["name"] = op.name
+        mock_create_namespaced_crd.assert_called_with(
+            body=TEST_APPLICATION_DICT,
+            group="sparkoperator.k8s.io",
+            namespace="default",
+            plural="sparkapplications",
+            version="v1beta2",
+        )
+        assert op.name.startswith("test-spark")
+
+    def test_create_application_from_yaml_json_and_use_name_task_id(
+        self,
+        mock_create_namespaced_crd,
+        mock_get_namespaced_custom_object_status,
+        mock_cleanup,
+        mock_create_job_name,
+        mock_get_kube_client,
+        mock_create_pod,
+        mock_await_pod_start,
+        mock_await_pod_completion,
+        mock_fetch_requested_container_logs,
+        data_file,
+    ):
+        op = SparkKubernetesOperator(
+            
application_file=data_file("spark/application_test_with_no_name_from_config.yaml").as_posix(),
+            kubernetes_conn_id="kubernetes_default_kube_config",
+            task_id="create_app_and_use_name_from_task_id",
+        )
+        context = create_context(op)
+        op.execute(context)
+        TEST_APPLICATION_DICT["metadata"]["name"] = op.name
+        mock_create_namespaced_crd.assert_called_with(
+            body=TEST_APPLICATION_DICT,
+            group="sparkoperator.k8s.io",
+            namespace="default",
+            plural="sparkapplications",
+            version="v1beta2",
+        )
+        assert op.name.startswith("create_app_and_use_name_from_task_id")
+
+        op = SparkKubernetesOperator(
+            
application_file=data_file("spark/application_test_with_no_name_from_config.json").as_posix(),
+            kubernetes_conn_id="kubernetes_default_kube_config",
+            task_id="create_app_and_use_name_from_task_id",
+        )
+        context = create_context(op)
+        op.execute(context)
+        TEST_APPLICATION_DICT["metadata"]["name"] = op.name
+        mock_create_namespaced_crd.assert_called_with(
+            body=TEST_APPLICATION_DICT,
+            group="sparkoperator.k8s.io",
+            namespace="default",
+            plural="sparkapplications",
+            version="v1beta2",
+        )
+        assert op.name.startswith("create_app_and_use_name_from_task_id")
+
     def test_new_template_from_yaml(
         self,
         mock_create_namespaced_crd,

Reply via email to