This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new 346138b6 [YUNIKORN-1471] add preemption e2e test Signed-off-by: Frank Yang <[email protected]>
346138b6 is described below

commit 346138b64cd234da7708dbc7cb05a55b11c3553d
Author: Frank Yang <[email protected]>
AuthorDate: Fri Jun 9 12:22:24 2023 +0200

    [YUNIKORN-1471] add preemption e2e test
    Signed-off-by: Frank Yang <[email protected]>
    
    Closes: #608
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 test/e2e/framework/helpers/k8s/k8s_utils.go        |  31 ++
 .../framework/helpers/yunikorn/rest_api_utils.go   |   4 +-
 test/e2e/gang_scheduling/gang_scheduling_test.go   |   8 +-
 test/e2e/node_resources/node_resources_test.go     |  38 ++-
 test/e2e/preemption/preemption_suite_test.go       |  50 +++
 test/e2e/preemption/preemption_test.go             | 352 +++++++++++++++++++++
 .../recovery_and_restart_test.go                   |  18 +-
 test/e2e/simple_preemptor/simple_preemptor_test.go |  14 +-
 8 files changed, 466 insertions(+), 49 deletions(-)

diff --git a/test/e2e/framework/helpers/k8s/k8s_utils.go b/test/e2e/framework/helpers/k8s/k8s_utils.go
index 562c9dca..78922c20 100644
--- a/test/e2e/framework/helpers/k8s/k8s_utils.go
+++ b/test/e2e/framework/helpers/k8s/k8s_utils.go
@@ -765,6 +765,17 @@ func (k *KubeCtl) ListPodsByFieldSelector(namespace string, selector string) (*v
        return podList, nil
 }
 
+// ListPodsByLabelSelector returns the list of pods in `namespace` that match the given label selector
+func (k *KubeCtl) ListPodsByLabelSelector(namespace string, selector string) (*v1.PodList, error) {
+       listOptions := metav1.ListOptions{LabelSelector: selector}
+       podList, err := k.clientSet.CoreV1().Pods(namespace).List(context.TODO(), listOptions)
+
+       if err != nil {
+               return nil, err
+       }
+       return podList, nil
+}
+
 // Wait up to timeout seconds for all pods in 'namespace' with given 'selector' to enter running state.
 // Returns an error if no pods are found or not all discovered pods enter running state.
 func (k *KubeCtl) WaitForPodBySelectorRunning(namespace string, selector string, timeout int) error {
@@ -1268,6 +1279,16 @@ func (k *KubeCtl) TaintNode(name, key, val string, effect v1.TaintEffect) error
        return err
 }
 
+func (k *KubeCtl) TaintNodes(names []string, key, val string, effect v1.TaintEffect) error {
+       for _, name := range names {
+               err := k.TaintNode(name, key, val, effect)
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
 func (k *KubeCtl) UntaintNode(name, key string) error {
        node, err := k.clientSet.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{})
        if err != nil {
@@ -1286,6 +1307,16 @@ func (k *KubeCtl) UntaintNode(name, key string) error {
        return err
 }
 
+func (k *KubeCtl) UntaintNodes(names []string, key string) error {
+       for _, name := range names {
+               err := k.UntaintNode(name, key)
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
 func IsMasterNode(node *v1.Node) bool {
        for _, taint := range node.Spec.Taints {
                if _, ok := common.MasterTaints[taint.Key]; ok {
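
The three helpers added above are thin wrappers: ListPodsByLabelSelector mirrors the existing field-selector variant, while TaintNodes/UntaintNodes loop over the single-node calls and stop at the first error. A minimal sketch of how a suite might drive them together, assuming an already-configured KubeCtl; the taint key, namespace and label values below are illustrative only, not taken from this commit:

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"

        "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s"
    )

    // isolateAndCount sketches a typical e2e flow: taint every node in the
    // list, count pods matching a label, then remove the taints again.
    func isolateAndCount(kClient *k8s.KubeCtl, nodesToTaint []string) error {
        // Apply the same NoSchedule taint to all listed nodes.
        if err := kClient.TaintNodes(nodesToTaint, "e2e_taint", "value", v1.TaintEffectNoSchedule); err != nil {
            return err
        }
        // Remove the taints on the way out, mirroring the AfterSuite blocks below.
        defer func() { _ = kClient.UntaintNodes(nodesToTaint, "e2e_taint") }()

        // Select pods by label instead of by field.
        pods, err := kClient.ListPodsByLabelSelector("dev", "queue=root.sandbox1")
        if err != nil {
            return err
        }
        fmt.Printf("found %d pods\n", len(pods.Items))
        return nil
    }
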
diff --git a/test/e2e/framework/helpers/yunikorn/rest_api_utils.go b/test/e2e/framework/helpers/yunikorn/rest_api_utils.go
index 97f4d73b..a12f976a 100644
--- a/test/e2e/framework/helpers/yunikorn/rest_api_utils.go
+++ b/test/e2e/framework/helpers/yunikorn/rest_api_utils.go
@@ -223,12 +223,12 @@ func (c *RClient) isAppInDesiredState(partition string, queue string, appID stri
        }
 }
 
-func (c *RClient) GetNodes(partition string) (*[]dao.NodesDAOInfo, error) {
+func (c *RClient) GetNodes(partition string) (*[]dao.NodeDAOInfo, error) {
        req, err := c.newRequest("GET", fmt.Sprintf(configmanager.NodesPath, partition), nil)
        if err != nil {
                return nil, err
        }
-       var nodes []dao.NodesDAOInfo
+       var nodes []dao.NodeDAOInfo
        _, err = c.do(req, &nodes)
        return &nodes, err
 }
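
The rename above (NodesDAOInfo -> NodeDAOInfo) flattens the REST response: GetNodes now returns the nodes of the requested partition directly instead of per-partition wrappers that each carry an inner Nodes slice, which is why the call sites below drop one loop level. A minimal sketch of consuming the new shape; the partition name and printed fields follow the tests in this commit, everything else is illustrative:

    package main

    import (
        "fmt"

        "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/yunikorn"
    )

    // printNodes iterates the flattened GetNodes result: one loop instead of two.
    func printNodes(restClient *yunikorn.RClient) error {
        nodes, err := restClient.GetNodes("default")
        if err != nil {
            return err
        }
        for _, node := range *nodes {
            fmt.Printf("node %s: %d allocations, available memory %d\n",
                node.NodeID, len(node.Allocations), node.Available["memory"])
        }
        return nil
    }
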
diff --git a/test/e2e/gang_scheduling/gang_scheduling_test.go b/test/e2e/gang_scheduling/gang_scheduling_test.go
index 1653c1ed..8b9bd096 100644
--- a/test/e2e/gang_scheduling/gang_scheduling_test.go
+++ b/test/e2e/gang_scheduling/gang_scheduling_test.go
@@ -781,11 +781,9 @@ var _ = Describe("", func() {
                // Verify no app allocation in nodeA
                ykNodes, nodeErr := restClient.GetNodes(defaultPartition)
                Ω(nodeErr).NotTo(HaveOccurred())
-               for _, nodeDAO := range *ykNodes {
-                       for _, node := range nodeDAO.Nodes {
-                               for _, alloc := range node.Allocations {
-                                       Ω(alloc.ApplicationID).NotTo(Equal(podConf.Labels["applicationId"]), "Placeholder allocation not removed from node")
-                               }
+               for _, node := range *ykNodes {
+                       for _, alloc := range node.Allocations {
+                               Ω(alloc.ApplicationID).NotTo(Equal(podConf.Labels["applicationId"]), "Placeholder allocation not removed from node")
                        }
                }
 
diff --git a/test/e2e/node_resources/node_resources_test.go b/test/e2e/node_resources/node_resources_test.go
index 80070800..9087dfd6 100644
--- a/test/e2e/node_resources/node_resources_test.go
+++ b/test/e2e/node_resources/node_resources_test.go
@@ -74,26 +74,24 @@ var _ = Describe("", func() {
                Ω(restErr).NotTo(HaveOccurred())
 
                // For each yk node capacity, compare to official Kubernetes value
-               for _, nodeDAO := range *ykNodes {
-                       for _, ykNode := range nodeDAO.Nodes {
-                               nodeName := ykNode.NodeID
-                               var ykCapacity yunikorn.ResourceUsage
-                               ykCapacity.ParseResourceUsage(ykNode.Capacity)
-
-                               kubeNodeCPU := kClientNodeCapacities[nodeName][siCommon.CPU]
-                               kubeNodeMem := kClientNodeCapacities[nodeName][siCommon.Memory]
-                               kubeNodeMem.RoundUp(resource.Mega) // round to nearest megabyte for comparison
-
-                               // Compare memory to nearest megabyte
-                               roundedYKMem := ykCapacity.GetMemory()
-                               roundedYKMem.RoundUp(resource.Mega)
-                               cmpRes := kubeNodeMem.Cmp(roundedYKMem)
-                               Ω(cmpRes).To(Equal(0))
-
-                               // Compare cpu cores to nearest millicore
-                               cmpRes = kubeNodeCPU.Cmp(ykCapacity.GetCPU())
-                               Ω(cmpRes).To(Equal(0))
-                       }
+               for _, ykNode := range *ykNodes {
+                       nodeName := ykNode.NodeID
+                       var ykCapacity yunikorn.ResourceUsage
+                       ykCapacity.ParseResourceUsage(ykNode.Capacity)
+
+                       kubeNodeCPU := kClientNodeCapacities[nodeName]["cpu"]
+                       kubeNodeMem := kClientNodeCapacities[nodeName][siCommon.Memory]
+                       kubeNodeMem.RoundUp(resource.Mega) // round to nearest megabyte for comparison
+
+                       // Compare memory to nearest megabyte
+                       roundedYKMem := ykCapacity.GetMemory()
+                       roundedYKMem.RoundUp(resource.Mega)
+                       cmpRes := kubeNodeMem.Cmp(roundedYKMem)
+                       Ω(cmpRes).To(Equal(0))
+
+                       // Compare cpu cores to nearest millicore
+                       cmpRes = kubeNodeCPU.Cmp(ykCapacity.GetCPU())
+                       Ω(cmpRes).To(Equal(0))
                }
        })
 })
diff --git a/test/e2e/preemption/preemption_suite_test.go b/test/e2e/preemption/preemption_suite_test.go
new file mode 100644
index 00000000..680589ad
--- /dev/null
+++ b/test/e2e/preemption/preemption_suite_test.go
@@ -0,0 +1,50 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package preemption_test
+
+import (
+       "path/filepath"
+       "testing"
+
+       "github.com/onsi/ginkgo/v2"
+       "github.com/onsi/ginkgo/v2/reporters"
+       "github.com/onsi/gomega"
+
+       "github.com/apache/yunikorn-k8shim/test/e2e/framework/configmanager"
+)
+
+func init() {
+       configmanager.YuniKornTestConfig.ParseFlags()
+}
+
+func TestPreemption(t *testing.T) {
+       ginkgo.ReportAfterSuite("TestPreemption", func(report ginkgo.Report) {
+               err := reporters.GenerateJUnitReportWithConfig(
+                       report,
+                       filepath.Join(configmanager.YuniKornTestConfig.LogDir, "TEST-preemption_junit.xml"),
+                       reporters.JunitReportConfig{OmitSpecLabels: true},
+               )
+               Ω(err).NotTo(HaveOccurred())
+       })
+       gomega.RegisterFailHandler(ginkgo.Fail)
+       ginkgo.RunSpecs(t, "TestPreemption", ginkgo.Label("TestPreemption"))
+}
+
+var Ω = gomega.Ω
+var HaveOccurred = gomega.HaveOccurred
diff --git a/test/e2e/preemption/preemption_test.go b/test/e2e/preemption/preemption_test.go
new file mode 100644
index 00000000..cab16844
--- /dev/null
+++ b/test/e2e/preemption/preemption_test.go
@@ -0,0 +1,352 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package preemption_test
+
+import (
+       "fmt"
+       "strings"
+       "time"
+
+       "github.com/onsi/ginkgo/v2"
+       "github.com/onsi/gomega"
+       v1 "k8s.io/api/core/v1"
+
+       "github.com/apache/yunikorn-core/pkg/common/configs"
+       "github.com/apache/yunikorn-k8shim/pkg/common/constants"
+       tests "github.com/apache/yunikorn-k8shim/test/e2e"
+       "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common"
+       "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s"
+       "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/yunikorn"
+)
+
+var kClient k8s.KubeCtl
+var restClient yunikorn.RClient
+var ns *v1.Namespace
+var dev = "dev" + common.RandSeq(5)
+var oldConfigMap = new(v1.ConfigMap)
+var annotation = "ann-" + common.RandSeq(10)
+
+// Nodes
+var Worker = ""
+var WorkerMemRes int64
+var sleepPodMemLimit int64
+var taintKey = "e2e_test_preemption"
+var nodesToTaint []string
+
+var _ = ginkgo.BeforeSuite(func() {
+       // Initializing kubectl client
+       kClient = k8s.KubeCtl{}
+       Ω(kClient.SetClient()).To(gomega.BeNil())
+       // Initializing rest client
+       restClient = yunikorn.RClient{}
+       Ω(restClient).NotTo(gomega.BeNil())
+
+       yunikorn.EnsureYuniKornConfigsPresent()
+
+       ginkgo.By("Port-forward the scheduler pod")
+       var err = kClient.PortForwardYkSchedulerPod()
+       Ω(err).NotTo(gomega.HaveOccurred())
+
+       ginkgo.By("create development namespace")
+       ns, err = kClient.CreateNamespace(dev, nil)
+       gomega.Ω(err).NotTo(gomega.HaveOccurred())
+       gomega.Ω(ns.Status.Phase).To(gomega.Equal(v1.NamespaceActive))
+
+       var nodes *v1.NodeList
+       nodes, err = kClient.GetNodes()
+       Ω(err).NotTo(gomega.HaveOccurred())
+       Ω(len(nodes.Items)).NotTo(gomega.BeZero(), "Nodes can't be empty")
+
+       // Extract node allocatable resources
+       for _, node := range nodes.Items {
+               node := node
+               // skip master if it's marked as such
+               if k8s.IsMasterNode(&node) || !k8s.IsComputeNode(&node) {
+                       continue
+               }
+               if Worker == "" {
+                       Worker = node.Name
+               } else {
+                       nodesToTaint = append(nodesToTaint, node.Name)
+               }
+       }
+       Ω(Worker).NotTo(gomega.BeEmpty(), "Worker node not found")
+
+       ginkgo.By("Tainting some nodes..")
+       err = kClient.TaintNodes(nodesToTaint, taintKey, "value", v1.TaintEffectNoSchedule)
+       Ω(err).NotTo(gomega.HaveOccurred())
+
+       nodesDAOInfo, err := restClient.GetNodes(constants.DefaultPartition)
+       Ω(err).NotTo(gomega.HaveOccurred())
+       Ω(nodesDAOInfo).NotTo(gomega.BeNil())
+
+       for _, node := range *nodesDAOInfo {
+               if node.NodeID == Worker {
+                       WorkerMemRes = node.Available["memory"]
+               }
+       }
+       WorkerMemRes /= (1000 * 1000) // change to M
+       fmt.Fprintf(ginkgo.GinkgoWriter, "Worker node %s available memory %dM\n", Worker, WorkerMemRes)
+
+       sleepPodMemLimit = int64(float64(WorkerMemRes) / 3)
+       Ω(sleepPodMemLimit).NotTo(gomega.BeZero(), "Sleep pod memory limit cannot be zero")
+       fmt.Fprintf(ginkgo.GinkgoWriter, "Sleep pod limit memory %dM\n", sleepPodMemLimit)
+})
+
+var _ = ginkgo.AfterSuite(func() {
+
+       ginkgo.By("Untainting some nodes")
+       err := kClient.UntaintNodes(nodesToTaint, taintKey)
+       Ω(err).NotTo(gomega.HaveOccurred(), "Could not remove taint from nodes "+strings.Join(nodesToTaint, ","))
+
+       ginkgo.By("Check Yunikorn's health")
+       checks, err := yunikorn.GetFailedHealthChecks()
+       Ω(err).NotTo(gomega.HaveOccurred())
+       Ω(checks).To(gomega.Equal(""), checks)
+
+       testDescription := ginkgo.CurrentSpecReport()
+       if testDescription.Failed() {
+               tests.LogTestClusterInfoWrapper(testDescription.FailureMessage(), []string{ns.Name})
+               tests.LogYunikornContainer(testDescription.FailureMessage())
+       }
+       ginkgo.By("Tearing down namespace: " + ns.Name)
+       err = kClient.TearDownNamespace(ns.Name)
+       Ω(err).NotTo(gomega.HaveOccurred())
+})
+
+var _ = ginkgo.Describe("Preemption", func() {
+       ginkgo.It("Verify_basic_preemption", func() {
+               ginkgo.By("A queue keeps using more resources than its guaranteed value even after one of its pods is removed, and the cluster doesn't have enough resources to deploy a pod in another queue that is still below its guaranteed value.")
+               // update config
+               ginkgo.By(fmt.Sprintf("Update root.sandbox1 and root.sandbox2 with guaranteed memory %dM", sleepPodMemLimit))
+               annotation = "ann-" + common.RandSeq(10)
+               yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "", annotation, func(sc *configs.SchedulerConfig) error {
+                       // remove placement rules so we can control queue
+                       sc.Partitions[0].PlacementRules = nil
+
+                       var err error
+                       if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
+                               Name:       "sandbox1",
+                               Resources:  configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit)}},
+                               Properties: map[string]string{"preemption.delay": "1s"},
+                       }); err != nil {
+                               return err
+                       }
+
+                       if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
+                               Name:       "sandbox2",
+                               Resources:  configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit)}},
+                               Properties: map[string]string{"preemption.delay": "1s"},
+                       }); err != nil {
+                               return err
+                       }
+                       return nil
+               })
+
+               // Define sleepPod
+               sleepPodConfigs := createSandbox1SleepPodConfigs(3, 600)
+               sleepPod4Config := k8s.SleepPodConfig{Name: "sleepjob4", NS: dev, Mem: sleepPodMemLimit, Time: 600, Optedout: true, Labels: map[string]string{"queue": "root.sandbox2"}}
+               sleepPodConfigs = append(sleepPodConfigs, sleepPod4Config)
+
+               for _, config := range sleepPodConfigs {
+                       ginkgo.By("Deploy the sleep pod " + config.Name + " to the development namespace")
+                       sleepObj, podErr := k8s.InitSleepPod(config)
+                       Ω(podErr).NotTo(gomega.HaveOccurred())
+                       sleepRespPod, podErr := kClient.CreatePod(sleepObj, dev)
+                       gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
+
+                       // Wait for pod to move to running state
+                       podErr = kClient.WaitForPodBySelectorRunning(dev,
+                               fmt.Sprintf("app=%s", sleepRespPod.ObjectMeta.Labels["app"]),
+                               60)
+                       gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
+               }
+
+               // assert one of the pods in root.sandbox1 is preempted
+               ginkgo.By("One of the pods in root.sandbox1 is preempted")
+               sandbox1RunningPodsCnt := 0
+               pods, err := kClient.ListPodsByLabelSelector(dev, "queue=root.sandbox1")
+               gomega.Ω(err).NotTo(gomega.HaveOccurred())
+               for _, pod := range pods.Items {
+                       if pod.DeletionTimestamp != nil {
+                               continue
+                       }
+                       if pod.Status.Phase == v1.PodRunning {
+                               sandbox1RunningPodsCnt++
+                       }
+               }
+               Ω(sandbox1RunningPodsCnt).To(gomega.Equal(2), "One of the pods in root.sandbox1 should be preempted")
+       })
+
+       ginkgo.It("Verify_no_preemption_on_resources_less_than_guaranteed_value", func() {
+               ginkgo.By("A queue that uses less resource than its guaranteed value can't have its pods preempted.")
+               // update config
+               ginkgo.By(fmt.Sprintf("Update root.sandbox1 and root.sandbox2 with guaranteed memory %dM", WorkerMemRes))
+               annotation = "ann-" + common.RandSeq(10)
+               yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "", annotation, func(sc *configs.SchedulerConfig) error {
+                       // remove placement rules so we can control queue
+                       sc.Partitions[0].PlacementRules = nil
+
+                       var err error
+                       if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
+                               Name:       "sandbox1",
+                               Resources:  configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", WorkerMemRes)}},
+                               Properties: map[string]string{"preemption.delay": "1s"},
+                       }); err != nil {
+                               return err
+                       }
+
+                       if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
+                               Name:       "sandbox2",
+                               Resources:  configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", WorkerMemRes)}},
+                               Properties: map[string]string{"preemption.delay": "1s"},
+                       }); err != nil {
+                               return err
+                       }
+                       return nil
+               })
+
+               // Define sleepPod
+               sandbox1SleepPodConfigs := createSandbox1SleepPodConfigs(3, 30)
+               sleepPod4Config := k8s.SleepPodConfig{Name: "sleepjob4", NS: dev, Mem: sleepPodMemLimit, Time: 30, Optedout: true, Labels: map[string]string{"queue": "root.sandbox2"}}
+
+               // Deploy pods in root.sandbox1
+               for _, config := range sandbox1SleepPodConfigs {
+                       ginkgo.By("Deploy the sleep pod " + config.Name + " to the development namespace")
+                       sleepObj, podErr := k8s.InitSleepPod(config)
+                       Ω(podErr).NotTo(gomega.HaveOccurred())
+                       sleepRespPod, podErr := kClient.CreatePod(sleepObj, dev)
+                       gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
+
+                       // Wait for pod to move to running state
+                       podErr = kClient.WaitForPodBySelectorRunning(dev,
+                               fmt.Sprintf("app=%s", sleepRespPod.ObjectMeta.Labels["app"]),
+                               30)
+                       gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
+               }
+
+               // Deploy sleepjob4 pod in root.sandbox2
+               ginkgo.By("Deploy the sleep pod " + sleepPod4Config.Name + " to the development namespace")
+               sleepObj, podErr := k8s.InitSleepPod(sleepPod4Config)
+               Ω(podErr).NotTo(gomega.HaveOccurred())
+               sleepRespPod4, err := kClient.CreatePod(sleepObj, dev)
+               gomega.Ω(err).NotTo(gomega.HaveOccurred())
+
+               // sleepjob4 pod can't be scheduled until the pods in root.sandbox1 have succeeded
+               ginkgo.By("The sleep pod " + sleepPod4Config.Name + " can't be scheduled")
+               err = kClient.WaitForPodUnschedulable(sleepRespPod4, 60*time.Second)
+               gomega.Ω(err).NotTo(gomega.HaveOccurred())
+
+               // pods in root.sandbox1 can run to completion
+               ginkgo.By("The pods in root.sandbox1 can run to completion")
+               for _, config := range sandbox1SleepPodConfigs {
+                       err = kClient.WaitForPodSucceeded(dev, config.Name, 30*time.Second)
+                       gomega.Ω(err).NotTo(gomega.HaveOccurred())
+               }
+       })
+
+       ginkgo.It("Verify_no_preemption_outside_fence", func() {
+               ginkgo.By("Preemption can't go outside the fence.")
+               // update config
+               ginkgo.By(fmt.Sprintf("Update root.sandbox1 and root.sandbox2 with guaranteed memory %dM. root.sandbox2 has a fence preemption policy.", sleepPodMemLimit))
+               annotation = "ann-" + common.RandSeq(10)
+               yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "", annotation, func(sc *configs.SchedulerConfig) error {
+                       // remove placement rules so we can control queue
+                       sc.Partitions[0].PlacementRules = nil
+
+                       var err error
+                       if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
+                               Name:       "sandbox1",
+                               Resources:  configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit)}},
+                               Properties: map[string]string{"preemption.delay": "1s"},
+                       }); err != nil {
+                               return err
+                       }
+
+                       if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
+                               Name:       "sandbox2",
+                               Resources:  configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit)}},
+                               Properties: map[string]string{"preemption.delay": "1s", "preemption.policy": "fence"},
+                       }); err != nil {
+                               return err
+                       }
+                       return nil
+               })
+
+               // Define sleepPod
+               sandbox1SleepPodConfigs := createSandbox1SleepPodConfigs(3, 30)
+               sleepPod4Config := k8s.SleepPodConfig{Name: "sleepjob4", NS: dev, Mem: sleepPodMemLimit, Time: 30, Optedout: true, Labels: map[string]string{"queue": "root.sandbox2"}}
+
+               // Deploy pods in root.sandbox1
+               for _, config := range sandbox1SleepPodConfigs {
+                       ginkgo.By("Deploy the sleep pod " + config.Name + " to the development namespace")
+                       sleepObj, podErr := k8s.InitSleepPod(config)
+                       Ω(podErr).NotTo(gomega.HaveOccurred())
+                       sleepRespPod, podErr := kClient.CreatePod(sleepObj, dev)
+                       gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
+
+                       // Wait for pod to move to running state
+                       podErr = kClient.WaitForPodBySelectorRunning(dev,
+                               fmt.Sprintf("app=%s", sleepRespPod.ObjectMeta.Labels["app"]),
+                               30)
+                       gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
+               }
+
+               // Deploy sleepjob4 pod in root.sandbox2
+               ginkgo.By("Deploy the sleep pod " + sleepPod4Config.Name + " to the development namespace")
+               sleepObj, podErr := k8s.InitSleepPod(sleepPod4Config)
+               Ω(podErr).NotTo(gomega.HaveOccurred())
+               sleepRespPod4, err := kClient.CreatePod(sleepObj, dev)
+               gomega.Ω(err).NotTo(gomega.HaveOccurred())
+
+               // sleepjob4 pod can't be scheduled until the pods in root.sandbox1 have succeeded
+               ginkgo.By("The sleep pod " + sleepPod4Config.Name + " can't be scheduled")
+               err = kClient.WaitForPodUnschedulable(sleepRespPod4, 60*time.Second)
+               gomega.Ω(err).NotTo(gomega.HaveOccurred())
+
+               // pods in root.sandbox1 can run to completion
+               ginkgo.By("The pods in root.sandbox1 can run to completion")
+               for _, config := range sandbox1SleepPodConfigs {
+                       err = kClient.WaitForPodSucceeded(dev, config.Name, 30*time.Second)
+                       gomega.Ω(err).NotTo(gomega.HaveOccurred())
+               }
+       })
+
+       ginkgo.AfterEach(func() {
+
+               // Delete all sleep pods
+               ginkgo.By("Delete all sleep pods")
+               err := kClient.DeletePods(ns.Name)
+               if err != nil {
+                       fmt.Fprintf(ginkgo.GinkgoWriter, "Failed to delete pods in namespace %s - reason is %s\n", ns.Name, err.Error())
+               }
+
+               // reset config
+               ginkgo.By("Restoring YuniKorn configuration")
+               yunikorn.RestoreConfigMapWrapper(oldConfigMap, annotation)
+       })
+})
+
+func createSandbox1SleepPodConfigs(cnt, time int) []k8s.SleepPodConfig {
+       sandbox1Configs := make([]k8s.SleepPodConfig, 0, cnt)
+       for i := 0; i < cnt; i++ {
+               sandbox1Configs = append(sandbox1Configs, k8s.SleepPodConfig{Name: fmt.Sprintf("sleepjob%d", i+1), NS: dev, Mem: sleepPodMemLimit, Time: time, Optedout: true, Labels: map[string]string{"queue": "root.sandbox1"}})
+       }
+       return sandbox1Configs
+}
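
The sizing in BeforeSuite above is what makes these specs deterministic: the worker's available memory is converted from bytes to M and each sleep pod requests a third of it, so the three root.sandbox1 pods saturate the worker and the root.sandbox2 pod can only start if one of them is preempted. A worked sketch of that arithmetic, using an illustrative 9000M worker:

    package main

    import "fmt"

    func main() {
        availableBytes := int64(9000 * 1000 * 1000)          // illustrative worker memory
        workerMemRes := availableBytes / (1000 * 1000)       // bytes -> M, as in BeforeSuite: 9000
        sleepPodMemLimit := int64(float64(workerMemRes) / 3) // per-pod request: 3000M

        fmt.Println(3*sleepPodMemLimit <= workerMemRes) // true: three pods fill the node
        fmt.Println(4*sleepPodMemLimit > workerMemRes)  // true: a fourth pod needs preemption
    }
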
diff --git a/test/e2e/recovery_and_restart/recovery_and_restart_test.go b/test/e2e/recovery_and_restart/recovery_and_restart_test.go
index 6ce0a712..c9d0fcdd 100644
--- a/test/e2e/recovery_and_restart/recovery_and_restart_test.go
+++ b/test/e2e/recovery_and_restart/recovery_and_restart_test.go
@@ -313,19 +313,15 @@ var _ = ginkgo.Describe("", func() {
                        selectedNode, memoryGiB, placeholderCount)
 
                ginkgo.By("Tainting all nodes except " + selectedNode)
-               for _, nodeName := range nodesToTaint {
-                       err = kClient.TaintNode(nodeName, taintKey, "value", v1.TaintEffectNoSchedule)
-                       Ω(err).NotTo(gomega.HaveOccurred())
-               }
+               err = kClient.TaintNodes(nodesToTaint, taintKey, "value", v1.TaintEffectNoSchedule)
+               Ω(err).NotTo(gomega.HaveOccurred())
 
                removeTaint := true
                defer func() {
                        if removeTaint {
                                ginkgo.By("Untainting nodes (defer)")
-                               for _, nodeName := range nodesToTaint {
-                                       err = kClient.UntaintNode(nodeName, taintKey)
-                                       Ω(err).NotTo(gomega.HaveOccurred(), "Could not remove taint from node "+nodeName)
-                               }
+                               err = kClient.UntaintNodes(nodesToTaint, taintKey)
+                               Ω(err).NotTo(gomega.HaveOccurred(), "Could not remove taint from nodes "+strings.Join(nodesToTaint, ","))
                        }
                }()
 
@@ -353,10 +349,8 @@ var _ = ginkgo.Describe("", func() {
 
                ginkgo.By("Untainting nodes")
                removeTaint = false
-               for _, nodeName := range nodesToTaint {
-                       err = kClient.UntaintNode(nodeName, taintKey)
-                       Ω(err).NotTo(gomega.HaveOccurred(), "Could not remove taint from node "+nodeName)
-               }
+               err = kClient.UntaintNodes(nodesToTaint, taintKey)
+               Ω(err).NotTo(gomega.HaveOccurred(), "Could not remove taint from nodes "+strings.Join(nodesToTaint, ","))
 
                ginkgo.By("Waiting for placeholder replacement & sleep pods to 
finish")
                err = kClient.WaitForJobPodsSucceeded(dev, job.Name, 1, 60*time.Second)
diff --git a/test/e2e/simple_preemptor/simple_preemptor_test.go b/test/e2e/simple_preemptor/simple_preemptor_test.go
index a0ffd6e9..18da4178 100644
--- a/test/e2e/simple_preemptor/simple_preemptor_test.go
+++ b/test/e2e/simple_preemptor/simple_preemptor_test.go
@@ -97,11 +97,8 @@ var _ = ginkgo.BeforeSuite(func() {
        }
 
        ginkgo.By("Tainting some nodes..")
-       for _, nodeName := range nodesToTaint {
-               ginkgo.By("Tainting node " + nodeName)
-               err = kClient.TaintNode(nodeName, taintKey, "value", v1.TaintEffectNoSchedule)
-               Ω(err).NotTo(gomega.HaveOccurred())
-       }
+       err = kClient.TaintNodes(nodesToTaint, taintKey, "value", v1.TaintEffectNoSchedule)
+       Ω(err).NotTo(gomega.HaveOccurred())
 
        var pods *v1.PodList
        totalPodQuantity1 := *resource.NewQuantity(0, resource.DecimalSI)
@@ -127,11 +124,8 @@ var _ = ginkgo.BeforeSuite(func() {
 var _ = ginkgo.AfterSuite(func() {
 
        ginkgo.By("Untainting some nodes")
-       for _, nodeName := range nodesToTaint {
-               ginkgo.By("Untainting node " + nodeName)
-               err := kClient.UntaintNode(nodeName, taintKey)
-               Ω(err).NotTo(gomega.HaveOccurred(), "Could not remove taint from node "+nodeName)
-       }
+       err := kClient.UntaintNodes(nodesToTaint, taintKey)
+       Ω(err).NotTo(gomega.HaveOccurred(), "Could not remove taint from nodes "+strings.Join(nodesToTaint, ","))
 
        ginkgo.By("Check Yunikorn's health")
        checks, err := yunikorn.GetFailedHealthChecks()

