This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new a2aa452  [SPARK-49329] Support user provided spec for SparkCluster
a2aa452 is described below

commit a2aa4521bc887b6aecc6c755f90b770ad4d6ee4f
Author: zhou-jiang <[email protected]>
AuthorDate: Sun Aug 25 14:25:13 2024 -0700

    [SPARK-49329] Support user provided spec for SparkCluster
    
    ### What changes were proposed in this pull request?
    
    This PR introduces the feature to enable user-provided metadata & spec for 
Spark Clusters.
    
    ### Why are the changes needed?
    
    Similar to pod template spec support for Apps, this is desired when a user 
would like to introduce customization for the Cluster master + worker spec.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No - not released yet
    
    ### How was this patch tested?
    
    Unit tests and integration test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #80 from jiangzho/cluster_api.
    
    Authored-by: zhou-jiang <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/k8s/operator/spec/ClusterSpec.java       |   3 +
 .../spec/{ClusterSpec.java => MasterSpec.java}     |  18 ++-
 .../spec/{ClusterSpec.java => WorkerSpec.java}     |  18 ++-
 .../k8s/operator/SparkClusterResourceSpec.java     |  84 +++++++++----
 .../k8s/operator/SparkClusterResourceSpecTest.java | 132 +++++++++++++++++++++
 .../operator/SparkClusterSubmissionWorkerTest.java |   8 ++
 6 files changed, 219 insertions(+), 44 deletions(-)

diff --git 
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
 
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
index b7e9fa8..6786abb 100644
--- 
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
+++ 
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
@@ -40,4 +40,7 @@ public class ClusterSpec extends BaseSpec {
 
   @Required @Builder.Default
   protected ClusterTolerations clusterTolerations = new ClusterTolerations();
+
+  @Builder.Default protected MasterSpec masterSpec = new 
MasterSpec.MasterSpecBuilder().build();
+  @Builder.Default protected WorkerSpec workerSpec = new 
WorkerSpec.WorkerSpecBuilder().build();
 }
diff --git 
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
 
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/MasterSpec.java
similarity index 72%
copy from 
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
copy to 
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/MasterSpec.java
index b7e9fa8..7becfc0 100644
--- 
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
+++ 
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/MasterSpec.java
@@ -19,25 +19,23 @@
 
 package org.apache.spark.k8s.operator.spec;
 
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonInclude;
-import io.fabric8.generator.annotation.Required;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.ServiceSpec;
+import io.fabric8.kubernetes.api.model.apps.StatefulSetSpec;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
-import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 
 @Data
 @NoArgsConstructor
 @AllArgsConstructor
 @Builder
