This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b8c03ee  [SPARK-38455][SPARK-38187][K8S] Support driver/executor `PodGroup` templates
b8c03ee is described below

commit b8c03eeb15a22895d3ab55b931b468ad012a28d4
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Tue Mar 8 20:16:43 2022 -0800

    [SPARK-38455][SPARK-38187][K8S] Support driver/executor `PodGroup` templates
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support driver/executor `PodGroup` templates like the following.
    
    ```yaml
    apiVersion: scheduling.volcano.sh/v1beta1
    kind: PodGroup
    spec:
      minMember: 1000
      minResources:
        cpu: "4"
        memory: "16Gi"
      priorityClassName: executor-priority
      queue: executor-queue
    ```
    
    ### Why are the changes needed?
    
    This is a simpler, more extensible, and more robust way to support future
    Volcano features, because we don't need to add new configurations (like
    https://github.com/apache/spark/pull/35640) for every Volcano feature.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, because this is a new feature.
    
    ### How was this patch tested?
    
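    Pass the CIs, including the new driver and executor `PodGroup` template tests
    added to `VolcanoFeatureStepSuite`.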
    
    Closes #35776 from dongjoon-hyun/SPARK-38455.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 resource-managers/kubernetes/core/pom.xml          |  5 +++
 .../scala/org/apache/spark/deploy/k8s/Config.scala | 14 ++++++++
 .../deploy/k8s/features/VolcanoFeatureStep.scala   | 31 +++++++++++-------
 .../test/resources/driver-podgroup-template.yml    | 25 ++++++++++++++
 .../test/resources/executor-podgroup-template.yml  | 25 ++++++++++++++
 .../k8s/features/VolcanoFeatureStepSuite.scala     | 38 ++++++++++++++++++++++
 6 files changed, 127 insertions(+), 11 deletions(-)

diff --git a/resource-managers/kubernetes/core/pom.xml 
b/resource-managers/kubernetes/core/pom.xml
index 6eb357e..611fee6 100644
--- a/resource-managers/kubernetes/core/pom.xml
+++ b/resource-managers/kubernetes/core/pom.xml
@@ -44,6 +44,11 @@
           <artifactId>volcano-model-v1beta1</artifactId>
           <version>${kubernetes-client.version}</version>
         </dependency>
+        <dependency>
+          <groupId>io.fabric8</groupId>
+          <artifactId>volcano-client</artifactId>
+          <version>${kubernetes-client.version}</version>
+        </dependency>
       </dependencies>
     </profile>
   </profiles>
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index a0270fa..e66ecf4 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -292,6 +292,20 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional
 
+  val KUBERNETES_DRIVER_PODGROUP_TEMPLATE_FILE =
+    ConfigBuilder("spark.kubernetes.driver.podGroupTemplateFile")
+      .doc("File containing a template pod group spec for driver")
+      .version("3.3.0")
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_EXECUTOR_PODGROUP_TEMPLATE_FILE =
+    ConfigBuilder("spark.kubernetes.executor.podGroupTemplateFile")
+      .doc("File containing a template pod group spec for executors")
+      .version("3.3.0")
+      .stringConf
+      .createOptional
+
   val KUBERNETES_JOB_QUEUE = ConfigBuilder("spark.kubernetes.job.queue")
     .doc("The name of the queue to which the job is submitted. This info " +
       "will be stored in configuration and passed to specific feature step.")
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala
index 58d6c5c..5fd0fc6 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala
@@ -17,7 +17,8 @@
 package org.apache.spark.deploy.k8s.features
 
 import io.fabric8.kubernetes.api.model._
-import io.fabric8.volcano.scheduling.v1beta1.PodGroupBuilder
+import io.fabric8.volcano.client.DefaultVolcanoClient
+import io.fabric8.volcano.scheduling.v1beta1.{PodGroup, PodGroupSpec}
 
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverConf, 
KubernetesExecutorConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
@@ -43,19 +44,27 @@ private[spark] class VolcanoFeatureStep extends 
KubernetesDriverCustomFeatureCon
   }
 
   override def getAdditionalPreKubernetesResources(): Seq[HasMetadata] = {
-    val podGroup = new PodGroupBuilder()
-      .editOrNewMetadata()
-        .withName(podGroupName)
-        .withNamespace(namespace)
-      .endMetadata()
-      .editOrNewSpec()
-      .endSpec()
+    val client = new DefaultVolcanoClient
 
-    queue.foreach(podGroup.editOrNewSpec().withQueue(_).endSpec())
+    val template = if (kubernetesConf.isInstanceOf[KubernetesDriverConf]) {
+      kubernetesConf.get(KUBERNETES_DRIVER_PODGROUP_TEMPLATE_FILE)
+    } else {
+      kubernetesConf.get(KUBERNETES_EXECUTOR_PODGROUP_TEMPLATE_FILE)
+    }
+    val pg = template.map(client.podGroups.load(_).get).getOrElse(new 
PodGroup())
+    var metadata = pg.getMetadata
+    if (metadata == null) metadata = new ObjectMeta
+    metadata.setName(podGroupName)
+    metadata.setNamespace(namespace)
+    pg.setMetadata(metadata)
 
-    
priorityClassName.foreach(podGroup.editOrNewSpec().withPriorityClassName(_).endSpec())
+    var spec = pg.getSpec
+    if (spec == null) spec = new PodGroupSpec
+    queue.foreach(spec.setQueue(_))
+    priorityClassName.foreach(spec.setPriorityClassName(_))
+    pg.setSpec(spec)
 
-    Seq(podGroup.build())
+    Seq(pg)
   }
 
   override def configurePod(pod: SparkPod): SparkPod = {
diff --git 
a/resource-managers/kubernetes/core/src/test/resources/driver-podgroup-template.yml
 
b/resource-managers/kubernetes/core/src/test/resources/driver-podgroup-template.yml
new file mode 100644
index 0000000..085d6b8
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/test/resources/driver-podgroup-template.yml
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+apiVersion: scheduling.volcano.sh/v1beta1
+kind: PodGroup
+spec:
+  minMember: 1
+  minResources:
+    cpu: "2"
+    memory: "2048Mi"
+  priorityClassName: driver-priority
+  queue: driver-queue
diff --git 
a/resource-managers/kubernetes/core/src/test/resources/executor-podgroup-template.yml
 
b/resource-managers/kubernetes/core/src/test/resources/executor-podgroup-template.yml
new file mode 100644
index 0000000..f0f7b35
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/test/resources/executor-podgroup-template.yml
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+apiVersion: scheduling.volcano.sh/v1beta1
+kind: PodGroup
+spec:
+  minMember: 1000
+  minResources:
+    cpu: "4"
+    memory: "16Gi"
+  priorityClassName: executor-priority
+  queue: executor-queue
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala
index 350df77..e7f1e31 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.deploy.k8s.features
 
+import java.io.File
+
 import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}
 import io.fabric8.volcano.scheduling.v1beta1.PodGroup
 
@@ -78,6 +80,42 @@ class VolcanoFeatureStepSuite extends SparkFunSuite {
     verifyPriority(podWithPriority)
   }
 
+  test("SPARK-38455: Support driver podgroup template") {
+    val templatePath = new File(
+      
getClass.getResource("/driver-podgroup-template.yml").getFile).getAbsolutePath
+    val sparkConf = new SparkConf()
+      .set(KUBERNETES_DRIVER_PODGROUP_TEMPLATE_FILE.key, templatePath)
+    val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf)
+    val step = new VolcanoFeatureStep()
+    step.init(kubernetesConf)
+    step.configurePod(SparkPod.initialPod())
+    val podGroup = 
step.getAdditionalPreKubernetesResources().head.asInstanceOf[PodGroup]
+    assert(podGroup.getSpec.getMinMember == 1)
+    assert(podGroup.getSpec.getMinResources.get("cpu").getAmount == "2")
+    assert(podGroup.getSpec.getMinResources.get("memory").getAmount == "2048")
+    assert(podGroup.getSpec.getMinResources.get("memory").getFormat == "Mi")
+    assert(podGroup.getSpec.getPriorityClassName == "driver-priority")
+    assert(podGroup.getSpec.getQueue == "driver-queue")
+  }
+
+  test("SPARK-38455: Support executor podgroup template") {
+    val templatePath = new File(
+      
getClass.getResource("/executor-podgroup-template.yml").getFile).getAbsolutePath
+    val sparkConf = new SparkConf()
+      .set(KUBERNETES_EXECUTOR_PODGROUP_TEMPLATE_FILE.key, templatePath)
+    val kubernetesConf = KubernetesTestConf.createExecutorConf(sparkConf)
+    val step = new VolcanoFeatureStep()
+    step.init(kubernetesConf)
+    step.configurePod(SparkPod.initialPod())
+    val podGroup = 
step.getAdditionalPreKubernetesResources().head.asInstanceOf[PodGroup]
+    assert(podGroup.getSpec.getMinMember == 1000)
+    assert(podGroup.getSpec.getMinResources.get("cpu").getAmount == "4")
+    assert(podGroup.getSpec.getMinResources.get("memory").getAmount == "16")
+    assert(podGroup.getSpec.getMinResources.get("memory").getFormat == "Gi")
+    assert(podGroup.getSpec.getPriorityClassName == "executor-priority")
+    assert(podGroup.getSpec.getQueue == "executor-queue")
+  }
+
   private def verifyPriority(pod: SparkPod): Unit = {
     val sparkConf = new SparkConf()
     val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to