This is an automated email from the ASF dual-hosted git repository.

mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new 358751a6 [YUNIKORN-1935] add gang scheduling with priority test (#659)
358751a6 is described below

commit 358751a6dd3c7fd4489d301bbcdb3292397c3363
Author: PoAn Yang <[email protected]>
AuthorDate: Thu Sep 7 11:17:09 2023 +0530

    [YUNIKORN-1935] add gang scheduling with priority test (#659)
    
    Closes: #659
    
    Signed-off-by: Manikandan R <[email protected]>
---
 .../priority_scheduling_test.go                    | 192 ++++++++++++++++++++-
 1 file changed, 185 insertions(+), 7 deletions(-)

diff --git a/test/e2e/priority_scheduling/priority_scheduling_test.go b/test/e2e/priority_scheduling/priority_scheduling_test.go
index 6514090b..40dd8ce1 100644
--- a/test/e2e/priority_scheduling/priority_scheduling_test.go
+++ b/test/e2e/priority_scheduling/priority_scheduling_test.go
@@ -28,6 +28,7 @@ import (
        "k8s.io/apimachinery/pkg/api/resource"
 
        "github.com/apache/yunikorn-core/pkg/common/configs"
+       "github.com/apache/yunikorn-k8shim/pkg/appmgmt/interfaces"
        "github.com/apache/yunikorn-k8shim/pkg/common/constants"
        tests "github.com/apache/yunikorn-k8shim/test/e2e"
        "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common"
@@ -41,15 +42,17 @@ const (
        requestMem = "100M"
 )
 
-var rr = &v1.ResourceRequirements{
-       Requests: v1.ResourceList{
-               v1.ResourceCPU:    resource.MustParse(requestCPU),
-               v1.ResourceMemory: resource.MustParse(requestMem),
-       },
-}
+var (
+       ns string
+       rr = &v1.ResourceRequirements{
+               Requests: v1.ResourceList{
+                       v1.ResourceCPU:    resource.MustParse(requestCPU),
+                       v1.ResourceMemory: resource.MustParse(requestMem),
+               },
+       }
+)
 
 var _ = ginkgo.Describe("PriorityScheduling", func() {
-       var ns string
        var namespace *v1.Namespace
        var err error
        var oldConfigMap = new(v1.ConfigMap)
@@ -251,6 +254,158 @@ var _ = ginkgo.Describe("PriorityScheduling", func() {
                validatePodSchedulingOrder(ns, sleepPodConf, lowPodConf, 
normalPodConf, highPodConf)
        })
 
+       ginkgo.It("Verify_Gang_Scheduling_With_Priority", func() {
+               By("Setting custom YuniKorn configuration")
+               annotation = "ann-" + common.RandSeq(10)
+               yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "fifo", 
annotation, func(sc *configs.SchedulerConfig) error {
+                       // remove placement rules so we can control queue
+                       sc.Partitions[0].PlacementRules = nil
+
+                       if err = common.AddQueue(sc, "default", "root", 
configs.QueueConfig{
+                               Name:      "default",
+                               Parent:    false,
+                               Resources: configs.Resources{Max: 
map[string]string{siCommon.CPU: "100m", siCommon.Memory: "100M"}},
+                       }); err != nil {
+                               return err
+                       }
+
+                       return nil
+               })
+               sleepPodConf = k8s.TestPodConfig{
+                       Name: "test-sleep-" + common.RandSeq(5),
+                       Labels: map[string]string{
+                               constants.LabelQueueName:     "root.default",
+                               constants.LabelApplicationID: "app-sleep-" + 
common.RandSeq(5)},
+                       Namespace: ns,
+                       Resources: rr,
+               }
+
+               taskGroupMinResource := map[string]resource.Quantity{}
+               for k, v := range rr.Requests {
+                       taskGroupMinResource[k.String()] = v
+               }
+               lowPodConf = createPodConfWithTaskGroup("low", 
lowPriorityClass.Name, taskGroupMinResource)
+               normalPodConf = createPodConfWithTaskGroup("normal", 
normalPriorityClass.Name, taskGroupMinResource)
+               highPodConf = createPodConfWithTaskGroup("high", 
highPriorityClass.Name, taskGroupMinResource)
+
+               var sleepPod, lowPod, normalPod, highPod *v1.Pod
+               By("Create sleep pod to consume queue")
+               sleepPod, err = k8s.InitTestPod(sleepPodConf)
+               Ω(err).NotTo(gomega.HaveOccurred())
+               sleepPod, err = kubeClient.CreatePod(sleepPod, ns)
+               Ω(err).NotTo(gomega.HaveOccurred())
+               err = kubeClient.WaitForPodRunning(ns, sleepPod.Name, 
1*time.Minute)
+               Ω(err).NotTo(gomega.HaveOccurred())
+
+               By("Submit low priority job")
+               lowPod, err = k8s.InitTestPod(lowPodConf)
+               Ω(err).NotTo(gomega.HaveOccurred())
+               lowJob := 
k8s.InitTestJob(lowPod.Labels[constants.LabelApplicationID], 1, 1, lowPod)
+               lowJob, err = kubeClient.CreateJob(lowJob, ns)
+               Ω(err).NotTo(gomega.HaveOccurred())
+               err = kubeClient.WaitForJobPodsCreated(ns, lowJob.Name, 1, 
30*time.Second)
+               Ω(err).NotTo(gomega.HaveOccurred())
+
+               By("Submit normal priority job")
+               normalPod, err = k8s.InitTestPod(normalPodConf)
+               Ω(err).NotTo(gomega.HaveOccurred())
+               normalJob := 
k8s.InitTestJob(normalPod.Labels[constants.LabelApplicationID], 1, 1, normalPod)
+               normalJob, err = kubeClient.CreateJob(normalJob, ns)
+               Ω(err).NotTo(gomega.HaveOccurred())
+               err = kubeClient.WaitForJobPodsCreated(ns, normalJob.Name, 1, 
30*time.Second)
+               Ω(err).NotTo(gomega.HaveOccurred())
+
+               By("Submit high priority job")
+               highPod, err = k8s.InitTestPod(highPodConf)
+               Ω(err).NotTo(gomega.HaveOccurred())
+               highJob := 
k8s.InitTestJob(highPod.Labels[constants.LabelApplicationID], 1, 1, highPod)
+               highJob, err = kubeClient.CreateJob(highJob, ns)
+               Ω(err).NotTo(gomega.HaveOccurred())
+               err = kubeClient.WaitForJobPodsCreated(ns, highJob.Name, 1, 
30*time.Second)
+               Ω(err).NotTo(gomega.HaveOccurred())
+
+               By("Wait for scheduler state to settle")
+               time.Sleep(10 * time.Second)
+
+               var lowPods, normalPods, highPods *v1.PodList
+               lowPods, err = kubeClient.ListPods(ns, 
fmt.Sprintf("job-name=%s", lowJob.Name))
+               Ω(err).NotTo(gomega.HaveOccurred())
+               lowPod = &lowPods.Items[0]
+
+               normalPods, err = kubeClient.ListPods(ns, 
fmt.Sprintf("job-name=%s", normalJob.Name))
+               Ω(err).NotTo(gomega.HaveOccurred())
+               normalPod = &normalPods.Items[0]
+
+               highPods, err = kubeClient.ListPods(ns, 
fmt.Sprintf("job-name=%s", highJob.Name))
+               Ω(err).NotTo(gomega.HaveOccurred())
+               highPod = &highPods.Items[0]
+
+               By("Ensure no test pods are running")
+               ensureNotRunning(ns, lowPod, normalPod, highPod)
+
+               By("Kill sleep pod to make room for test pods")
+               err = kubeClient.DeletePod(sleepPod.Name, ns)
+               Ω(err).NotTo(gomega.HaveOccurred())
+
+               By("Wait for high-priority placeholders terminated")
+               var tgPlaceHolders map[string][]string
+               tgPlaceHolders = 
yunikorn.GetPlaceholderNames(highPodConf.Annotations, 
highPodConf.Labels["applicationId"])
+               for _, phNames := range tgPlaceHolders {
+                       for _, ph := range phNames {
+                               phTermErr := 
kubeClient.WaitForPodTerminated(ns, ph, 1*time.Minute)
+                               Ω(phTermErr).NotTo(HaveOccurred())
+                       }
+               }
+
+               By("Wait for high-priority pod to begin running")
+               err = kubeClient.WaitForPodRunning(ns, highPod.Name, 
1*time.Minute)
+               Ω(err).NotTo(gomega.HaveOccurred())
+
+               By("Ensure low and normal priority pods are not running")
+               ensureNotRunning(ns, lowPod, normalPod)
+
+               By("Kill high-priority job")
+               err = kubeClient.DeleteJob(highJob.Name, ns)
+               Ω(err).NotTo(gomega.HaveOccurred())
+
+               By("Wait for normal-priority placeholders terminated")
+               tgPlaceHolders = 
yunikorn.GetPlaceholderNames(normalPodConf.Annotations, 
normalPodConf.Labels["applicationId"])
+               for _, phNames := range tgPlaceHolders {
+                       for _, ph := range phNames {
+                               phTermErr := 
kubeClient.WaitForPodTerminated(ns, ph, 1*time.Minute)
+                               Ω(phTermErr).NotTo(HaveOccurred())
+                       }
+               }
+
+               By("Wait for normal-priority pod to begin running")
+               err = kubeClient.WaitForPodRunning(ns, normalPod.Name, 
1*time.Minute)
+               Ω(err).NotTo(gomega.HaveOccurred())
+
+               By("Ensure low priority pod is not running")
+               ensureNotRunning(ns, lowPod)
+
+               By("Kill normal-priority job")
+               err = kubeClient.DeleteJob(normalJob.Name, ns)
+               Ω(err).NotTo(gomega.HaveOccurred())
+
+               By("Wait for low-priority placeholders terminated")
+               tgPlaceHolders = 
yunikorn.GetPlaceholderNames(lowPodConf.Annotations, 
lowPodConf.Labels["applicationId"])
+               for _, phNames := range tgPlaceHolders {
+                       for _, ph := range phNames {
+                               phTermErr := 
kubeClient.WaitForPodTerminated(ns, ph, 1*time.Minute)
+                               Ω(phTermErr).NotTo(HaveOccurred())
+                       }
+               }
+
+               By("Wait for low-priority pod to begin running")
+               err = kubeClient.WaitForPodRunning(ns, lowPod.Name, 
1*time.Minute)
+               Ω(err).NotTo(gomega.HaveOccurred())
+
+               By("Kill low-priority job")
+               err = kubeClient.DeleteJob(lowJob.Name, ns)
+               Ω(err).NotTo(gomega.HaveOccurred())
+       })
+
        ginkgo.AfterEach(func() {
                testDescription := ginkgo.CurrentSpecReport()
                if testDescription.Failed() {
@@ -355,3 +510,26 @@ func ensureNotRunning(ns string, pods ...*v1.Pod) {
                Ω(podResult.Status.Phase).ShouldNot(Equal(v1.PodRunning), 
pod.Name)
        }
 }
+
+func createPodConfWithTaskGroup(name, priorityClassName string, 
taskGroupMinResource map[string]resource.Quantity) k8s.TestPodConfig {
+       return k8s.TestPodConfig{
+               Name: fmt.Sprintf("test-%s-priority-%s", name, 
common.RandSeq(5)),
+               Labels: map[string]string{
+                       constants.LabelQueueName:     "root.default",
+                       constants.LabelApplicationID: fmt.Sprintf("app-%s-%s", 
name, common.RandSeq(5))},
+               Namespace: ns,
+               Annotations: &k8s.PodAnnotation{
+                       TaskGroupName: "group-" + name,
+                       TaskGroups: []interfaces.TaskGroup{
+                               {
+                                       Name:        "group-" + name,
+                                       MinMember:   int32(1),
+                                       MinResource: taskGroupMinResource,
+                               },
+                       },
+               },
+               Resources:         rr,
+               PriorityClassName: priorityClassName,
+               RestartPolicy:     v1.RestartPolicyNever,
+       }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to