This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 63b2bbd167a Add random_name_suffix to SparkKubernetesOperator (#43800) (#43847)
63b2bbd167a is described below
commit 63b2bbd167af1105f1fdd340c24fbf0f1bb6ec90
Author: Mark Andreev <[email protected]>
AuthorDate: Sat Nov 9 17:22:37 2024 +0000
Add random_name_suffix to SparkKubernetesOperator (#43800) (#43847)
* Add random_name_suffix to SparkKubernetesOperator (#43800)
Prior to this change, `random_name_suffix` was only documented but not
implemented as a configurable option. Passing this value as an argument had no
effect. This commit introduces a `false` option for `random_name_suffix`, which
prevents the generation of a random suffix for the pod name. For compatibility,
the default value is set to `true`, ensuring the pod name will still conform to
`MAX_LABEL_LEN = 63`.
Fixes: #43800
* Add random_name_suffix to SparkKubernetesOperator (#43800)
Prior to this change, `random_name_suffix` was only documented but not
implemented as a configurable option. Passing this value as an argument had no
effect. This commit introduces a `false` option for `random_name_suffix`, which
prevents the generation of a random suffix for the pod name. For compatibility,
the default value is set to `true`, ensuring the pod name will still conform to
`MAX_LABEL_LEN = 63`.
Fixes: #43800
---
.../cncf/kubernetes/operators/spark_kubernetes.py | 9 +++++++-
.../kubernetes/operators/test_spark_kubernetes.py | 26 ++++++++++++++++++++++
2 files changed, 34 insertions(+), 1 deletion(-)
diff --git a/providers/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/providers/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
index c1f5b36d6d3..acab68d85ff 100644
--- a/providers/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
+++ b/providers/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
@@ -68,6 +68,7 @@ class SparkKubernetesOperator(KubernetesPodOperator):
state, or the execution is interrupted. If True (default), delete the
pod; if False, leave the pod.
:param kubernetes_conn_id: the connection to Kubernetes cluster
+ :param random_name_suffix: If True, adds a random suffix to the pod name
"""
template_fields = ["application_file", "namespace", "template_spec", "kubernetes_conn_id"]
@@ -94,6 +95,7 @@ class SparkKubernetesOperator(KubernetesPodOperator):
reattach_on_restart: bool = True,
delete_on_termination: bool = True,
kubernetes_conn_id: str = "kubernetes_default",
+ random_name_suffix: bool = True,
**kwargs,
) -> None:
if kwargs.get("xcom_push") is not None:
@@ -112,6 +114,7 @@ class SparkKubernetesOperator(KubernetesPodOperator):
self.get_logs = get_logs
self.log_events_on_failure = log_events_on_failure
self.success_run_history_limit = success_run_history_limit
+ self.random_name_suffix = random_name_suffix
if self.base_container_name != self.BASE_CONTAINER_NAME:
self.log.warning(
@@ -164,7 +167,11 @@ class SparkKubernetesOperator(KubernetesPodOperator):
self.name or self.template_body.get("spark", {}).get("metadata", {}).get("name") or self.task_id
)
- updated_name = add_unique_suffix(name=name, max_len=MAX_LABEL_LEN)
+ if self.random_name_suffix:
+ updated_name = add_unique_suffix(name=name, max_len=MAX_LABEL_LEN)
+ else:
+ # truncation is required to maintain the same behavior as before
+ updated_name = name[:MAX_LABEL_LEN]
return self._set_name(updated_name)
diff --git a/providers/tests/cncf/kubernetes/operators/test_spark_kubernetes.py b/providers/tests/cncf/kubernetes/operators/test_spark_kubernetes.py
index 39a649b613a..24372dcdca4 100644
--- a/providers/tests/cncf/kubernetes/operators/test_spark_kubernetes.py
+++ b/providers/tests/cncf/kubernetes/operators/test_spark_kubernetes.py
@@ -22,6 +22,7 @@ import json
from datetime import date
from unittest import mock
from unittest.mock import patch
+from uuid import uuid4
import pendulum
import pytest
@@ -31,6 +32,7 @@ from kubernetes.client import models as k8s
from airflow import DAG
from airflow.models import Connection, DagRun, TaskInstance
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
+from airflow.providers.cncf.kubernetes.pod_generator import MAX_LABEL_LEN
from airflow.utils import db, timezone
from airflow.utils.types import DagRunType
@@ -828,3 +830,27 @@ def test_resolve_application_file_real_file_not_exists(create_task_instance_of_o
task: SparkKubernetesOperator = ti.task
with pytest.raises(TypeError, match="application_file body can't transformed into the dictionary"):
_ = task.template_body
+
+
[email protected](
+ "random_name_suffix",
+    [pytest.param(True, id="use-random_name_suffix"), pytest.param(False, id="skip-random_name_suffix")],
+)
+def test_create_job_name(random_name_suffix: bool):
+ name = f"x{uuid4()}"
+    op = SparkKubernetesOperator(task_id="task_id", name=name, random_name_suffix=random_name_suffix)
+ pod_name = op.create_job_name()
+
+ if random_name_suffix:
+ assert pod_name.startswith(name)
+ assert pod_name != name
+ else:
+ assert pod_name == name
+
+
+def test_create_job_name_should_truncate_long_names():
+ long_name = f"{uuid4()}" + "x" * MAX_LABEL_LEN
+    op = SparkKubernetesOperator(task_id="task_id", name=long_name, random_name_suffix=False)
+ pod_name = op.create_job_name()
+
+ assert pod_name == long_name[:MAX_LABEL_LEN]