This is an automated email from the ASF dual-hosted git repository.
rom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 223acdb13f Fix SparkKubernetesOperator spark name. (#42427)
223acdb13f is described below
commit 223acdb13fd479f4a0aede9f866bcae8d918b91f
Author: GPK <[email protected]>
AuthorDate: Fri Sep 27 13:55:24 2024 +0100
Fix SparkKubernetesOperator spark name. (#42427)
* use the name from the operator's `name` argument or, if unset, from the spark yaml config metadata, falling back to task_id
* update tests and the condition that selects which name is used
* add a test to check that the spark name starts with the task_id
* use the `_set_name` function in `create_job_name`
* remove lowercasing of the generated job name
---
.../cncf/kubernetes/operators/spark_kubernetes.py | 15 ++-
.../application_test_with_no_name_from_config.json | 57 ++++++++
.../application_test_with_no_name_from_config.yaml | 55 ++++++++
.../kubernetes/operators/test_spark_kubernetes.py | 143 +++++++++++++++++++++
4 files changed, 265 insertions(+), 5 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
index 39fadae90e..9bcf46d0d4 100644
--- a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
@@ -17,7 +17,6 @@
# under the License.
from __future__ import annotations
-import re
from functools import cached_property
from pathlib import Path
from typing import TYPE_CHECKING, Any
@@ -83,7 +82,7 @@ class SparkKubernetesOperator(KubernetesPodOperator):
image: str | None = None,
code_path: str | None = None,
namespace: str = "default",
- name: str = "default",
+ name: str | None = None,
application_file: str | None = None,
template_spec=None,
get_logs: bool = True,
@@ -103,7 +102,6 @@ class SparkKubernetesOperator(KubernetesPodOperator):
self.code_path = code_path
self.application_file = application_file
self.template_spec = template_spec
- self.name = self.create_job_name()
self.kubernetes_conn_id = kubernetes_conn_id
self.startup_timeout_seconds = startup_timeout_seconds
self.reattach_on_restart = reattach_on_restart
@@ -161,8 +159,13 @@ class SparkKubernetesOperator(KubernetesPodOperator):
return template_body
def create_job_name(self):
- initial_name = add_unique_suffix(name=self.task_id,
max_len=MAX_LABEL_LEN)
- return re.sub(r"[^a-z0-9-]+", "-", initial_name.lower())
+ name = (
+ self.name or self.template_body.get("spark", {}).get("metadata",
{}).get("name") or self.task_id
+ )
+
+ updated_name = add_unique_suffix(name=name, max_len=MAX_LABEL_LEN)
+
+ return self._set_name(updated_name)
@staticmethod
def _get_pod_identifying_label_string(labels) -> str:
@@ -282,6 +285,8 @@ class SparkKubernetesOperator(KubernetesPodOperator):
return CustomObjectsApi()
def execute(self, context: Context):
+ self.name = self.create_job_name()
+
self.log.info("Creating sparkApplication.")
self.launcher = CustomObjectLauncher(
name=self.name,
diff --git
a/tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.json
b/tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.json
new file mode 100644
index 0000000000..1504c40fbd
--- /dev/null
+++
b/tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.json
@@ -0,0 +1,57 @@
+{
+ "apiVersion":"sparkoperator.k8s.io/v1beta2",
+ "kind":"SparkApplication",
+ "metadata":{
+ "namespace":"default"
+ },
+ "spec":{
+ "type":"Scala",
+ "mode":"cluster",
+ "image":"gcr.io/spark-operator/spark:v2.4.5",
+ "imagePullPolicy":"Always",
+ "mainClass":"org.apache.spark.examples.SparkPi",
+
"mainApplicationFile":"local:///opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar",
+ "sparkVersion":"2.4.5",
+ "restartPolicy":{
+ "type":"Never"
+ },
+ "volumes":[
+ {
+ "name":"test-volume",
+ "hostPath":{
+ "path":"/tmp",
+ "type":"Directory"
+ }
+ }
+ ],
+ "driver":{
+ "cores":1,
+ "coreLimit":"1200m",
+ "memory":"512m",
+ "labels":{
+ "version":"2.4.5"
+ },
+ "serviceAccount":"spark",
+ "volumeMounts":[
+ {
+ "name":"test-volume",
+ "mountPath":"/tmp"
+ }
+ ]
+ },
+ "executor":{
+ "cores":1,
+ "instances":1,
+ "memory":"512m",
+ "labels":{
+ "version":"2.4.5"
+ },
+ "volumeMounts":[
+ {
+ "name":"test-volume",
+ "mountPath":"/tmp"
+ }
+ ]
+ }
+ }
+}
diff --git
a/tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.yaml
b/tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.yaml
new file mode 100644
index 0000000000..9172398095
--- /dev/null
+++
b/tests/providers/cncf/kubernetes/data_files/spark/application_test_with_no_name_from_config.yaml
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+apiVersion: "sparkoperator.k8s.io/v1beta2"
+kind: SparkApplication
+metadata:
+ namespace: default
+spec:
+ type: Scala
+ mode: cluster
+ image: "gcr.io/spark-operator/spark:v2.4.5"
+ imagePullPolicy: Always
+ mainClass: org.apache.spark.examples.SparkPi
+ mainApplicationFile:
"local:///opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar"
+ sparkVersion: "2.4.5"
+ restartPolicy:
+ type: Never
+ volumes:
+ - name: "test-volume"
+ hostPath:
+ path: "/tmp"
+ type: Directory
+ driver:
+ cores: 1
+ coreLimit: "1200m"
+ memory: "512m"
+ labels:
+ version: 2.4.5
+ serviceAccount: spark
+ volumeMounts:
+ - name: "test-volume"
+ mountPath: "/tmp"
+ executor:
+ cores: 1
+ instances: 1
+ memory: "512m"
+ labels:
+ version: 2.4.5
+ volumeMounts:
+ - name: "test-volume"
+ mountPath: "/tmp"
diff --git a/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py
b/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py
index bc8404aa85..9c8c40de65 100644
--- a/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py
+++ b/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py
@@ -273,6 +273,149 @@ class TestSparkKubernetesOperator:
version="v1beta2",
)
+ def test_create_application_from_yaml_json_and_use_name_from_metadata(
+ self,
+ mock_create_namespaced_crd,
+ mock_get_namespaced_custom_object_status,
+ mock_cleanup,
+ mock_create_job_name,
+ mock_get_kube_client,
+ mock_create_pod,
+ mock_await_pod_start,
+ mock_await_pod_completion,
+ mock_fetch_requested_container_logs,
+ data_file,
+ ):
+ op = SparkKubernetesOperator(
+
application_file=data_file("spark/application_test.yaml").as_posix(),
+ kubernetes_conn_id="kubernetes_default_kube_config",
+ task_id="create_app_and_use_name_from_metadata",
+ )
+ context = create_context(op)
+ op.execute(context)
+ TEST_APPLICATION_DICT["metadata"]["name"] = op.name
+ mock_create_namespaced_crd.assert_called_with(
+ body=TEST_APPLICATION_DICT,
+ group="sparkoperator.k8s.io",
+ namespace="default",
+ plural="sparkapplications",
+ version="v1beta2",
+ )
+ assert op.name.startswith("default_yaml")
+
+ op = SparkKubernetesOperator(
+
application_file=data_file("spark/application_test.json").as_posix(),
+ kubernetes_conn_id="kubernetes_default_kube_config",
+ task_id="create_app_and_use_name_from_metadata",
+ )
+ context = create_context(op)
+ op.execute(context)
+ TEST_APPLICATION_DICT["metadata"]["name"] = op.name
+ mock_create_namespaced_crd.assert_called_with(
+ body=TEST_APPLICATION_DICT,
+ group="sparkoperator.k8s.io",
+ namespace="default",
+ plural="sparkapplications",
+ version="v1beta2",
+ )
+ assert op.name.startswith("default_json")
+
+ def test_create_application_from_yaml_json_and_use_name_from_operator_args(
+ self,
+ mock_create_namespaced_crd,
+ mock_get_namespaced_custom_object_status,
+ mock_cleanup,
+ mock_create_job_name,
+ mock_get_kube_client,
+ mock_create_pod,
+ mock_await_pod_start,
+ mock_await_pod_completion,
+ mock_fetch_requested_container_logs,
+ data_file,
+ ):
+ op = SparkKubernetesOperator(
+
application_file=data_file("spark/application_test.yaml").as_posix(),
+ kubernetes_conn_id="kubernetes_default_kube_config",
+ task_id="default_yaml",
+ name="test-spark",
+ )
+ context = create_context(op)
+ op.execute(context)
+ TEST_APPLICATION_DICT["metadata"]["name"] = op.name
+ mock_create_namespaced_crd.assert_called_with(
+ body=TEST_APPLICATION_DICT,
+ group="sparkoperator.k8s.io",
+ namespace="default",
+ plural="sparkapplications",
+ version="v1beta2",
+ )
+ assert op.name.startswith("test-spark")
+
+ op = SparkKubernetesOperator(
+
application_file=data_file("spark/application_test.json").as_posix(),
+ kubernetes_conn_id="kubernetes_default_kube_config",
+ task_id="default_json",
+ name="test-spark",
+ )
+ context = create_context(op)
+ op.execute(context)
+ TEST_APPLICATION_DICT["metadata"]["name"] = op.name
+ mock_create_namespaced_crd.assert_called_with(
+ body=TEST_APPLICATION_DICT,
+ group="sparkoperator.k8s.io",
+ namespace="default",
+ plural="sparkapplications",
+ version="v1beta2",
+ )
+ assert op.name.startswith("test-spark")
+
+ def test_create_application_from_yaml_json_and_use_name_task_id(
+ self,
+ mock_create_namespaced_crd,
+ mock_get_namespaced_custom_object_status,
+ mock_cleanup,
+ mock_create_job_name,
+ mock_get_kube_client,
+ mock_create_pod,
+ mock_await_pod_start,
+ mock_await_pod_completion,
+ mock_fetch_requested_container_logs,
+ data_file,
+ ):
+ op = SparkKubernetesOperator(
+
application_file=data_file("spark/application_test_with_no_name_from_config.yaml").as_posix(),
+ kubernetes_conn_id="kubernetes_default_kube_config",
+ task_id="create_app_and_use_name_from_task_id",
+ )
+ context = create_context(op)
+ op.execute(context)
+ TEST_APPLICATION_DICT["metadata"]["name"] = op.name
+ mock_create_namespaced_crd.assert_called_with(
+ body=TEST_APPLICATION_DICT,
+ group="sparkoperator.k8s.io",
+ namespace="default",
+ plural="sparkapplications",
+ version="v1beta2",
+ )
+ assert op.name.startswith("create_app_and_use_name_from_task_id")
+
+ op = SparkKubernetesOperator(
+
application_file=data_file("spark/application_test_with_no_name_from_config.json").as_posix(),
+ kubernetes_conn_id="kubernetes_default_kube_config",
+ task_id="create_app_and_use_name_from_task_id",
+ )
+ context = create_context(op)
+ op.execute(context)
+ TEST_APPLICATION_DICT["metadata"]["name"] = op.name
+ mock_create_namespaced_crd.assert_called_with(
+ body=TEST_APPLICATION_DICT,
+ group="sparkoperator.k8s.io",
+ namespace="default",
+ plural="sparkapplications",
+ version="v1beta2",
+ )
+ assert op.name.startswith("create_app_and_use_name_from_task_id")
+
def test_new_template_from_yaml(
self,
mock_create_namespaced_crd,