This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b9d17f  [SPARK-49838] Add `spark-version` label to `Spark Cluster` 
resources
0b9d17f is described below

commit 0b9d17feaa218b07904fbeed1f6037ec970d15ff
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Mon Sep 30 12:17:10 2024 -0700

    [SPARK-49838] Add `spark-version` label to `Spark Cluster` resources
    
    ### What changes were proposed in this pull request?
    
    This PR aims to add `spark-version` label to `Spark Cluster` resources like 
`Spark Application`
    
    ### Why are the changes needed?
    
    `spark-version` is an important label to distinguish and search in the 
production environment. This PR will add `spark-version` to the following 
resources.
    - `Master Service`
    - `Master Statefulset`
    - `Master Pod`
    - `Worker Service`
    - `Worker Statefulset`
    - `Worker HorizontalPodAutoscaler`
    - `Worker Pod`
    
    We can use `spark-version` as a selector like the following.
    ```
    $ kubectl get svc
    NAME                          TYPE        CLUSTER-IP   EXTERNAL-IP   
PORT(S)                      AGE
    cluster-with-hpa-master-svc   ClusterIP   None         <none>        
8080/TCP,7077/TCP,6066/TCP   39s
    cluster-with-hpa-worker-svc   ClusterIP   None         <none>        
8081/TCP                     39s
    kubernetes                    ClusterIP   10.96.0.1    <none>        
443/TCP                      6d12h
    
    $ kubectl get svc -l spark-version=4.0.0-preview2
    NAME                          TYPE        CLUSTER-IP   EXTERNAL-IP   
PORT(S)                      AGE
    cluster-with-hpa-master-svc   ClusterIP   None         <none>        
8080/TCP,7077/TCP,6066/TCP   89s
    cluster-with-hpa-worker-svc   ClusterIP   None         <none>        
8081/TCP                     89s
    
    $ kubectl get pod -l spark-version=4.0.0-preview2
    NAME                        READY   STATUS    RESTARTS   AGE
    cluster-with-hpa-master-0   1/1     Running   0          17m
    cluster-with-hpa-worker-0   1/1     Running   0          17m
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is not released yet.
    
    ### How was this patch tested?
    
    Pass the CIs with the revised test cases.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #142 from dongjoon-hyun/SPARK-49838.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../org/apache/spark/k8s/operator/Constants.java   |  1 +
 .../org/apache/spark/k8s/operator/utils/Utils.java |  2 +
 .../k8s/operator/SparkClusterResourceSpec.java     | 28 +++++++++++--
 .../k8s/operator/SparkClusterResourceSpecTest.java | 47 ++++++++++++++++++++++
 .../operator/SparkClusterSubmissionWorkerTest.java |  3 ++
 5 files changed, 77 insertions(+), 4 deletions(-)

diff --git 
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java 
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
index 8c7fc78..72513ff 100644
--- 
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
+++ 
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
@@ -34,6 +34,7 @@ public class Constants {
   public static final String LABEL_SPARK_ROLE_CLUSTER_VALUE = "cluster";
   public static final String LABEL_SPARK_ROLE_MASTER_VALUE = "master";
   public static final String LABEL_SPARK_ROLE_WORKER_VALUE = "worker";
+  public static final String LABEL_SPARK_VERSION_NAME = "spark-version";
   public static final String SENTINEL_RESOURCE_DUMMY_FIELD = 
"sentinel.dummy.number";
 
   public static final String DRIVER_SPARK_CONTAINER_PROP_KEY =
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
index 36c826c..7705a1d 100644
--- 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
@@ -24,6 +24,7 @@ import static 
org.apache.spark.k8s.operator.Constants.LABEL_SPARK_OPERATOR_NAME;
 import static 
org.apache.spark.k8s.operator.Constants.LABEL_SPARK_ROLE_CLUSTER_VALUE;
 import static 
org.apache.spark.k8s.operator.Constants.LABEL_SPARK_ROLE_DRIVER_VALUE;
 import static 
org.apache.spark.k8s.operator.Constants.LABEL_SPARK_ROLE_EXECUTOR_VALUE;
+import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_VERSION_NAME;
 import static 
org.apache.spark.k8s.operator.config.SparkOperatorConf.OPERATOR_APP_NAME;
 import static 
org.apache.spark.k8s.operator.config.SparkOperatorConf.OPERATOR_WATCHED_NAMESPACES;
 import static 
org.apache.spark.k8s.operator.config.SparkOperatorConf.SPARK_APP_STATUS_LISTENER_CLASS_NAMES;
@@ -111,6 +112,7 @@ public final class Utils {
   public static Map<String, String> sparkClusterResourceLabels(final 
SparkCluster cluster) {
     Map<String, String> labels = commonManagedResourceLabels();
     labels.put(Constants.LABEL_SPARK_CLUSTER_NAME, 
cluster.getMetadata().getName());
+    labels.put(LABEL_SPARK_VERSION_NAME, 
cluster.getSpec().getRuntimeVersions().getSparkVersion());
     return labels;
   }
 
diff --git 
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
 
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
index e7bd8d4..158e0ee 100644
--- 
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
+++ 
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
@@ -61,6 +61,7 @@ public class SparkClusterResourceSpec {
     String namespace = conf.get(Config.KUBERNETES_NAMESPACE().key(), 
clusterNamespace);
     String image = conf.get(Config.CONTAINER_IMAGE().key(), 
"apache/spark:4.0.0-preview2");
     ClusterSpec spec = cluster.getSpec();
+    String version = spec.getRuntimeVersions().getSparkVersion();
     StringBuilder options = new StringBuilder();
     for (Tuple2<String, String> t : conf.getAll()) {
       options.append(String.format("-D%s=\"%s\" ", t._1, t._2));
@@ -69,15 +70,24 @@ public class SparkClusterResourceSpec {
     WorkerSpec workerSpec = spec.getWorkerSpec();
     masterService =
         buildMasterService(
-            clusterName, namespace, masterSpec.getServiceMetadata(), 
masterSpec.getServiceSpec());
+            clusterName,
+            namespace,
+            version,
+            masterSpec.getServiceMetadata(),
+            masterSpec.getServiceSpec());
     workerService =
         buildWorkerService(
-            clusterName, namespace, workerSpec.getServiceMetadata(), 
workerSpec.getServiceSpec());
+            clusterName,
+            namespace,
+            version,
+            workerSpec.getServiceMetadata(),
+            workerSpec.getServiceSpec());
     masterStatefulSet =
         buildMasterStatefulSet(
             scheduler,
             clusterName,
             namespace,
+            version,
             image,
             options.toString(),
             masterSpec.getStatefulSetMetadata(),
@@ -87,6 +97,7 @@ public class SparkClusterResourceSpec {
             scheduler,
             clusterName,
             namespace,
+            version,
             image,
             spec.getClusterTolerations().getInstanceConfig().getInitWorkers(),
             options.toString(),
@@ -96,11 +107,12 @@ public class SparkClusterResourceSpec {
   }
 
   private static Service buildMasterService(
-      String name, String namespace, ObjectMeta metadata, ServiceSpec 
serviceSpec) {
+      String name, String namespace, String version, ObjectMeta metadata, 
ServiceSpec serviceSpec) {
     return new ServiceBuilder()
         .withNewMetadataLike(metadata)
         .withName(name + "-master-svc")
         .addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)
+        .addToLabels(LABEL_SPARK_VERSION_NAME, version)
         .withNamespace(namespace)
         .endMetadata()
         .withNewSpecLike(serviceSpec)
@@ -127,11 +139,12 @@ public class SparkClusterResourceSpec {
   }
 
   private static Service buildWorkerService(
-      String name, String namespace, ObjectMeta metadata, ServiceSpec 
serviceSpec) {
+      String name, String namespace, String version, ObjectMeta metadata, 
ServiceSpec serviceSpec) {
     return new ServiceBuilder()
         .withNewMetadataLike(metadata)
         .withName(name + "-worker-svc")
         .addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
+        .addToLabels(LABEL_SPARK_VERSION_NAME, version)
         .withNamespace(namespace)
         .endMetadata()
         .withNewSpecLike(serviceSpec)
@@ -151,6 +164,7 @@ public class SparkClusterResourceSpec {
       String scheduler,
       String name,
       String namespace,
+      String version,
       String image,
       String options,
       ObjectMeta objectMeta,
@@ -160,6 +174,7 @@ public class SparkClusterResourceSpec {
             .withNewMetadataLike(objectMeta)
             .withName(name + "-master")
             .addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)
+            .addToLabels(LABEL_SPARK_VERSION_NAME, version)
             .withNamespace(namespace)
             .endMetadata()
             .withNewSpecLike(statefulSetSpec)
@@ -171,6 +186,7 @@ public class SparkClusterResourceSpec {
             .editOrNewTemplate()
             .editOrNewMetadata()
             .addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)
+            .addToLabels(LABEL_SPARK_VERSION_NAME, version)
             .endMetadata()
             .editOrNewSpec()
             .withSchedulerName(scheduler)
@@ -213,6 +229,7 @@ public class SparkClusterResourceSpec {
       String scheduler,
       String name,
       String namespace,
+      String version,
       String image,
       int initWorkers,
       String options,
@@ -223,6 +240,7 @@ public class SparkClusterResourceSpec {
             .withNewMetadataLike(metadata)
             .withName(name + "-worker")
             .addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
+            .addToLabels(LABEL_SPARK_VERSION_NAME, version)
             .withNamespace(namespace)
             .endMetadata()
             .withNewSpecLike(statefulSetSpec)
@@ -235,6 +253,7 @@ public class SparkClusterResourceSpec {
             .editOrNewTemplate()
             .editOrNewMetadata()
             .addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
+            .addToLabels(LABEL_SPARK_VERSION_NAME, version)
             .endMetadata()
             .editOrNewSpec()
             .withSchedulerName(scheduler)
@@ -320,6 +339,7 @@ public class SparkClusterResourceSpec {
             .withNewMetadata()
             .withNamespace(namespace)
             .withName(clusterName + "-worker-hpa")
+            .addToLabels(LABEL_SPARK_VERSION_NAME, 
spec.getRuntimeVersions().getSparkVersion())
             .endMetadata()
             .withNewSpecLike(horizontalPodAutoscalerSpec)
             .withNewScaleTargetRef()
diff --git 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
index bbe8ba0..3c0bbe7 100644
--- 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
+++ 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.spark.k8s.operator;
 
+import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_VERSION_NAME;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -41,6 +42,7 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.k8s.operator.spec.ClusterSpec;
 import org.apache.spark.k8s.operator.spec.ClusterTolerations;
 import org.apache.spark.k8s.operator.spec.MasterSpec;
+import org.apache.spark.k8s.operator.spec.RuntimeVersions;
 import org.apache.spark.k8s.operator.spec.WorkerInstanceConfig;
 import org.apache.spark.k8s.operator.spec.WorkerSpec;
 
@@ -52,6 +54,7 @@ class SparkClusterResourceSpecTest {
   ServiceSpec serviceSpec;
   MasterSpec masterSpec;
   WorkerSpec workerSpec;
+  RuntimeVersions runtimeVersions = new RuntimeVersions();
   SparkConf sparkConf = new SparkConf().set("spark.kubernetes.namespace", 
"other-namespace");
   ClusterTolerations clusterTolerations = new ClusterTolerations();
 
@@ -71,6 +74,8 @@ class SparkClusterResourceSpecTest {
     when(clusterSpec.getClusterTolerations()).thenReturn(clusterTolerations);
     when(clusterSpec.getMasterSpec()).thenReturn(masterSpec);
     when(clusterSpec.getWorkerSpec()).thenReturn(workerSpec);
+    when(clusterSpec.getRuntimeVersions()).thenReturn(runtimeVersions);
+    runtimeVersions.setSparkVersion("4.0.0");
     when(masterSpec.getStatefulSetSpec()).thenReturn(statefulSetSpec);
     when(masterSpec.getStatefulSetMetadata()).thenReturn(objectMeta);
     when(masterSpec.getServiceSpec()).thenReturn(serviceSpec);
@@ -86,9 +91,11 @@ class SparkClusterResourceSpecTest {
     Service service1 = new SparkClusterResourceSpec(cluster, new 
SparkConf()).getMasterService();
     assertEquals("my-namespace", service1.getMetadata().getNamespace());
     assertEquals("cluster-name-master-svc", service1.getMetadata().getName());
+    assertEquals("4.0.0", 
service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
 
     Service service2 = new SparkClusterResourceSpec(cluster, 
sparkConf).getMasterService();
     assertEquals("other-namespace", service2.getMetadata().getNamespace());
+    assertEquals("4.0.0", 
service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
   }
 
   @Test
@@ -96,6 +103,7 @@ class SparkClusterResourceSpecTest {
     Service service1 = new SparkClusterResourceSpec(cluster, new 
SparkConf()).getWorkerService();
     assertEquals("my-namespace", service1.getMetadata().getNamespace());
     assertEquals("cluster-name-worker-svc", service1.getMetadata().getName());
+    assertEquals("4.0.0", 
service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
 
     Service service2 = new SparkClusterResourceSpec(cluster, 
sparkConf).getMasterService();
     assertEquals("other-namespace", service2.getMetadata().getNamespace());
@@ -119,6 +127,7 @@ class SparkClusterResourceSpecTest {
     assertEquals("my-namespace", service1.getMetadata().getNamespace());
     assertEquals("cluster-name-worker-svc", service1.getMetadata().getName());
     assertEquals("bar", service1.getMetadata().getLabels().get("foo"));
+    assertEquals("4.0.0", 
service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
     assertEquals("foo", service1.getSpec().getExternalName());
   }
 
@@ -140,6 +149,7 @@ class SparkClusterResourceSpecTest {
     assertEquals("my-namespace", service1.getMetadata().getNamespace());
     assertEquals("cluster-name-master-svc", service1.getMetadata().getName());
     assertEquals("bar", service1.getMetadata().getLabels().get("foo"));
+    assertEquals("4.0.0", 
service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
     assertEquals("foo", service1.getSpec().getExternalName());
   }
 
@@ -149,6 +159,15 @@ class SparkClusterResourceSpecTest {
     StatefulSet statefulSet1 = spec1.getMasterStatefulSet();
     assertEquals("my-namespace", statefulSet1.getMetadata().getNamespace());
     assertEquals("cluster-name-master", statefulSet1.getMetadata().getName());
+    assertEquals("4.0.0", 
statefulSet1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
+    assertEquals(
+        "4.0.0",
+        statefulSet1
+            .getSpec()
+            .getTemplate()
+            .getMetadata()
+            .getLabels()
+            .get(LABEL_SPARK_VERSION_NAME));
 
     SparkClusterResourceSpec spec2 = new SparkClusterResourceSpec(cluster, 
sparkConf);
     StatefulSet statefulSet2 = spec2.getMasterStatefulSet();
@@ -185,8 +204,17 @@ class SparkClusterResourceSpecTest {
     assertEquals("my-namespace", statefulSet1.getMetadata().getNamespace());
     assertEquals("cluster-name-master", statefulSet1.getMetadata().getName());
     assertEquals("bar", statefulSet1.getMetadata().getLabels().get("foo"));
+    assertEquals("4.0.0", 
statefulSet1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
     assertEquals(1, 
statefulSet1.getSpec().getTemplate().getSpec().getInitContainers().size());
     assertEquals(2, 
statefulSet1.getSpec().getTemplate().getSpec().getContainers().size());
+    assertEquals(
+        "4.0.0",
+        statefulSet1
+            .getSpec()
+            .getTemplate()
+            .getMetadata()
+            .getLabels()
+            .get(LABEL_SPARK_VERSION_NAME));
   }
 
   @Test
@@ -195,6 +223,15 @@ class SparkClusterResourceSpecTest {
     StatefulSet statefulSet = spec.getWorkerStatefulSet();
     assertEquals("my-namespace", statefulSet.getMetadata().getNamespace());
     assertEquals("cluster-name-worker", statefulSet.getMetadata().getName());
+    assertEquals("4.0.0", 
statefulSet.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
+    assertEquals(
+        "4.0.0",
+        statefulSet
+            .getSpec()
+            .getTemplate()
+            .getMetadata()
+            .getLabels()
+            .get(LABEL_SPARK_VERSION_NAME));
 
     SparkClusterResourceSpec spec2 = new SparkClusterResourceSpec(cluster, 
sparkConf);
     StatefulSet statefulSet2 = spec2.getWorkerStatefulSet();
@@ -230,6 +267,15 @@ class SparkClusterResourceSpecTest {
     StatefulSet statefulSet = spec.getWorkerStatefulSet();
     assertEquals("my-namespace", statefulSet.getMetadata().getNamespace());
     assertEquals("cluster-name-worker", statefulSet.getMetadata().getName());
+    assertEquals("4.0.0", 
statefulSet.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
+    assertEquals(
+        "4.0.0",
+        statefulSet
+            .getSpec()
+            .getTemplate()
+            .getMetadata()
+            .getLabels()
+            .get(LABEL_SPARK_VERSION_NAME));
   }
 
   @Test
@@ -255,6 +301,7 @@ class SparkClusterResourceSpecTest {
     assertEquals("HorizontalPodAutoscaler", hpa.getKind());
     assertEquals("my-namespace", hpa.getMetadata().getNamespace());
     assertEquals("cluster-name-worker-hpa", hpa.getMetadata().getName());
+    assertEquals("4.0.0", 
hpa.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
     assertEquals(1, hpa.getSpec().getMinReplicas());
     assertEquals(3, hpa.getSpec().getMaxReplicas());
   }
diff --git 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
index 3852d3a..6418292 100644
--- 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
+++ 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test;
 import org.apache.spark.k8s.operator.spec.ClusterSpec;
 import org.apache.spark.k8s.operator.spec.ClusterTolerations;
 import org.apache.spark.k8s.operator.spec.MasterSpec;
+import org.apache.spark.k8s.operator.spec.RuntimeVersions;
 import org.apache.spark.k8s.operator.spec.WorkerSpec;
 
 class SparkClusterSubmissionWorkerTest {
@@ -40,6 +41,7 @@ class SparkClusterSubmissionWorkerTest {
   ClusterTolerations clusterTolerations = new ClusterTolerations();
   MasterSpec masterSpec;
   WorkerSpec workerSpec;
+  RuntimeVersions runtimeVersions = new RuntimeVersions();
 
   @BeforeEach
   void setUp() {
@@ -55,6 +57,7 @@ class SparkClusterSubmissionWorkerTest {
     when(clusterSpec.getClusterTolerations()).thenReturn(clusterTolerations);
     when(clusterSpec.getMasterSpec()).thenReturn(masterSpec);
     when(clusterSpec.getWorkerSpec()).thenReturn(workerSpec);
+    when(clusterSpec.getRuntimeVersions()).thenReturn(runtimeVersions);
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to