This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 0b9d17f [SPARK-49838] Add `spark-version` label to `Spark Cluster`
resources
0b9d17f is described below
commit 0b9d17feaa218b07904fbeed1f6037ec970d15ff
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Mon Sep 30 12:17:10 2024 -0700
[SPARK-49838] Add `spark-version` label to `Spark Cluster` resources
### What changes were proposed in this pull request?
This PR aims to add the `spark-version` label to `Spark Cluster` resources, matching the existing labeling of `Spark Application` resources.
### Why are the changes needed?
`spark-version` is an important label for distinguishing and searching resources in a
production environment. This PR adds the `spark-version` label to the following
resources.
- `Master Service`
- `Master StatefulSet`
- `Master Pod`
- `Worker Service`
- `Worker StatefulSet`
- `Worker HorizontalPodAutoscaler`
- `Worker Pod`
We can use `spark-version` as a selector like the following.
```
$ kubectl get svc
NAME TYPE CLUSTER-IP EXTERNAL-IP
PORT(S) AGE
cluster-with-hpa-master-svc ClusterIP None <none>
8080/TCP,7077/TCP,6066/TCP 39s
cluster-with-hpa-worker-svc ClusterIP None <none>
8081/TCP 39s
kubernetes ClusterIP 10.96.0.1 <none>
443/TCP 6d12h
$ kubectl get svc -l spark-version=4.0.0-preview2
NAME TYPE CLUSTER-IP EXTERNAL-IP
PORT(S) AGE
cluster-with-hpa-master-svc ClusterIP None <none>
8080/TCP,7077/TCP,6066/TCP 89s
cluster-with-hpa-worker-svc ClusterIP None <none>
8081/TCP 89s
$ kubectl get pod -l spark-version=4.0.0-preview2
NAME READY STATUS RESTARTS AGE
cluster-with-hpa-master-0 1/1 Running 0 17m
cluster-with-hpa-worker-0 1/1 Running 0 17m
```
### Does this PR introduce _any_ user-facing change?
No, this is not released yet.
### How was this patch tested?
Pass the CIs with the revised test cases.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #142 from dongjoon-hyun/SPARK-49838.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/k8s/operator/Constants.java | 1 +
.../org/apache/spark/k8s/operator/utils/Utils.java | 2 +
.../k8s/operator/SparkClusterResourceSpec.java | 28 +++++++++++--
.../k8s/operator/SparkClusterResourceSpecTest.java | 47 ++++++++++++++++++++++
.../operator/SparkClusterSubmissionWorkerTest.java | 3 ++
5 files changed, 77 insertions(+), 4 deletions(-)
diff --git
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
index 8c7fc78..72513ff 100644
---
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
+++
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java
@@ -34,6 +34,7 @@ public class Constants {
public static final String LABEL_SPARK_ROLE_CLUSTER_VALUE = "cluster";
public static final String LABEL_SPARK_ROLE_MASTER_VALUE = "master";
public static final String LABEL_SPARK_ROLE_WORKER_VALUE = "worker";
+ public static final String LABEL_SPARK_VERSION_NAME = "spark-version";
public static final String SENTINEL_RESOURCE_DUMMY_FIELD =
"sentinel.dummy.number";
public static final String DRIVER_SPARK_CONTAINER_PROP_KEY =
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
index 36c826c..7705a1d 100644
---
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
@@ -24,6 +24,7 @@ import static
org.apache.spark.k8s.operator.Constants.LABEL_SPARK_OPERATOR_NAME;
import static
org.apache.spark.k8s.operator.Constants.LABEL_SPARK_ROLE_CLUSTER_VALUE;
import static
org.apache.spark.k8s.operator.Constants.LABEL_SPARK_ROLE_DRIVER_VALUE;
import static
org.apache.spark.k8s.operator.Constants.LABEL_SPARK_ROLE_EXECUTOR_VALUE;
+import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_VERSION_NAME;
import static
org.apache.spark.k8s.operator.config.SparkOperatorConf.OPERATOR_APP_NAME;
import static
org.apache.spark.k8s.operator.config.SparkOperatorConf.OPERATOR_WATCHED_NAMESPACES;
import static
org.apache.spark.k8s.operator.config.SparkOperatorConf.SPARK_APP_STATUS_LISTENER_CLASS_NAMES;
@@ -111,6 +112,7 @@ public final class Utils {
public static Map<String, String> sparkClusterResourceLabels(final
SparkCluster cluster) {
Map<String, String> labels = commonManagedResourceLabels();
labels.put(Constants.LABEL_SPARK_CLUSTER_NAME,
cluster.getMetadata().getName());
+ labels.put(LABEL_SPARK_VERSION_NAME,
cluster.getSpec().getRuntimeVersions().getSparkVersion());
return labels;
}
diff --git
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
index e7bd8d4..158e0ee 100644
---
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
+++
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
@@ -61,6 +61,7 @@ public class SparkClusterResourceSpec {
String namespace = conf.get(Config.KUBERNETES_NAMESPACE().key(),
clusterNamespace);
String image = conf.get(Config.CONTAINER_IMAGE().key(),
"apache/spark:4.0.0-preview2");
ClusterSpec spec = cluster.getSpec();
+ String version = spec.getRuntimeVersions().getSparkVersion();
StringBuilder options = new StringBuilder();
for (Tuple2<String, String> t : conf.getAll()) {
options.append(String.format("-D%s=\"%s\" ", t._1, t._2));
@@ -69,15 +70,24 @@ public class SparkClusterResourceSpec {
WorkerSpec workerSpec = spec.getWorkerSpec();
masterService =
buildMasterService(
- clusterName, namespace, masterSpec.getServiceMetadata(),
masterSpec.getServiceSpec());
+ clusterName,
+ namespace,
+ version,
+ masterSpec.getServiceMetadata(),
+ masterSpec.getServiceSpec());
workerService =
buildWorkerService(
- clusterName, namespace, workerSpec.getServiceMetadata(),
workerSpec.getServiceSpec());
+ clusterName,
+ namespace,
+ version,
+ workerSpec.getServiceMetadata(),
+ workerSpec.getServiceSpec());
masterStatefulSet =
buildMasterStatefulSet(
scheduler,
clusterName,
namespace,
+ version,
image,
options.toString(),
masterSpec.getStatefulSetMetadata(),
@@ -87,6 +97,7 @@ public class SparkClusterResourceSpec {
scheduler,
clusterName,
namespace,
+ version,
image,
spec.getClusterTolerations().getInstanceConfig().getInitWorkers(),
options.toString(),
@@ -96,11 +107,12 @@ public class SparkClusterResourceSpec {
}
private static Service buildMasterService(
- String name, String namespace, ObjectMeta metadata, ServiceSpec
serviceSpec) {
+ String name, String namespace, String version, ObjectMeta metadata,
ServiceSpec serviceSpec) {
return new ServiceBuilder()
.withNewMetadataLike(metadata)
.withName(name + "-master-svc")
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)
+ .addToLabels(LABEL_SPARK_VERSION_NAME, version)
.withNamespace(namespace)
.endMetadata()
.withNewSpecLike(serviceSpec)
@@ -127,11 +139,12 @@ public class SparkClusterResourceSpec {
}
private static Service buildWorkerService(
- String name, String namespace, ObjectMeta metadata, ServiceSpec
serviceSpec) {
+ String name, String namespace, String version, ObjectMeta metadata,
ServiceSpec serviceSpec) {
return new ServiceBuilder()
.withNewMetadataLike(metadata)
.withName(name + "-worker-svc")
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
+ .addToLabels(LABEL_SPARK_VERSION_NAME, version)
.withNamespace(namespace)
.endMetadata()
.withNewSpecLike(serviceSpec)
@@ -151,6 +164,7 @@ public class SparkClusterResourceSpec {
String scheduler,
String name,
String namespace,
+ String version,
String image,
String options,
ObjectMeta objectMeta,
@@ -160,6 +174,7 @@ public class SparkClusterResourceSpec {
.withNewMetadataLike(objectMeta)
.withName(name + "-master")
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)
+ .addToLabels(LABEL_SPARK_VERSION_NAME, version)
.withNamespace(namespace)
.endMetadata()
.withNewSpecLike(statefulSetSpec)
@@ -171,6 +186,7 @@ public class SparkClusterResourceSpec {
.editOrNewTemplate()
.editOrNewMetadata()
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)
+ .addToLabels(LABEL_SPARK_VERSION_NAME, version)
.endMetadata()
.editOrNewSpec()
.withSchedulerName(scheduler)
@@ -213,6 +229,7 @@ public class SparkClusterResourceSpec {
String scheduler,
String name,
String namespace,
+ String version,
String image,
int initWorkers,
String options,
@@ -223,6 +240,7 @@ public class SparkClusterResourceSpec {
.withNewMetadataLike(metadata)
.withName(name + "-worker")
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
+ .addToLabels(LABEL_SPARK_VERSION_NAME, version)
.withNamespace(namespace)
.endMetadata()
.withNewSpecLike(statefulSetSpec)
@@ -235,6 +253,7 @@ public class SparkClusterResourceSpec {
.editOrNewTemplate()
.editOrNewMetadata()
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
+ .addToLabels(LABEL_SPARK_VERSION_NAME, version)
.endMetadata()
.editOrNewSpec()
.withSchedulerName(scheduler)
@@ -320,6 +339,7 @@ public class SparkClusterResourceSpec {
.withNewMetadata()
.withNamespace(namespace)
.withName(clusterName + "-worker-hpa")
+ .addToLabels(LABEL_SPARK_VERSION_NAME,
spec.getRuntimeVersions().getSparkVersion())
.endMetadata()
.withNewSpecLike(horizontalPodAutoscalerSpec)
.withNewScaleTargetRef()
diff --git
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
index bbe8ba0..3c0bbe7 100644
---
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
+++
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
@@ -19,6 +19,7 @@
package org.apache.spark.k8s.operator;
+import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_VERSION_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
@@ -41,6 +42,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.k8s.operator.spec.ClusterSpec;
import org.apache.spark.k8s.operator.spec.ClusterTolerations;
import org.apache.spark.k8s.operator.spec.MasterSpec;
+import org.apache.spark.k8s.operator.spec.RuntimeVersions;
import org.apache.spark.k8s.operator.spec.WorkerInstanceConfig;
import org.apache.spark.k8s.operator.spec.WorkerSpec;
@@ -52,6 +54,7 @@ class SparkClusterResourceSpecTest {
ServiceSpec serviceSpec;
MasterSpec masterSpec;
WorkerSpec workerSpec;
+ RuntimeVersions runtimeVersions = new RuntimeVersions();
SparkConf sparkConf = new SparkConf().set("spark.kubernetes.namespace",
"other-namespace");
ClusterTolerations clusterTolerations = new ClusterTolerations();
@@ -71,6 +74,8 @@ class SparkClusterResourceSpecTest {
when(clusterSpec.getClusterTolerations()).thenReturn(clusterTolerations);
when(clusterSpec.getMasterSpec()).thenReturn(masterSpec);
when(clusterSpec.getWorkerSpec()).thenReturn(workerSpec);
+ when(clusterSpec.getRuntimeVersions()).thenReturn(runtimeVersions);
+ runtimeVersions.setSparkVersion("4.0.0");
when(masterSpec.getStatefulSetSpec()).thenReturn(statefulSetSpec);
when(masterSpec.getStatefulSetMetadata()).thenReturn(objectMeta);
when(masterSpec.getServiceSpec()).thenReturn(serviceSpec);
@@ -86,9 +91,11 @@ class SparkClusterResourceSpecTest {
Service service1 = new SparkClusterResourceSpec(cluster, new
SparkConf()).getMasterService();
assertEquals("my-namespace", service1.getMetadata().getNamespace());
assertEquals("cluster-name-master-svc", service1.getMetadata().getName());
+ assertEquals("4.0.0",
service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
Service service2 = new SparkClusterResourceSpec(cluster,
sparkConf).getMasterService();
assertEquals("other-namespace", service2.getMetadata().getNamespace());
+ assertEquals("4.0.0",
service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
}
@Test
@@ -96,6 +103,7 @@ class SparkClusterResourceSpecTest {
Service service1 = new SparkClusterResourceSpec(cluster, new
SparkConf()).getWorkerService();
assertEquals("my-namespace", service1.getMetadata().getNamespace());
assertEquals("cluster-name-worker-svc", service1.getMetadata().getName());
+ assertEquals("4.0.0",
service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
Service service2 = new SparkClusterResourceSpec(cluster,
sparkConf).getMasterService();
assertEquals("other-namespace", service2.getMetadata().getNamespace());
@@ -119,6 +127,7 @@ class SparkClusterResourceSpecTest {
assertEquals("my-namespace", service1.getMetadata().getNamespace());
assertEquals("cluster-name-worker-svc", service1.getMetadata().getName());
assertEquals("bar", service1.getMetadata().getLabels().get("foo"));
+ assertEquals("4.0.0",
service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
assertEquals("foo", service1.getSpec().getExternalName());
}
@@ -140,6 +149,7 @@ class SparkClusterResourceSpecTest {
assertEquals("my-namespace", service1.getMetadata().getNamespace());
assertEquals("cluster-name-master-svc", service1.getMetadata().getName());
assertEquals("bar", service1.getMetadata().getLabels().get("foo"));
+ assertEquals("4.0.0",
service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
assertEquals("foo", service1.getSpec().getExternalName());
}
@@ -149,6 +159,15 @@ class SparkClusterResourceSpecTest {
StatefulSet statefulSet1 = spec1.getMasterStatefulSet();
assertEquals("my-namespace", statefulSet1.getMetadata().getNamespace());
assertEquals("cluster-name-master", statefulSet1.getMetadata().getName());
+ assertEquals("4.0.0",
statefulSet1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
+ assertEquals(
+ "4.0.0",
+ statefulSet1
+ .getSpec()
+ .getTemplate()
+ .getMetadata()
+ .getLabels()
+ .get(LABEL_SPARK_VERSION_NAME));
SparkClusterResourceSpec spec2 = new SparkClusterResourceSpec(cluster,
sparkConf);
StatefulSet statefulSet2 = spec2.getMasterStatefulSet();
@@ -185,8 +204,17 @@ class SparkClusterResourceSpecTest {
assertEquals("my-namespace", statefulSet1.getMetadata().getNamespace());
assertEquals("cluster-name-master", statefulSet1.getMetadata().getName());
assertEquals("bar", statefulSet1.getMetadata().getLabels().get("foo"));
+ assertEquals("4.0.0",
statefulSet1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
assertEquals(1,
statefulSet1.getSpec().getTemplate().getSpec().getInitContainers().size());
assertEquals(2,
statefulSet1.getSpec().getTemplate().getSpec().getContainers().size());
+ assertEquals(
+ "4.0.0",
+ statefulSet1
+ .getSpec()
+ .getTemplate()
+ .getMetadata()
+ .getLabels()
+ .get(LABEL_SPARK_VERSION_NAME));
}
@Test
@@ -195,6 +223,15 @@ class SparkClusterResourceSpecTest {
StatefulSet statefulSet = spec.getWorkerStatefulSet();
assertEquals("my-namespace", statefulSet.getMetadata().getNamespace());
assertEquals("cluster-name-worker", statefulSet.getMetadata().getName());
+ assertEquals("4.0.0",
statefulSet.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
+ assertEquals(
+ "4.0.0",
+ statefulSet
+ .getSpec()
+ .getTemplate()
+ .getMetadata()
+ .getLabels()
+ .get(LABEL_SPARK_VERSION_NAME));
SparkClusterResourceSpec spec2 = new SparkClusterResourceSpec(cluster,
sparkConf);
StatefulSet statefulSet2 = spec2.getWorkerStatefulSet();
@@ -230,6 +267,15 @@ class SparkClusterResourceSpecTest {
StatefulSet statefulSet = spec.getWorkerStatefulSet();
assertEquals("my-namespace", statefulSet.getMetadata().getNamespace());
assertEquals("cluster-name-worker", statefulSet.getMetadata().getName());
+ assertEquals("4.0.0",
statefulSet.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
+ assertEquals(
+ "4.0.0",
+ statefulSet
+ .getSpec()
+ .getTemplate()
+ .getMetadata()
+ .getLabels()
+ .get(LABEL_SPARK_VERSION_NAME));
}
@Test
@@ -255,6 +301,7 @@ class SparkClusterResourceSpecTest {
assertEquals("HorizontalPodAutoscaler", hpa.getKind());
assertEquals("my-namespace", hpa.getMetadata().getNamespace());
assertEquals("cluster-name-worker-hpa", hpa.getMetadata().getName());
+ assertEquals("4.0.0",
hpa.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
assertEquals(1, hpa.getSpec().getMinReplicas());
assertEquals(3, hpa.getSpec().getMaxReplicas());
}
diff --git
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
index 3852d3a..6418292 100644
---
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
+++
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test;
import org.apache.spark.k8s.operator.spec.ClusterSpec;
import org.apache.spark.k8s.operator.spec.ClusterTolerations;
import org.apache.spark.k8s.operator.spec.MasterSpec;
+import org.apache.spark.k8s.operator.spec.RuntimeVersions;
import org.apache.spark.k8s.operator.spec.WorkerSpec;
class SparkClusterSubmissionWorkerTest {
@@ -40,6 +41,7 @@ class SparkClusterSubmissionWorkerTest {
ClusterTolerations clusterTolerations = new ClusterTolerations();
MasterSpec masterSpec;
WorkerSpec workerSpec;
+ RuntimeVersions runtimeVersions = new RuntimeVersions();
@BeforeEach
void setUp() {
@@ -55,6 +57,7 @@ class SparkClusterSubmissionWorkerTest {
when(clusterSpec.getClusterTolerations()).thenReturn(clusterTolerations);
when(clusterSpec.getMasterSpec()).thenReturn(masterSpec);
when(clusterSpec.getWorkerSpec()).thenReturn(workerSpec);
+ when(clusterSpec.getRuntimeVersions()).thenReturn(runtimeVersions);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]