-@EqualsAndHashCode(callSuper = true)
 @JsonInclude(JsonInclude.Include.NON_NULL)
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class ClusterSpec extends BaseSpec {
-  @Required protected RuntimeVersions runtimeVersions;
-
-  @Required @Builder.Default
-  protected ClusterTolerations clusterTolerations = new ClusterTolerations();
+public class MasterSpec {
+  protected StatefulSetSpec masterStatefulSetSpec;
+  protected ObjectMeta masterStatefulSetMetadata;
+  protected ServiceSpec masterServiceSpec;
+  protected ObjectMeta masterServiceMetadata;
 }
diff --git 
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
 
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/WorkerSpec.java
similarity index 72%
copy from 
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
copy to 
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/WorkerSpec.java
index b7e9fa8..2c5beb1 100644
--- 
a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/ClusterSpec.java
+++ 
b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/WorkerSpec.java
@@ -19,25 +19,23 @@
 
 package org.apache.spark.k8s.operator.spec;
 
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonInclude;
-import io.fabric8.generator.annotation.Required;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.ServiceSpec;
+import io.fabric8.kubernetes.api.model.apps.StatefulSetSpec;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
-import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 
 @Data
 @NoArgsConstructor
 @AllArgsConstructor
 @Builder
-@EqualsAndHashCode(callSuper = true)
 @JsonInclude(JsonInclude.Include.NON_NULL)
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class ClusterSpec extends BaseSpec {
-  @Required protected RuntimeVersions runtimeVersions;
-
-  @Required @Builder.Default
-  protected ClusterTolerations clusterTolerations = new ClusterTolerations();
+public class WorkerSpec {
+  protected StatefulSetSpec workerStatefulSetSpec;
+  protected ObjectMeta workerStatefulSetMetadata;
+  protected ServiceSpec workerServiceSpec;
+  protected ObjectMeta workerServiceMetadata;
 }
diff --git 
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
 
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
index d5c0324..390af3d 100644
--- 
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
+++ 
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
@@ -25,15 +25,20 @@ import java.util.Collections;
 
 import scala.Tuple2;
 
+import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.api.model.Service;
 import io.fabric8.kubernetes.api.model.ServiceBuilder;
+import io.fabric8.kubernetes.api.model.ServiceSpec;
 import io.fabric8.kubernetes.api.model.apps.StatefulSet;
 import io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder;
+import io.fabric8.kubernetes.api.model.apps.StatefulSetSpec;
 import lombok.Getter;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.deploy.k8s.Config;
 import org.apache.spark.k8s.operator.spec.ClusterSpec;
+import org.apache.spark.k8s.operator.spec.MasterSpec;
+import org.apache.spark.k8s.operator.spec.WorkerSpec;
 
 /** Spark Cluster Resource Spec: Master Service, Master StatefulSet, Worker 
StatefulSet */
 public class SparkClusterResourceSpec {
@@ -53,10 +58,29 @@ public class SparkClusterResourceSpec {
     for (Tuple2<String, String> t : conf.getAll()) {
       options.append(String.format("-D%s=\"%s\" ", t._1, t._2));
     }
-    masterService = buildMasterService(clusterName, namespace);
-    workerService = buildWorkerService(clusterName, namespace);
+    MasterSpec masterSpec = spec.getMasterSpec();
+    WorkerSpec workerSpec = spec.getWorkerSpec();
+    masterService =
+        buildMasterService(
+            clusterName,
+            namespace,
+            masterSpec.getMasterServiceMetadata(),
+            masterSpec.getMasterServiceSpec());
+    workerService =
+        buildWorkerService(
+            clusterName,
+            namespace,
+            workerSpec.getWorkerServiceMetadata(),
+            workerSpec.getWorkerServiceSpec());
     masterStatefulSet =
-        buildMasterStatefulSet(scheduler, clusterName, namespace, image, 
options.toString());
+        buildMasterStatefulSet(
+            scheduler,
+            clusterName,
+            namespace,
+            image,
+            options.toString(),
+            masterSpec.getMasterStatefulSetMetadata(),
+            masterSpec.getMasterStatefulSetSpec());
     workerStatefulSet =
         buildWorkerStatefulSet(
             scheduler,
@@ -64,17 +88,20 @@ public class SparkClusterResourceSpec {
             namespace,
             image,
             spec.getClusterTolerations().getInstanceConfig().getInitWorkers(),
-            options.toString());
+            options.toString(),
+            workerSpec.getWorkerStatefulSetMetadata(),
+            workerSpec.getWorkerStatefulSetSpec());
   }
 
-  private static Service buildMasterService(String name, String namespace) {
+  private static Service buildMasterService(
+      String name, String namespace, ObjectMeta metadata, ServiceSpec 
serviceSpec) {
     return new ServiceBuilder()
-        .withNewMetadata()
+        .withNewMetadataLike(metadata)
         .withName(name + "-master-svc")
         .addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)
         .withNamespace(namespace)
         .endMetadata()
-        .withNewSpec()
+        .withNewSpecLike(serviceSpec)
         .withClusterIP("None")
         .withSelector(
             Collections.singletonMap(LABEL_SPARK_ROLE_NAME, 
LABEL_SPARK_ROLE_MASTER_VALUE))
@@ -97,14 +124,15 @@ public class SparkClusterResourceSpec {
         .build();
   }
 
-  private static Service buildWorkerService(String name, String namespace) {
+  private static Service buildWorkerService(
+      String name, String namespace, ObjectMeta metadata, ServiceSpec 
serviceSpec) {
     return new ServiceBuilder()
-        .withNewMetadata()
+        .withNewMetadataLike(metadata)
         .withName(name + "-worker-svc")
         .addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
         .withNamespace(namespace)
         .endMetadata()
-        .withNewSpec()
+        .withNewSpecLike(serviceSpec)
         .withClusterIP("None")
         .withSelector(
             Collections.singletonMap(LABEL_SPARK_ROLE_NAME, 
LABEL_SPARK_ROLE_WORKER_VALUE))
@@ -118,24 +146,30 @@ public class SparkClusterResourceSpec {
   }
 
   private static StatefulSet buildMasterStatefulSet(
-      String scheduler, String name, String namespace, String image, String 
options) {
+      String scheduler,
+      String name,
+      String namespace,
+      String image,
+      String options,
+      ObjectMeta objectMeta,
+      StatefulSetSpec statefulSetSpec) {
     return new StatefulSetBuilder()
-        .withNewMetadata()
+        .withNewMetadataLike(objectMeta)
         .withName(name + "-master")
         .addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)
         .withNamespace(namespace)
         .endMetadata()
-        .withNewSpec()
+        .withNewSpecLike(statefulSetSpec)
         .withPodManagementPolicy("Parallel")
         .withReplicas(1)
-        .withNewSelector()
+        .editOrNewSelector()
         .addToMatchLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)
         .endSelector()
-        .withNewTemplate()
-        .withNewMetadata()
+        .editOrNewTemplate()
+        .editOrNewMetadata()
         .addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)
         .endMetadata()
-        .withNewSpec()
+        .editOrNewSpec()
         .withSchedulerName(scheduler)
         .withTerminationGracePeriodSeconds(0L)
         .addNewContainer()
@@ -176,25 +210,27 @@ public class SparkClusterResourceSpec {
       String namespace,
       String image,
       int initWorkers,
-      String options) {
+      String options,
+      ObjectMeta metadata,
+      StatefulSetSpec statefulSetSpec) {
     return new StatefulSetBuilder()
-        .withNewMetadata()
+        .withNewMetadataLike(metadata)
         .withName(name + "-worker")
         .addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
         .withNamespace(namespace)
         .endMetadata()
-        .withNewSpec()
+        .withNewSpecLike(statefulSetSpec)
         .withPodManagementPolicy("Parallel")
         .withReplicas(initWorkers)
         .withServiceName(name + "-worker-svc")
-        .withNewSelector()
+        .editOrNewSelector()
         .addToMatchLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
         .endSelector()
-        .withNewTemplate()
-        .withNewMetadata()
+        .editOrNewTemplate()
+        .editOrNewMetadata()
         .addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
         .endMetadata()
-        .withNewSpec()
+        .editOrNewSpec()
         .withSchedulerName(scheduler)
         .withTerminationGracePeriodSeconds(0L)
         .withNewDnsConfig()
diff --git 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
index ef1cbaf..3a6ae7e 100644
--- 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
+++ 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
@@ -24,19 +24,30 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
 import io.fabric8.kubernetes.api.model.Service;
+import io.fabric8.kubernetes.api.model.ServiceSpec;
+import io.fabric8.kubernetes.api.model.ServiceSpecBuilder;
 import io.fabric8.kubernetes.api.model.apps.StatefulSet;
+import io.fabric8.kubernetes.api.model.apps.StatefulSetSpec;
+import io.fabric8.kubernetes.api.model.apps.StatefulSetSpecBuilder;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.k8s.operator.spec.ClusterSpec;
 import org.apache.spark.k8s.operator.spec.ClusterTolerations;
+import org.apache.spark.k8s.operator.spec.MasterSpec;
+import org.apache.spark.k8s.operator.spec.WorkerSpec;
 
 class SparkClusterResourceSpecTest {
   SparkCluster cluster;
   ObjectMeta objectMeta;
   ClusterSpec clusterSpec;
+  StatefulSetSpec statefulSetSpec;
+  ServiceSpec serviceSpec;
+  MasterSpec masterSpec;
+  WorkerSpec workerSpec;
   SparkConf sparkConf = new SparkConf().set("spark.kubernetes.namespace", 
"other-namespace");
   ClusterTolerations clusterTolerations = new ClusterTolerations();
 
@@ -45,11 +56,25 @@ class SparkClusterResourceSpecTest {
     cluster = mock(SparkCluster.class);
     objectMeta = mock(ObjectMeta.class);
     clusterSpec = mock(ClusterSpec.class);
+    serviceSpec = mock(ServiceSpec.class);
+    masterSpec = mock(MasterSpec.class);
+    workerSpec = mock(WorkerSpec.class);
+    statefulSetSpec = mock(StatefulSetSpec.class);
     when(cluster.getMetadata()).thenReturn(objectMeta);
     when(cluster.getSpec()).thenReturn(clusterSpec);
     when(objectMeta.getNamespace()).thenReturn("my-namespace");
     when(objectMeta.getName()).thenReturn("cluster-name");
     when(clusterSpec.getClusterTolerations()).thenReturn(clusterTolerations);
+    when(clusterSpec.getMasterSpec()).thenReturn(masterSpec);
+    when(clusterSpec.getWorkerSpec()).thenReturn(workerSpec);
+    when(masterSpec.getMasterStatefulSetSpec()).thenReturn(statefulSetSpec);
+    when(masterSpec.getMasterStatefulSetMetadata()).thenReturn(objectMeta);
+    when(masterSpec.getMasterServiceSpec()).thenReturn(serviceSpec);
+    when(masterSpec.getMasterServiceMetadata()).thenReturn(objectMeta);
+    when(workerSpec.getWorkerStatefulSetSpec()).thenReturn(statefulSetSpec);
+    when(workerSpec.getWorkerStatefulSetMetadata()).thenReturn(objectMeta);
+    when(workerSpec.getWorkerServiceSpec()).thenReturn(serviceSpec);
+    when(workerSpec.getWorkerServiceMetadata()).thenReturn(objectMeta);
   }
 
   @Test
@@ -72,6 +97,48 @@ class SparkClusterResourceSpecTest {
     assertEquals("other-namespace", service2.getMetadata().getNamespace());
   }
 
+  @Test
+  void testWorkerServiceWithTemplate() {
+    ObjectMeta objectMeta1 =
+        new ObjectMetaBuilder()
+            .withNamespace("foo")
+            .withName("bar")
+            .addToLabels("foo", "bar")
+            .build();
+    ServiceSpec serviceSpec1 = new 
ServiceSpecBuilder().withExternalName("foo").build();
+    WorkerSpec workerSpec1 = mock(WorkerSpec.class);
+    when(workerSpec1.getWorkerServiceSpec()).thenReturn(serviceSpec1);
+    when(workerSpec1.getWorkerServiceMetadata()).thenReturn(objectMeta1);
+    when(clusterSpec.getWorkerSpec()).thenReturn(workerSpec1);
+
+    Service service1 = new SparkClusterResourceSpec(cluster, new 
SparkConf()).getWorkerService();
+    assertEquals("my-namespace", service1.getMetadata().getNamespace());
+    assertEquals("cluster-name-worker-svc", service1.getMetadata().getName());
+    assertEquals("bar", service1.getMetadata().getLabels().get("foo"));
+    assertEquals("foo", service1.getSpec().getExternalName());
+  }
+
+  @Test
+  void testMasterServiceWithTemplate() {
+    ObjectMeta objectMeta1 =
+        new ObjectMetaBuilder()
+            .withNamespace("foo")
+            .withName("bar")
+            .addToLabels("foo", "bar")
+            .build();
+    ServiceSpec serviceSpec1 = new 
ServiceSpecBuilder().withExternalName("foo").build();
+    MasterSpec masterSpec1 = mock(MasterSpec.class);
+    when(masterSpec1.getMasterServiceSpec()).thenReturn(serviceSpec1);
+    when(masterSpec1.getMasterServiceMetadata()).thenReturn(objectMeta1);
+    when(clusterSpec.getMasterSpec()).thenReturn(masterSpec1);
+
+    Service service1 = new SparkClusterResourceSpec(cluster, new 
SparkConf()).getMasterService();
+    assertEquals("my-namespace", service1.getMetadata().getNamespace());
+    assertEquals("cluster-name-master-svc", service1.getMetadata().getName());
+    assertEquals("bar", service1.getMetadata().getLabels().get("foo"));
+    assertEquals("foo", service1.getSpec().getExternalName());
+  }
+
   @Test
   void testMasterStatefulSet() {
     SparkClusterResourceSpec spec1 = new SparkClusterResourceSpec(cluster, new 
SparkConf());
@@ -84,6 +151,40 @@ class SparkClusterResourceSpecTest {
     assertEquals("other-namespace", statefulSet2.getMetadata().getNamespace());
   }
 
+  @Test
+  void testMasterStatefulSetWithTemplate() {
+    ObjectMeta objectMeta1 =
+        new ObjectMetaBuilder()
+            .withNamespace("foo")
+            .withName("bar")
+            .addToLabels("foo", "bar")
+            .build();
+    StatefulSetSpec statefulSetSpec1 =
+        new StatefulSetSpecBuilder()
+            .withNewTemplate()
+            .withNewSpec()
+            .addNewInitContainer()
+            .withName("init-foo")
+            .endInitContainer()
+            .addNewContainer()
+            .withName("sidecar-foo")
+            .endContainer()
+            .endSpec()
+            .endTemplate()
+            .build();
+    MasterSpec masterSpec1 = mock(MasterSpec.class);
+    when(masterSpec1.getMasterStatefulSetMetadata()).thenReturn(objectMeta1);
+    when(masterSpec1.getMasterStatefulSetSpec()).thenReturn(statefulSetSpec1);
+    when(clusterSpec.getMasterSpec()).thenReturn(masterSpec1);
+    SparkClusterResourceSpec spec1 = new SparkClusterResourceSpec(cluster, new 
SparkConf());
+    StatefulSet statefulSet1 = spec1.getMasterStatefulSet();
+    assertEquals("my-namespace", statefulSet1.getMetadata().getNamespace());
+    assertEquals("cluster-name-master", statefulSet1.getMetadata().getName());
+    assertEquals("bar", statefulSet1.getMetadata().getLabels().get("foo"));
+    assertEquals(1, 
statefulSet1.getSpec().getTemplate().getSpec().getInitContainers().size());
+    assertEquals(2, 
statefulSet1.getSpec().getTemplate().getSpec().getContainers().size());
+  }
+
   @Test
   void testWorkerStatefulSet() {
     SparkClusterResourceSpec spec = new SparkClusterResourceSpec(cluster, new 
SparkConf());
@@ -95,4 +196,35 @@ class SparkClusterResourceSpecTest {
     StatefulSet statefulSet2 = spec2.getWorkerStatefulSet();
     assertEquals("other-namespace", statefulSet2.getMetadata().getNamespace());
   }
+
+  @Test
+  void testWorkerStatefulSetWithTemplate() {
+    ObjectMeta objectMeta1 =
+        new ObjectMetaBuilder()
+            .withNamespace("foo")
+            .withName("bar")
+            .addToLabels("foo", "bar")
+            .build();
+    StatefulSetSpec statefulSetSpec1 =
+        new StatefulSetSpecBuilder()
+            .withNewTemplate()
+            .withNewSpec()
+            .addNewInitContainer()
+            .withName("init-foo")
+            .endInitContainer()
+            .addNewContainer()
+            .withName("sidecar-foo")
+            .endContainer()
+            .endSpec()
+            .endTemplate()
+            .build();
+    WorkerSpec workerSpec1 = mock(WorkerSpec.class);
+    when(workerSpec1.getWorkerStatefulSetMetadata()).thenReturn(objectMeta1);
+    when(workerSpec1.getWorkerStatefulSetSpec()).thenReturn(statefulSetSpec1);
+    when(clusterSpec.getWorkerSpec()).thenReturn(workerSpec1);
+    SparkClusterResourceSpec spec = new SparkClusterResourceSpec(cluster, new 
SparkConf());
+    StatefulSet statefulSet = spec.getWorkerStatefulSet();
+    assertEquals("my-namespace", statefulSet.getMetadata().getNamespace());
+    assertEquals("cluster-name-worker", statefulSet.getMetadata().getName());
+  }
 }
diff --git 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
index b0d02c4..0e2ea9b 100644
--- 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
+++ 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java
@@ -30,23 +30,31 @@ import org.junit.jupiter.api.Test;
 
 import org.apache.spark.k8s.operator.spec.ClusterSpec;
 import org.apache.spark.k8s.operator.spec.ClusterTolerations;
+import org.apache.spark.k8s.operator.spec.MasterSpec;
+import org.apache.spark.k8s.operator.spec.WorkerSpec;
 
 class SparkClusterSubmissionWorkerTest {
   SparkCluster cluster;
   ObjectMeta objectMeta;
   ClusterSpec clusterSpec;
   ClusterTolerations clusterTolerations = new ClusterTolerations();
+  MasterSpec masterSpec;
+  WorkerSpec workerSpec;
 
   @BeforeEach
   void setUp() {
     cluster = mock(SparkCluster.class);
     objectMeta = mock(ObjectMeta.class);
     clusterSpec = mock(ClusterSpec.class);
+    masterSpec = mock(MasterSpec.class);
+    workerSpec = mock(WorkerSpec.class);
     when(cluster.getMetadata()).thenReturn(objectMeta);
     when(cluster.getSpec()).thenReturn(clusterSpec);
     when(objectMeta.getNamespace()).thenReturn("my-namespace");
     when(objectMeta.getName()).thenReturn("cluster-name");
     when(clusterSpec.getClusterTolerations()).thenReturn(clusterTolerations);
+    when(clusterSpec.getMasterSpec()).thenReturn(masterSpec);
+    when(clusterSpec.getWorkerSpec()).thenReturn(workerSpec);
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to