This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 63b2bbd167a Add random_name_suffix to SparkKubernetesOperator (#43800) 
(#43847)
63b2bbd167a is described below

commit 63b2bbd167af1105f1fdd340c24fbf0f1bb6ec90
Author: Mark Andreev <[email protected]>
AuthorDate: Sat Nov 9 17:22:37 2024 +0000

    Add random_name_suffix to SparkKubernetesOperator (#43800) (#43847)
    
    * Add random_name_suffix to SparkKubernetesOperator (#43800)
    
    Prior to this change, `random_name_suffix` was only documented but not 
implemented as a configurable option. Passing this value as an argument had no 
effect. This commit introduces a `false` option for `random_name_suffix`, which 
prevents the generation of a random suffix for the pod name. For compatibility, 
the default value is set to `true`, ensuring the pod name will still conform to 
`MAX_LABEL_LEN = 63`.
    
    Fixes: #43800
    
    * Add random_name_suffix to SparkKubernetesOperator (#43800)
    
    Prior to this change, `random_name_suffix` was only documented but not 
implemented as a configurable option. Passing this value as an argument had no 
effect. This commit introduces a `false` option for `random_name_suffix`, which 
prevents the generation of a random suffix for the pod name. For compatibility, 
the default value is set to `true`, ensuring the pod name will still conform to 
`MAX_LABEL_LEN = 63`.
    
    Fixes: #43800
---
 .../cncf/kubernetes/operators/spark_kubernetes.py  |  9 +++++++-
 .../kubernetes/operators/test_spark_kubernetes.py  | 26 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)

diff --git 
a/providers/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py 
b/providers/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
index c1f5b36d6d3..acab68d85ff 100644
--- 
a/providers/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
+++ 
b/providers/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
@@ -68,6 +68,7 @@ class SparkKubernetesOperator(KubernetesPodOperator):
         state, or the execution is interrupted. If True (default), delete the
         pod; if False, leave the pod.
     :param kubernetes_conn_id: the connection to Kubernetes cluster
+    :param random_name_suffix: If True, adds a random suffix to the pod name
     """
 
     template_fields = ["application_file", "namespace", "template_spec", 
"kubernetes_conn_id"]
@@ -94,6 +95,7 @@ class SparkKubernetesOperator(KubernetesPodOperator):
         reattach_on_restart: bool = True,
         delete_on_termination: bool = True,
         kubernetes_conn_id: str = "kubernetes_default",
+        random_name_suffix: bool = True,
         **kwargs,
     ) -> None:
         if kwargs.get("xcom_push") is not None:
@@ -112,6 +114,7 @@ class SparkKubernetesOperator(KubernetesPodOperator):
         self.get_logs = get_logs
         self.log_events_on_failure = log_events_on_failure
         self.success_run_history_limit = success_run_history_limit
+        self.random_name_suffix = random_name_suffix
 
         if self.base_container_name != self.BASE_CONTAINER_NAME:
             self.log.warning(
@@ -164,7 +167,11 @@ class SparkKubernetesOperator(KubernetesPodOperator):
             self.name or self.template_body.get("spark", {}).get("metadata", 
{}).get("name") or self.task_id
         )
 
-        updated_name = add_unique_suffix(name=name, max_len=MAX_LABEL_LEN)
+        if self.random_name_suffix:
+            updated_name = add_unique_suffix(name=name, max_len=MAX_LABEL_LEN)
+        else:
+            # truncation is required to maintain the same behavior as before
+            updated_name = name[:MAX_LABEL_LEN]
 
         return self._set_name(updated_name)
 
diff --git a/providers/tests/cncf/kubernetes/operators/test_spark_kubernetes.py 
b/providers/tests/cncf/kubernetes/operators/test_spark_kubernetes.py
index 39a649b613a..24372dcdca4 100644
--- a/providers/tests/cncf/kubernetes/operators/test_spark_kubernetes.py
+++ b/providers/tests/cncf/kubernetes/operators/test_spark_kubernetes.py
@@ -22,6 +22,7 @@ import json
 from datetime import date
 from unittest import mock
 from unittest.mock import patch
+from uuid import uuid4
 
 import pendulum
 import pytest
@@ -31,6 +32,7 @@ from kubernetes.client import models as k8s
 from airflow import DAG
 from airflow.models import Connection, DagRun, TaskInstance
 from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import 
SparkKubernetesOperator
+from airflow.providers.cncf.kubernetes.pod_generator import MAX_LABEL_LEN
 from airflow.utils import db, timezone
 from airflow.utils.types import DagRunType
 
@@ -828,3 +830,27 @@ def 
test_resolve_application_file_real_file_not_exists(create_task_instance_of_o
     task: SparkKubernetesOperator = ti.task
     with pytest.raises(TypeError, match="application_file body can't 
transformed into the dictionary"):
         _ = task.template_body
+
+
+@pytest.mark.parametrize(
+    "random_name_suffix",
+    [pytest.param(True, id="use-random_name_suffix"), pytest.param(False, 
id="skip-random_name_suffix")],
+)
+def test_create_job_name(random_name_suffix: bool):
+    name = f"x{uuid4()}"
+    op = SparkKubernetesOperator(task_id="task_id", name=name, 
random_name_suffix=random_name_suffix)
+    pod_name = op.create_job_name()
+
+    if random_name_suffix:
+        assert pod_name.startswith(name)
+        assert pod_name != name
+    else:
+        assert pod_name == name
+
+
+def test_create_job_name_should_truncate_long_names():
+    long_name = f"{uuid4()}" + "x" * MAX_LABEL_LEN
+    op = SparkKubernetesOperator(task_id="task_id", name=long_name, 
random_name_suffix=False)
+    pod_name = op.create_job_name()
+
+    assert pod_name == long_name[:MAX_LABEL_LEN]

Reply via email to