This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new 346138b6 [YUNIKORN-1471] add preemption e2e test Signed-off-by: Frank Yang <[email protected]>
346138b6 is described below
commit 346138b64cd234da7708dbc7cb05a55b11c3553d
Author: Frank Yang <[email protected]>
AuthorDate: Fri Jun 9 12:22:24 2023 +0200
[YUNIKORN-1471] add preemption e2e test
Signed-off-by: Frank Yang <[email protected]>
Closes: #608
Signed-off-by: Peter Bacsko <[email protected]>
---
test/e2e/framework/helpers/k8s/k8s_utils.go | 31 ++
.../framework/helpers/yunikorn/rest_api_utils.go | 4 +-
test/e2e/gang_scheduling/gang_scheduling_test.go | 8 +-
test/e2e/node_resources/node_resources_test.go | 38 ++-
test/e2e/preemption/preemption_suite_test.go | 50 +++
test/e2e/preemption/preemption_test.go | 352 +++++++++++++++++++++
.../recovery_and_restart_test.go | 18 +-
test/e2e/simple_preemptor/simple_preemptor_test.go | 14 +-
8 files changed, 466 insertions(+), 49 deletions(-)
diff --git a/test/e2e/framework/helpers/k8s/k8s_utils.go b/test/e2e/framework/helpers/k8s/k8s_utils.go
index 562c9dca..78922c20 100644
--- a/test/e2e/framework/helpers/k8s/k8s_utils.go
+++ b/test/e2e/framework/helpers/k8s/k8s_utils.go
@@ -765,6 +765,17 @@ func (k *KubeCtl) ListPodsByFieldSelector(namespace string, selector string) (*v
return podList, nil
}
+// Returns the list of pods in `namespace` with the given label selector
+func (k *KubeCtl) ListPodsByLabelSelector(namespace string, selector string) (*v1.PodList, error) {
+ listOptions := metav1.ListOptions{LabelSelector: selector}
+ podList, err := k.clientSet.CoreV1().Pods(namespace).List(context.TODO(), listOptions)
+
+ if err != nil {
+ return nil, err
+ }
+ return podList, nil
+}
+
// Wait up to timeout seconds for all pods in 'namespace' with given 'selector' to enter running state.
// Returns an error if no pods are found or not all discovered pods enter running state.
func (k *KubeCtl) WaitForPodBySelectorRunning(namespace string, selector string, timeout int) error {
@@ -1268,6 +1279,16 @@ func (k *KubeCtl) TaintNode(name, key, val string, effect v1.TaintEffect) error
return err
}
+func (k *KubeCtl) TaintNodes(names []string, key, val string, effect v1.TaintEffect) error {
+ for _, name := range names {
+ err := k.TaintNode(name, key, val, effect)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
func (k *KubeCtl) UntaintNode(name, key string) error {
node, err := k.clientSet.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
@@ -1286,6 +1307,16 @@ func (k *KubeCtl) UntaintNode(name, key string) error {
return err
}
+func (k *KubeCtl) UntaintNodes(names []string, key string) error {
+ for _, name := range names {
+ err := k.UntaintNode(name, key)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
func IsMasterNode(node *v1.Node) bool {
for _, taint := range node.Spec.Taints {
if _, ok := common.MasterTaints[taint.Key]; ok {
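For context, a minimal sketch of how the new batch helpers are meant to be used together (the suites later in this commit call them this way from BeforeSuite/AfterSuite; the package and function names below are illustrative only, not part of the commit):

package example

import (
	"fmt"
	"strings"

	v1 "k8s.io/api/core/v1"

	"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s"
)

// taintAllButWorker taints every listed node so that test pods land on the one
// remaining worker, and returns a cleanup func that removes the taints again.
func taintAllButWorker(kClient *k8s.KubeCtl, nodesToTaint []string, taintKey string) (func() error, error) {
	if err := kClient.TaintNodes(nodesToTaint, taintKey, "value", v1.TaintEffectNoSchedule); err != nil {
		return nil, err
	}
	cleanup := func() error {
		if err := kClient.UntaintNodes(nodesToTaint, taintKey); err != nil {
			return fmt.Errorf("could not remove taint from nodes %s: %w", strings.Join(nodesToTaint, ","), err)
		}
		return nil
	}
	return cleanup, nil
}

// countRunningInQueue counts running pods carrying a queue label, using the new
// label-selector helper.
func countRunningInQueue(kClient *k8s.KubeCtl, namespace, queue string) (int, error) {
	pods, err := kClient.ListPodsByLabelSelector(namespace, "queue="+queue)
	if err != nil {
		return 0, err
	}
	running := 0
	for _, pod := range pods.Items {
		if pod.DeletionTimestamp == nil && pod.Status.Phase == v1.PodRunning {
			running++
		}
	}
	return running, nil
}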
diff --git a/test/e2e/framework/helpers/yunikorn/rest_api_utils.go b/test/e2e/framework/helpers/yunikorn/rest_api_utils.go
index 97f4d73b..a12f976a 100644
--- a/test/e2e/framework/helpers/yunikorn/rest_api_utils.go
+++ b/test/e2e/framework/helpers/yunikorn/rest_api_utils.go
@@ -223,12 +223,12 @@ func (c *RClient) isAppInDesiredState(partition string, queue string, appID stri
}
}
-func (c *RClient) GetNodes(partition string) (*[]dao.NodesDAOInfo, error) {
+func (c *RClient) GetNodes(partition string) (*[]dao.NodeDAOInfo, error) {
req, err := c.newRequest("GET", fmt.Sprintf(configmanager.NodesPath, partition), nil)
if err != nil {
return nil, err
}
- var nodes []dao.NodesDAOInfo
+ var nodes []dao.NodeDAOInfo
_, err = c.do(req, &nodes)
return &nodes, err
}
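The signature change above means GetNodes now returns a flat slice of per-node entries (dao.NodeDAOInfo) for the requested partition instead of partition-grouped NodesDAOInfo, which is why the callers below drop one level of looping. A minimal sketch of the new iteration pattern (illustrative package and function names; the fields used match the tests in this commit):

package example

import (
	"fmt"

	"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/yunikorn"
)

// printAvailableMemory walks the flattened node list returned by GetNodes and
// prints each node's available memory as reported by the scheduler.
func printAvailableMemory(restClient *yunikorn.RClient, partition string) error {
	nodes, err := restClient.GetNodes(partition)
	if err != nil {
		return err
	}
	for _, node := range *nodes {
		fmt.Printf("node %s available memory: %d\n", node.NodeID, node.Available["memory"])
	}
	return nil
}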
diff --git a/test/e2e/gang_scheduling/gang_scheduling_test.go b/test/e2e/gang_scheduling/gang_scheduling_test.go
index 1653c1ed..8b9bd096 100644
--- a/test/e2e/gang_scheduling/gang_scheduling_test.go
+++ b/test/e2e/gang_scheduling/gang_scheduling_test.go
@@ -781,11 +781,9 @@ var _ = Describe("", func() {
// Verify no app allocation in nodeA
ykNodes, nodeErr := restClient.GetNodes(defaultPartition)
Ω(nodeErr).NotTo(HaveOccurred())
- for _, nodeDAO := range *ykNodes {
- for _, node := range nodeDAO.Nodes {
- for _, alloc := range node.Allocations {
- Ω(alloc.ApplicationID).NotTo(Equal(podConf.Labels["applicationId"]), "Placeholder allocation not removed from node")
- }
+ for _, node := range *ykNodes {
+ for _, alloc := range node.Allocations {
+ Ω(alloc.ApplicationID).NotTo(Equal(podConf.Labels["applicationId"]), "Placeholder allocation not removed from node")
}
}
diff --git a/test/e2e/node_resources/node_resources_test.go b/test/e2e/node_resources/node_resources_test.go
index 80070800..9087dfd6 100644
--- a/test/e2e/node_resources/node_resources_test.go
+++ b/test/e2e/node_resources/node_resources_test.go
@@ -74,26 +74,24 @@ var _ = Describe("", func() {
Ω(restErr).NotTo(HaveOccurred())
// For each yk node capacity, compare to official Kubernetes value
- for _, nodeDAO := range *ykNodes {
- for _, ykNode := range nodeDAO.Nodes {
- nodeName := ykNode.NodeID
- var ykCapacity yunikorn.ResourceUsage
- ykCapacity.ParseResourceUsage(ykNode.Capacity)
-
- kubeNodeCPU := kClientNodeCapacities[nodeName][siCommon.CPU]
- kubeNodeMem := kClientNodeCapacities[nodeName][siCommon.Memory]
- kubeNodeMem.RoundUp(resource.Mega) // round to nearest megabyte for comparison
-
- // Compare memory to nearest megabyte
- roundedYKMem := ykCapacity.GetMemory()
- roundedYKMem.RoundUp(resource.Mega)
- cmpRes := kubeNodeMem.Cmp(roundedYKMem)
- Ω(cmpRes).To(Equal(0))
-
- // Compare cpu cores to nearest millicore
- cmpRes = kubeNodeCPU.Cmp(ykCapacity.GetCPU())
- Ω(cmpRes).To(Equal(0))
- }
+ for _, ykNode := range *ykNodes {
+ nodeName := ykNode.NodeID
+ var ykCapacity yunikorn.ResourceUsage
+ ykCapacity.ParseResourceUsage(ykNode.Capacity)
+
+ kubeNodeCPU := kClientNodeCapacities[nodeName]["cpu"]
+ kubeNodeMem := kClientNodeCapacities[nodeName][siCommon.Memory]
+ kubeNodeMem.RoundUp(resource.Mega) // round to nearest megabyte for comparison
+
+ // Compare memory to nearest megabyte
+ roundedYKMem := ykCapacity.GetMemory()
+ roundedYKMem.RoundUp(resource.Mega)
+ cmpRes := kubeNodeMem.Cmp(roundedYKMem)
+ Ω(cmpRes).To(Equal(0))
+
+ // Compare cpu cores to nearest millicore
+ cmpRes = kubeNodeCPU.Cmp(ykCapacity.GetCPU())
+ Ω(cmpRes).To(Equal(0))
}
})
})
diff --git a/test/e2e/preemption/preemption_suite_test.go b/test/e2e/preemption/preemption_suite_test.go
new file mode 100644
index 00000000..680589ad
--- /dev/null
+++ b/test/e2e/preemption/preemption_suite_test.go
@@ -0,0 +1,50 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package preemption_test
+
+import (
+ "path/filepath"
+ "testing"
+
+ "github.com/onsi/ginkgo/v2"
+ "github.com/onsi/ginkgo/v2/reporters"
+ "github.com/onsi/gomega"
+
+ "github.com/apache/yunikorn-k8shim/test/e2e/framework/configmanager"
+)
+
+func init() {
+ configmanager.YuniKornTestConfig.ParseFlags()
+}
+
+func TestPreemption(t *testing.T) {
+ ginkgo.ReportAfterSuite("TestPreemption", func(report ginkgo.Report) {
+ err := reporters.GenerateJUnitReportWithConfig(
+ report,
+ filepath.Join(configmanager.YuniKornTestConfig.LogDir, "TEST-preemption_junit.xml"),
+ reporters.JunitReportConfig{OmitSpecLabels: true},
+ )
+ Ω(err).NotTo(HaveOccurred())
+ })
+ gomega.RegisterFailHandler(ginkgo.Fail)
+ ginkgo.RunSpecs(t, "TestPreemption", ginkgo.Label("TestPreemption"))
+}
+
+var Ω = gomega.Ω
+var HaveOccurred = gomega.HaveOccurred
diff --git a/test/e2e/preemption/preemption_test.go b/test/e2e/preemption/preemption_test.go
new file mode 100644
index 00000000..cab16844
--- /dev/null
+++ b/test/e2e/preemption/preemption_test.go
@@ -0,0 +1,352 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package preemption_test
+
+import (
+ "fmt"
+ "strings"
+ "time"
+
+ "github.com/onsi/ginkgo/v2"
+ "github.com/onsi/gomega"
+ v1 "k8s.io/api/core/v1"
+
+ "github.com/apache/yunikorn-core/pkg/common/configs"
+ "github.com/apache/yunikorn-k8shim/pkg/common/constants"
+ tests "github.com/apache/yunikorn-k8shim/test/e2e"
+ "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common"
+ "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s"
+ "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/yunikorn"
+)
+
+var kClient k8s.KubeCtl
+var restClient yunikorn.RClient
+var ns *v1.Namespace
+var dev = "dev" + common.RandSeq(5)
+var oldConfigMap = new(v1.ConfigMap)
+var annotation = "ann-" + common.RandSeq(10)
+
+// Nodes
+var Worker = ""
+var WorkerMemRes int64
+var sleepPodMemLimit int64
+var taintKey = "e2e_test_preemption"
+var nodesToTaint []string
+
+var _ = ginkgo.BeforeSuite(func() {
+ // Initializing kubectl client
+ kClient = k8s.KubeCtl{}
+ Ω(kClient.SetClient()).To(gomega.BeNil())
+ // Initializing rest client
+ restClient = yunikorn.RClient{}
+ Ω(restClient).NotTo(gomega.BeNil())
+
+ yunikorn.EnsureYuniKornConfigsPresent()
+
+ ginkgo.By("Port-forward the scheduler pod")
+ var err = kClient.PortForwardYkSchedulerPod()
+ Ω(err).NotTo(gomega.HaveOccurred())
+
+ ginkgo.By("create development namespace")
+ ns, err = kClient.CreateNamespace(dev, nil)
+ gomega.Ω(err).NotTo(gomega.HaveOccurred())
+ gomega.Ω(ns.Status.Phase).To(gomega.Equal(v1.NamespaceActive))
+
+ var nodes *v1.NodeList
+ nodes, err = kClient.GetNodes()
+ Ω(err).NotTo(gomega.HaveOccurred())
+ Ω(len(nodes.Items)).NotTo(gomega.BeZero(), "Nodes cant be empty")
+
+ // Extract node allocatable resources
+ for _, node := range nodes.Items {
+ // skip master if it's marked as such
+ node := node
+ if k8s.IsMasterNode(&node) || !k8s.IsComputeNode(&node) {
+ continue
+ }
+ if Worker == "" {
+ Worker = node.Name
+ } else {
+ nodesToTaint = append(nodesToTaint, node.Name)
+ }
+ }
+ Ω(Worker).NotTo(gomega.BeEmpty(), "Worker node not found")
+
+ ginkgo.By("Tainting some nodes..")
+ err = kClient.TaintNodes(nodesToTaint, taintKey, "value", v1.TaintEffectNoSchedule)
+ Ω(err).NotTo(gomega.HaveOccurred())
+
+ nodesDAOInfo, err := restClient.GetNodes(constants.DefaultPartition)
+ Ω(err).NotTo(gomega.HaveOccurred())
+ Ω(nodesDAOInfo).NotTo(gomega.BeNil())
+
+ for _, node := range *nodesDAOInfo {
+ if node.NodeID == Worker {
+ WorkerMemRes = node.Available["memory"]
+ }
+ }
+ WorkerMemRes /= (1000 * 1000) // change to M
+ fmt.Fprintf(ginkgo.GinkgoWriter, "Worker node %s available memory
%dM\n", Worker, WorkerMemRes)
+
+ sleepPodMemLimit = int64(float64(WorkerMemRes) / 3)
+ Ω(sleepPodMemLimit).NotTo(gomega.BeZero(), "Sleep pod memory limit cannot be zero")
+ fmt.Fprintf(ginkgo.GinkgoWriter, "Sleep pod limit memory %dM\n", sleepPodMemLimit)
+})
+
+var _ = ginkgo.AfterSuite(func() {
+
+ ginkgo.By("Untainting some nodes")
+ err := kClient.UntaintNodes(nodesToTaint, taintKey)
+ Ω(err).NotTo(gomega.HaveOccurred(), "Could not remove taint from nodes
"+strings.Join(nodesToTaint, ","))
+
+ ginkgo.By("Check Yunikorn's health")
+ checks, err := yunikorn.GetFailedHealthChecks()
+ Ω(err).NotTo(gomega.HaveOccurred())
+ Ω(checks).To(gomega.Equal(""), checks)
+
+ testDescription := ginkgo.CurrentSpecReport()
+ if testDescription.Failed() {
+ tests.LogTestClusterInfoWrapper(testDescription.FailureMessage(), []string{ns.Name})
+ tests.LogYunikornContainer(testDescription.FailureMessage())
+ }
+ ginkgo.By("Tearing down namespace: " + ns.Name)
+ err = kClient.TearDownNamespace(ns.Name)
+ Ω(err).NotTo(gomega.HaveOccurred())
+})
+
+var _ = ginkgo.Describe("Preemption", func() {
+ ginkgo.It("Verify_basic_preemption", func() {
+ ginkgo.By("A queue uses resource more than the guaranteed value
even after removing one of the pods. The cluster doesn't have enough resource
to deploy a pod in another queue which uses resource less than the guaranteed
value.")
+ // update config
+ ginkgo.By(fmt.Sprintf("Update root.sandbox1 and root.sandbox2
with guaranteed memory %dM", sleepPodMemLimit))
+ annotation = "ann-" + common.RandSeq(10)
+ yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "", annotation, func(sc *configs.SchedulerConfig) error {
+ // remove placement rules so we can control queue
+ sc.Partitions[0].PlacementRules = nil
+
+ var err error
+ if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
+ Name: "sandbox1",
+ Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit)}},
+ Properties: map[string]string{"preemption.delay": "1s"},
+ }); err != nil {
+ return err
+ }
+
+ if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
+ Name: "sandbox2",
+ Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit)}},
+ Properties: map[string]string{"preemption.delay": "1s"},
+ }); err != nil {
+ return err
+ }
+ return nil
+ })
+
+ // Define sleepPod
+ sleepPodConfigs := createSandbox1SleepPodCofigs(3, 600)
+ sleepPod4Config := k8s.SleepPodConfig{Name: "sleepjob4", NS: dev, Mem: sleepPodMemLimit, Time: 600, Optedout: true, Labels: map[string]string{"queue": "root.sandbox2"}}
+ sleepPodConfigs = append(sleepPodConfigs, sleepPod4Config)
+
+ for _, config := range sleepPodConfigs {
+ ginkgo.By("Deploy the sleep pod " + config.Name + " to
the development namespace")
+ sleepObj, podErr := k8s.InitSleepPod(config)
+ Ω(podErr).NotTo(gomega.HaveOccurred())
+ sleepRespPod, podErr := kClient.CreatePod(sleepObj, dev)
+ gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
+
+ // Wait for pod to move to running state
+ podErr = kClient.WaitForPodBySelectorRunning(dev,
+ fmt.Sprintf("app=%s",
sleepRespPod.ObjectMeta.Labels["app"]),
+ 60)
+ gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
+ }
+
+ // assert one of the pods in root.sandbox1 is preempted
+ ginkgo.By("One of the pods in root.sanbox1 is preempted")
+ sandbox1RunningPodsCnt := 0
+ pods, err := kClient.ListPodsByLabelSelector(dev, "queue=root.sandbox1")
+ gomega.Ω(err).NotTo(gomega.HaveOccurred())
+ for _, pod := range pods.Items {
+ if pod.DeletionTimestamp != nil {
+ continue
+ }
+ if pod.Status.Phase == v1.PodRunning {
+ sandbox1RunningPodsCnt++
+ }
+ }
+ Ω(sandbox1RunningPodsCnt).To(gomega.Equal(2), "One of the pods in root.sandbox1 should be preempted")
+ })
+
+ ginkgo.It("Verify_no_preemption_on_resources_less_than_guaranteed_value", func() {
+ ginkgo.By("A queue uses resource less than the guaranteed value
can't be preempted.")
+ // update config
+ ginkgo.By(fmt.Sprintf("Update root.sandbox1 and root.sandbox2
with guaranteed memory %dM", WorkerMemRes))
+ annotation = "ann-" + common.RandSeq(10)
+ yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "", annotation, func(sc *configs.SchedulerConfig) error {
+ // remove placement rules so we can control queue
+ sc.Partitions[0].PlacementRules = nil
+
+ var err error
+ if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
+ Name: "sandbox1",
+ Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", WorkerMemRes)}},
+ Properties: map[string]string{"preemption.delay": "1s"},
+ }); err != nil {
+ return err
+ }
+
+ if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
+ Name: "sandbox2",
+ Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", WorkerMemRes)}},
+ Properties: map[string]string{"preemption.delay": "1s"},
+ }); err != nil {
+ return err
+ }
+ return nil
+ })
+
+ // Define sleepPod
+ sandbox1SleepPodConfigs := createSandbox1SleepPodCofigs(3, 30)
+ sleepPod4Config := k8s.SleepPodConfig{Name: "sleepjob4", NS: dev, Mem: sleepPodMemLimit, Time: 30, Optedout: true, Labels: map[string]string{"queue": "root.sandbox2"}}
+
+ // Deploy pods in root.sandbox1
+ for _, config := range sandbox1SleepPodConfigs {
+ ginkgo.By("Deploy the sleep pod " + config.Name + " to
the development namespace")
+ sleepObj, podErr := k8s.InitSleepPod(config)
+ Ω(podErr).NotTo(gomega.HaveOccurred())
+ sleepRespPod, podErr := kClient.CreatePod(sleepObj, dev)
+ gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
+
+ // Wait for pod to move to running state
+ podErr = kClient.WaitForPodBySelectorRunning(dev,
+ fmt.Sprintf("app=%s",
sleepRespPod.ObjectMeta.Labels["app"]),
+ 30)
+ gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
+ }
+
+ // Deploy sleepjob4 pod in root.sandbox2
+ ginkgo.By("Deploy the sleep pod " + sleepPod4Config.Name + " to
the development namespace")
+ sleepObj, podErr := k8s.InitSleepPod(sleepPod4Config)
+ Ω(podErr).NotTo(gomega.HaveOccurred())
+ sleepRespPod4, err := kClient.CreatePod(sleepObj, dev)
+ gomega.Ω(err).NotTo(gomega.HaveOccurred())
+
+ // sleepjob4 pod can't be scheduled before pods in root.sandbox1 are succeeded
+ ginkgo.By("The sleep pod " + sleepPod4Config.Name + " can't be scheduled")
+ err = kClient.WaitForPodUnschedulable(sleepRespPod4, 60*time.Second)
+ gomega.Ω(err).NotTo(gomega.HaveOccurred())
+
+ // pods in root.sandbox1 can be succeeded
+ ginkgo.By("The pods in root.sandbox1 can be succeeded")
+ for _, config := range sandbox1SleepPodConfigs {
+ err = kClient.WaitForPodSucceeded(dev, config.Name, 30*time.Second)
+ gomega.Ω(err).NotTo(gomega.HaveOccurred())
+ }
+ })
+
+ ginkgo.It("Verify_no_preemption_outside_fence", func() {
+ ginkgo.By("The preemption can't go outside the fence.")
+ // update config
+ ginkgo.By(fmt.Sprintf("Update root.sandbox1 and root.sandbox2
with guaranteed memory %dM. The root.sandbox2 has fence preemption policy.",
sleepPodMemLimit))
+ annotation = "ann-" + common.RandSeq(10)
+ yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "", annotation, func(sc *configs.SchedulerConfig) error {
+ // remove placement rules so we can control queue
+ sc.Partitions[0].PlacementRules = nil
+
+ var err error
+ if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
+ Name: "sandbox1",
+ Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit)}},
+ Properties: map[string]string{"preemption.delay": "1s"},
+ }); err != nil {
+ return err
+ }
+
+ if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
+ Name: "sandbox2",
+ Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", sleepPodMemLimit)}},
+ Properties: map[string]string{"preemption.delay": "1s", "preemption.policy": "fence"},
+ }); err != nil {
+ return err
+ }
+ return nil
+ })
+
+ // Define sleepPod
+ sandbox1SleepPodConfigs := createSandbox1SleepPodCofigs(3, 30)
+ sleepPod4Config := k8s.SleepPodConfig{Name: "sleepjob4", NS: dev, Mem: sleepPodMemLimit, Time: 30, Optedout: true, Labels: map[string]string{"queue": "root.sandbox2"}}
+
+ // Deploy pods in root.sandbox1
+ for _, config := range sandbox1SleepPodConfigs {
+ ginkgo.By("Deploy the sleep pod " + config.Name + " to
the development namespace")
+ sleepObj, podErr := k8s.InitSleepPod(config)
+ Ω(podErr).NotTo(gomega.HaveOccurred())
+ sleepRespPod, podErr := kClient.CreatePod(sleepObj, dev)
+ gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
+
+ // Wait for pod to move to running state
+ podErr = kClient.WaitForPodBySelectorRunning(dev,
+ fmt.Sprintf("app=%s",
sleepRespPod.ObjectMeta.Labels["app"]),
+ 30)
+ gomega.Ω(podErr).NotTo(gomega.HaveOccurred())
+ }
+
+ // Deploy sleepjob4 pod in root.sandbox2
+ ginkgo.By("Deploy the sleep pod " + sleepPod4Config.Name + " to
the development namespace")
+ sleepObj, podErr := k8s.InitSleepPod(sleepPod4Config)
+ Ω(podErr).NotTo(gomega.HaveOccurred())
+ sleepRespPod4, err := kClient.CreatePod(sleepObj, dev)
+ gomega.Ω(err).NotTo(gomega.HaveOccurred())
+
+ // sleepjob4 pod can't be scheduled before pods in root.sandbox1 are succeeded
+ ginkgo.By("The sleep pod " + sleepPod4Config.Name + " can't be scheduled")
+ err = kClient.WaitForPodUnschedulable(sleepRespPod4, 60*time.Second)
+ gomega.Ω(err).NotTo(gomega.HaveOccurred())
+
+ // pods in root.sandbox1 can be succeeded
+ ginkgo.By("The pods in root.sandbox1 can be succeeded")
+ for _, config := range sandbox1SleepPodConfigs {
+ err = kClient.WaitForPodSucceeded(dev, config.Name, 30*time.Second)
+ gomega.Ω(err).NotTo(gomega.HaveOccurred())
+ }
+ })
+
+ ginkgo.AfterEach(func() {
+
+ // Delete all sleep pods
+ ginkgo.By("Delete all sleep pods")
+ err := kClient.DeletePods(ns.Name)
+ if err != nil {
+ fmt.Fprintf(ginkgo.GinkgoWriter, "Failed to delete pods
in namespace %s - reason is %s\n", ns.Name, err.Error())
+ }
+
+ // reset config
+ ginkgo.By("Restoring YuniKorn configuration")
+ yunikorn.RestoreConfigMapWrapper(oldConfigMap, annotation)
+ })
+})
+
+func createSandbox1SleepPodCofigs(cnt, time int) []k8s.SleepPodConfig {
+ sandbox1Configs := make([]k8s.SleepPodConfig, 0, cnt)
+ for i := 0; i < cnt; i++ {
+ sandbox1Configs = append(sandbox1Configs, k8s.SleepPodConfig{Name: fmt.Sprintf("sleepjob%d", i+1), NS: dev, Mem: sleepPodMemLimit, Time: time, Optedout: true, Labels: map[string]string{"queue": "root.sandbox1"}})
+ }
+ return sandbox1Configs
+}
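The Verify_basic_preemption scenario above rests on a simple sizing argument: the single untainted worker's available memory is converted to megabytes and split into thirds, three root.sandbox1 pods fill the node, and the root.sandbox2 pod (whose queue is still below its guaranteed share) only fits once one sandbox1 pod is preempted. A worked sketch of that arithmetic (the 12G figure is a made-up example; the test computes the real value from the node at runtime):

package example

import "fmt"

func main() {
	var workerAvailableBytes int64 = 12 * 1000 * 1000 * 1000 // hypothetical worker with 12G available

	workerMemRes := workerAvailableBytes / (1000 * 1000) // convert to M, as in BeforeSuite
	sleepPodMemLimit := int64(float64(workerMemRes) / 3) // each sleep pod requests a third of the node

	// Three sandbox1 pods consume roughly the whole node, so the sandbox2 pod,
	// sized to its queue's guaranteed share, cannot start until one sandbox1 pod
	// is preempted.
	sandbox1Use := 3 * sleepPodMemLimit
	fmt.Printf("node %dM, sandbox1 uses %dM, sandbox2 needs %dM -> one pod must be preempted\n",
		workerMemRes, sandbox1Use, sleepPodMemLimit)
}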
diff --git a/test/e2e/recovery_and_restart/recovery_and_restart_test.go b/test/e2e/recovery_and_restart/recovery_and_restart_test.go
index 6ce0a712..c9d0fcdd 100644
--- a/test/e2e/recovery_and_restart/recovery_and_restart_test.go
+++ b/test/e2e/recovery_and_restart/recovery_and_restart_test.go
@@ -313,19 +313,15 @@ var _ = ginkgo.Describe("", func() {
selectedNode, memoryGiB, placeholderCount)
ginkgo.By("Tainting all nodes except " + selectedNode)
- for _, nodeName := range nodesToTaint {
- err = kClient.TaintNode(nodeName, taintKey, "value", v1.TaintEffectNoSchedule)
- Ω(err).NotTo(gomega.HaveOccurred())
- }
+ err = kClient.TaintNodes(nodesToTaint, taintKey, "value", v1.TaintEffectNoSchedule)
+ Ω(err).NotTo(gomega.HaveOccurred())
removeTaint := true
defer func() {
if removeTaint {
ginkgo.By("Untainting nodes (defer)")
- for _, nodeName := range nodesToTaint {
- err = kClient.UntaintNode(nodeName, taintKey)
- Ω(err).NotTo(gomega.HaveOccurred(), "Could not remove taint from node "+nodeName)
- }
+ err = kClient.UntaintNodes(nodesToTaint, taintKey)
+ Ω(err).NotTo(gomega.HaveOccurred(), "Could not remove taint from nodes "+strings.Join(nodesToTaint, ","))
}
}()
@@ -353,10 +349,8 @@ var _ = ginkgo.Describe("", func() {
ginkgo.By("Untainting nodes")
removeTaint = false
- for _, nodeName := range nodesToTaint {
- err = kClient.UntaintNode(nodeName, taintKey)
- Ω(err).NotTo(gomega.HaveOccurred(), "Could not remove
taint from node "+nodeName)
- }
+ err = kClient.UntaintNodes(nodesToTaint, taintKey)
+ Ω(err).NotTo(gomega.HaveOccurred(), "Could not remove taint
from nodes "+strings.Join(nodesToTaint, ","))
ginkgo.By("Waiting for placeholder replacement & sleep pods to
finish")
err = kClient.WaitForJobPodsSucceeded(dev, job.Name, 1,
60*time.Second)
diff --git a/test/e2e/simple_preemptor/simple_preemptor_test.go b/test/e2e/simple_preemptor/simple_preemptor_test.go
index a0ffd6e9..18da4178 100644
--- a/test/e2e/simple_preemptor/simple_preemptor_test.go
+++ b/test/e2e/simple_preemptor/simple_preemptor_test.go
@@ -97,11 +97,8 @@ var _ = ginkgo.BeforeSuite(func() {
}
ginkgo.By("Tainting some nodes..")
- for _, nodeName := range nodesToTaint {
- ginkgo.By("Tainting node " + nodeName)
- err = kClient.TaintNode(nodeName, taintKey, "value", v1.TaintEffectNoSchedule)
- Ω(err).NotTo(gomega.HaveOccurred())
- }
+ err = kClient.TaintNodes(nodesToTaint, taintKey, "value", v1.TaintEffectNoSchedule)
+ Ω(err).NotTo(gomega.HaveOccurred())
var pods *v1.PodList
totalPodQuantity1 := *resource.NewQuantity(0, resource.DecimalSI)
@@ -127,11 +124,8 @@ var _ = ginkgo.BeforeSuite(func() {
var _ = ginkgo.AfterSuite(func() {
ginkgo.By("Untainting some nodes")
- for _, nodeName := range nodesToTaint {
- ginkgo.By("Untainting node " + nodeName)
- err := kClient.UntaintNode(nodeName, taintKey)
- Ω(err).NotTo(gomega.HaveOccurred(), "Could not remove taint
from node "+nodeName)
- }
+ err := kClient.UntaintNodes(nodesToTaint, taintKey)
+ Ω(err).NotTo(gomega.HaveOccurred(), "Could not remove taint from nodes
"+strings.Join(nodesToTaint, ","))
ginkgo.By("Check Yunikorn's health")
checks, err := yunikorn.GetFailedHealthChecks()