This is an automated email from the ASF dual-hosted git repository.
mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new 1efe5212 [YUNIKORN-1909] e2e test for taskgroups with limited
resources (#653)
1efe5212 is described below
commit 1efe52121c0ab7a56a27804848e45aad87578a68
Author: Frank Yang <[email protected]>
AuthorDate: Thu Sep 7 19:47:00 2023 +0530
[YUNIKORN-1909] e2e test for taskgroups with limited resources (#653)
Closes: #653
Signed-off-by: Manikandan R <[email protected]>
---
.github/workflows/pre-commit.yml | 5 +
test/e2e/framework/helpers/k8s/k8s_utils.go | 3 +
test/e2e/gang_scheduling/gang_scheduling_test.go | 1062 +++++++-------------
.../priority_scheduling_test.go | 28 +-
4 files changed, 382 insertions(+), 716 deletions(-)
diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml
index af2c7d1e..f08c6f73 100644
--- a/.github/workflows/pre-commit.yml
+++ b/.github/workflows/pre-commit.yml
@@ -49,6 +49,11 @@ jobs:
uses: actions/setup-go@v3
with:
go-version-file: .go_version
+ - name: Set hugepage
+ run: |
+ echo "vm.nr_hugepages = 1024" | sudo tee -a /etc/sysctl.conf
+ sudo sysctl -p
+ sudo sysctl -a | grep vm.nr_hugepages
- run: ./scripts/run-e2e-tests.sh -a "test" -n "yk8s" -v
"kindest/node:${KIND_NODE_IMAGE}" ${KIND_EXTRA_ARGS}
env:
KIND_NODE_IMAGE: ${{ matrix.k8s }}
diff --git a/test/e2e/framework/helpers/k8s/k8s_utils.go
b/test/e2e/framework/helpers/k8s/k8s_utils.go
index 39af7274..6070d477 100644
--- a/test/e2e/framework/helpers/k8s/k8s_utils.go
+++ b/test/e2e/framework/helpers/k8s/k8s_utils.go
@@ -629,6 +629,9 @@ func (k *KubeCtl) isPodInDesiredState(podName string,
namespace string, state v1
return func() (bool, error) {
pod, err :=
k.clientSet.CoreV1().Pods(namespace).Get(context.TODO(), podName,
metav1.GetOptions{})
if err != nil {
+ if k8serrors.IsNotFound(err) {
+ return false, nil
+ }
return false, err
}
switch pod.Status.Phase {
diff --git a/test/e2e/gang_scheduling/gang_scheduling_test.go
b/test/e2e/gang_scheduling/gang_scheduling_test.go
index d3aada44..032b6d4c 100644
--- a/test/e2e/gang_scheduling/gang_scheduling_test.go
+++ b/test/e2e/gang_scheduling/gang_scheduling_test.go
@@ -20,43 +20,61 @@ package gangscheduling_test
import (
"fmt"
+ "strings"
"time"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
+ batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "github.com/apache/yunikorn-core/pkg/webservice/dao"
"github.com/apache/yunikorn-k8shim/pkg/appmgmt/interfaces"
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
tests "github.com/apache/yunikorn-k8shim/test/e2e"
+ "github.com/apache/yunikorn-k8shim/test/e2e/framework/configmanager"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/yunikorn"
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
)
-var _ = Describe("", func() {
- var kClient k8s.KubeCtl //nolint
- var restClient yunikorn.RClient
- var ns string
- groupA := "groupa"
- groupB := "groupb"
- fifoQName := "fifoq"
- defaultPartition := "default"
- var nsQueue string
+const (
+ groupA = "groupa"
+ groupB = "groupb"
+ groupC = "groupc"
+)
+
+var (
+ restClient yunikorn.RClient
+ ns string
+ nsQueue string
+ appID string
+ minResource map[string]resource.Quantity
+ jobNames []string
+ unsatisfiableNodeSelector = map[string]string{"kubernetes.io/hostname":
"unsatisfiable_node"}
+)
+var _ = Describe("", func() {
BeforeEach(func() {
kClient = k8s.KubeCtl{}
Ω(kClient.SetClient()).To(BeNil())
+
ns = "ns-" + common.RandSeq(10)
nsQueue = "root." + ns
By(fmt.Sprintf("Creating namespace: %s for sleep jobs", ns))
- var ns1, err1 = kClient.CreateNamespace(ns, nil)
- Ω(err1).NotTo(HaveOccurred())
- Ω(ns1.Status.Phase).To(Equal(v1.NamespaceActive))
+ namespace, err := kClient.CreateNamespace(ns, nil)
+ Ω(err).NotTo(HaveOccurred())
+ Ω(namespace.Status.Phase).To(Equal(v1.NamespaceActive))
+
+ minResource = map[string]resource.Quantity{
+ v1.ResourceCPU.String(): resource.MustParse("10m"),
+ v1.ResourceMemory.String(): resource.MustParse("10M"),
+ }
+ jobNames = []string{}
+ appID = "appid-" + common.RandSeq(5)
})
// Test to verify annotation with task group definition
@@ -67,106 +85,42 @@ var _ = Describe("", func() {
// 5. Check placeholders deleted
// 6. Real pods running and app running
It("Verify_Annotation_TaskGroup_Def", func() {
- taskGroupName := "group-" + common.RandSeq(3)
-
// Define gang member template with 5 members, 1 real pod (not
part of tg)
- podResources := map[string]resource.Quantity{
- "cpu": resource.MustParse("10m"),
- "memory": resource.MustParse("10M"),
- }
annotations := k8s.PodAnnotation{
TaskGroups: []interfaces.TaskGroup{
- {Name: taskGroupName, MinMember: int32(5),
MinResource: podResources},
+ {Name: groupA, MinMember: int32(5),
MinResource: minResource},
},
}
- podConf := k8s.TestPodConfig{
- Labels: map[string]string{
- "app": "sleep-" + common.RandSeq(5),
- "applicationId": "appid-" + common.RandSeq(5),
- },
- Annotations: &annotations,
- Resources: &v1.ResourceRequirements{
- Requests: v1.ResourceList{
- "cpu": podResources["cpu"],
- "memory": podResources["memory"],
- },
- },
- }
- jobConf := k8s.JobConfig{
- Name: "gangjob-" + common.RandSeq(5),
- Namespace: ns,
- Parallelism: int32(1),
- PodConfig: podConf,
- }
- job, jobErr := k8s.InitJobConfig(jobConf)
- Ω(jobErr).NotTo(HaveOccurred())
-
- // Deploy job
- taskGroupsMap, annErr :=
k8s.PodAnnotationToMap(podConf.Annotations)
- Ω(annErr).NotTo(HaveOccurred())
- By(fmt.Sprintf("Deploy job %s with task-groups: %+v",
jobConf.Name, taskGroupsMap[k8s.TaskGroups]))
- _, jobErr = kClient.CreateJob(job, ns)
- Ω(jobErr).NotTo(HaveOccurred())
- createErr := kClient.WaitForJobPodsCreated(ns, job.Name,
int(*job.Spec.Parallelism), 30*time.Second)
- Ω(createErr).NotTo(HaveOccurred())
-
- // After all placeholders reserved + separate pod, appStatus =
Running
- By("Verify appStatus = Running")
- timeoutErr :=
restClient.WaitForAppStateTransition(defaultPartition, nsQueue,
podConf.Labels["applicationId"],
- yunikorn.States().Application.Running,
- 120)
- Ω(timeoutErr).NotTo(HaveOccurred())
+ createJob(appID, minResource, annotations, 1)
+
+ checkAppStatus(appID, yunikorn.States().Application.Running)
// Ensure placeholders are created
- appDaoInfo, appDaoInfoErr :=
restClient.GetAppInfo(defaultPartition, nsQueue,
podConf.Labels["applicationId"])
+ appDaoInfo, appDaoInfoErr :=
restClient.GetAppInfo(configmanager.DefaultPartition, nsQueue, appID)
Ω(appDaoInfoErr).NotTo(HaveOccurred())
- Ω(len(appDaoInfo.PlaceholderData)).To(Equal(1), "Placeholder
count is not correct")
- Ω(int(appDaoInfo.PlaceholderData[0].Count)).To(Equal(int(5)),
"Placeholder count is not correct")
-
Ω(int(appDaoInfo.PlaceholderData[0].Replaced)).To(Equal(int(0)), "Placeholder
replacement count is not correct")
+ checkPlaceholderData(appDaoInfo, groupA, 5, 0, 0)
// Deploy job, now with 5 pods part of taskGroup
By("Deploy second job with 5 real taskGroup pods")
- podConf.Annotations.TaskGroupName = taskGroupName
- realJobConf := k8s.JobConfig{
- Name: "gangjob2-" + common.RandSeq(5),
- Namespace: ns,
- Parallelism: int32(5),
- PodConfig: podConf,
- }
- job1, job1Err := k8s.InitJobConfig(realJobConf)
- Ω(job1Err).NotTo(HaveOccurred())
- _, job1Err = kClient.CreateJob(job1, ns)
- Ω(job1Err).NotTo(HaveOccurred())
- createErr = kClient.WaitForJobPodsCreated(ns, job1.Name,
int(*job.Spec.Parallelism), 30*time.Second)
- Ω(createErr).NotTo(HaveOccurred())
+ annotations.TaskGroupName = groupA
+ realJob := createJob(appID, minResource, annotations, 5)
// Check all placeholders deleted.
By("Wait for all placeholders terminated")
- phTermErr := kClient.WaitForPlaceholders(ns,
"tg-"+podConf.Labels["applicationId"]+"-"+taskGroupName+"-", 0, 3*time.Minute,
nil)
+ phTermErr := kClient.WaitForPlaceholders(ns,
"tg-"+appID+"-"+groupA+"-", 0, 3*time.Minute, nil)
Ω(phTermErr).NotTo(gomega.HaveOccurred())
// Check real gang members now running
By("Wait for all gang members running")
- jobRunErr := kClient.WaitForJobPods(ns, realJobConf.Name,
int(realJobConf.Parallelism), 3*time.Minute)
+ jobRunErr := kClient.WaitForJobPods(ns, realJob.Name,
int(*realJob.Spec.Parallelism), 3*time.Minute)
Ω(jobRunErr).NotTo(HaveOccurred())
- By("Verify appStatus = Running")
- timeoutErr =
restClient.WaitForAppStateTransition(defaultPartition, nsQueue,
podConf.Labels["applicationId"],
- yunikorn.States().Application.Running,
- 120)
- Ω(timeoutErr).NotTo(HaveOccurred())
+ checkAppStatus(appID, yunikorn.States().Application.Running)
// Ensure placeholders are replaced
- appDaoInfo, appDaoInfoErr =
restClient.GetAppInfo(defaultPartition, nsQueue,
podConf.Labels["applicationId"])
+ appDaoInfo, appDaoInfoErr =
restClient.GetAppInfo(configmanager.DefaultPartition, nsQueue, appID)
Ω(appDaoInfoErr).NotTo(HaveOccurred())
- Ω(len(appDaoInfo.PlaceholderData)).To(Equal(1), "Placeholder
count is not correct")
- Ω(int(appDaoInfo.PlaceholderData[0].Count)).To(Equal(int(5)),
"Placeholder count is not correct")
-
Ω(int(appDaoInfo.PlaceholderData[0].Replaced)).To(Equal(int(5)), "Placeholder
replacement count is not correct")
-
- err := kClient.DeleteJob(job.Name, ns)
- Ω(err).NotTo(gomega.HaveOccurred())
- err = kClient.DeleteJob(job1.Name, ns)
- Ω(err).NotTo(gomega.HaveOccurred())
+ checkPlaceholderData(appDaoInfo, groupA, 5, 5, 0)
})
// Test to verify multiple task group nodes
@@ -176,56 +130,21 @@ var _ = Describe("", func() {
// 4. Deploy 1 job with real pods
// 5. Nodes distributions of real pods and placeholders should be the
same.
It("Verify_Multiple_TaskGroups_Nodes", func() {
- podResources := map[string]resource.Quantity{
- "cpu": resource.MustParse("10m"),
- "memory": resource.MustParse("10M"),
- }
- podConf := k8s.TestPodConfig{
- Labels: map[string]string{
- "app": "sleep-" + common.RandSeq(5),
- "applicationId": "appid-" + common.RandSeq(5),
- },
- Annotations: &k8s.PodAnnotation{
- TaskGroups: []interfaces.TaskGroup{
- {Name: "group-" + common.RandSeq(5),
MinMember: int32(3), MinResource: podResources},
- {Name: "group-" + common.RandSeq(5),
MinMember: int32(5), MinResource: podResources},
- {Name: "group-" + common.RandSeq(5),
MinMember: int32(7), MinResource: podResources},
- },
- },
- Resources: &v1.ResourceRequirements{
- Requests: v1.ResourceList{
- "cpu": podResources["cpu"],
- "memory": podResources["memory"],
- },
+ annotations := k8s.PodAnnotation{
+ TaskGroups: []interfaces.TaskGroup{
+ {Name: groupA, MinMember: int32(3),
MinResource: minResource},
+ {Name: groupB, MinMember: int32(5),
MinResource: minResource},
+ {Name: groupC, MinMember: int32(7),
MinResource: minResource},
},
}
- jobConf := k8s.JobConfig{
- Name: "gangjob-" + common.RandSeq(5),
- Namespace: ns,
- Parallelism: int32(1),
- PodConfig: podConf,
- }
- job, jobErr := k8s.InitJobConfig(jobConf)
- Ω(jobErr).NotTo(HaveOccurred())
-
- // Deploy job
- taskGroupsMap, annErr :=
k8s.PodAnnotationToMap(podConf.Annotations)
- Ω(annErr).NotTo(HaveOccurred())
- By(fmt.Sprintf("Deploy job %s with task-groups: %+v",
jobConf.Name, taskGroupsMap[k8s.TaskGroups]))
- _, jobErr = kClient.CreateJob(job, ns)
- Ω(jobErr).NotTo(HaveOccurred())
- createErr := kClient.WaitForJobPodsCreated(ns, job.Name,
int(*job.Spec.Parallelism), 30*time.Second)
- Ω(createErr).NotTo(HaveOccurred())
-
- By("Verify appStatus = Running")
- timeoutErr :=
restClient.WaitForAppStateTransition(defaultPartition, nsQueue,
podConf.Labels["applicationId"],
- yunikorn.States().Application.Running,
- 30)
- Ω(timeoutErr).NotTo(HaveOccurred())
+ createJob(appID, minResource, annotations, 1)
+
+ checkAppStatus(appID, yunikorn.States().Application.Running)
// Wait for placeholders to become running
stateRunning := v1.PodRunning
- phErr := kClient.WaitForPlaceholders(ns,
"tg-"+podConf.Labels["applicationId"]+"-", 15, 2*time.Minute, &stateRunning)
+ By("Wait for all placeholders running")
+ phErr := kClient.WaitForPlaceholders(ns, "tg-"+appID+"-", 15,
2*time.Minute, &stateRunning)
Ω(phErr).NotTo(HaveOccurred())
// Check placeholder node distribution is same as real pods'
@@ -247,33 +166,22 @@ var _ = Describe("", func() {
}
// Deploy real pods for each taskGroup
- for i := 0; i < len(podConf.Annotations.TaskGroups); i++ {
- curTG := podConf.Annotations.TaskGroups[i]
- podConf.Annotations.TaskGroupName = curTG.Name
- realJobConf := k8s.JobConfig{
- Name: "job-" + curTG.Name,
- Namespace: ns,
- Parallelism: curTG.MinMember,
- PodConfig: podConf,
- }
- By(fmt.Sprintf("Deploy pods for taskGroup: %s",
curTG.Name))
- job, jobErr = k8s.InitJobConfig(realJobConf)
- Ω(jobErr).NotTo(HaveOccurred())
- _, jobErr = kClient.CreateJob(job, ns)
- Ω(jobErr).NotTo(HaveOccurred())
- createErr := kClient.WaitForJobPodsCreated(ns,
job.Name, int(*job.Spec.Parallelism), 30*time.Second)
- Ω(createErr).NotTo(HaveOccurred())
+ realJobNames := []string{}
+ for _, tg := range annotations.TaskGroups {
+ annotations.TaskGroupName = tg.Name
+ realJob := createJob(appID, minResource, annotations,
tg.MinMember)
+ realJobNames = append(realJobNames, realJob.Name)
}
By("Wait for all placeholders terminated")
- phTermErr := kClient.WaitForPlaceholders(ns,
"tg-"+podConf.Labels["applicationId"]+"-", 0, 3*time.Minute, nil)
+ phTermErr := kClient.WaitForPlaceholders(ns, "tg-"+appID+"-",
0, 3*time.Minute, nil)
Ω(phTermErr).NotTo(HaveOccurred())
// Check real gang members now running on same node distribution
By("Verify task group node distribution")
realPodNodes := map[string]map[string]int{}
- for _, tg := range podConf.Annotations.TaskGroups {
- jobPods, lstErr := kClient.ListPods(ns,
fmt.Sprintf("job-name=%s%s", "job-", tg.Name))
+ for i, tg := range annotations.TaskGroups {
+ jobPods, lstErr := kClient.ListPods(ns,
fmt.Sprintf("job-name=%s", realJobNames[i]))
Ω(lstErr).NotTo(HaveOccurred())
Ω(len(jobPods.Items)).Should(BeNumerically("==",
tg.MinMember))
realPodNodes[tg.Name] = map[string]int{}
@@ -287,81 +195,32 @@ var _ = Describe("", func() {
}
Ω(realPodNodes).Should(Equal(taskGroupNodes))
- By("Verify appStatus = Running")
- timeoutErr =
restClient.WaitForAppStateTransition(defaultPartition, nsQueue,
podConf.Labels["applicationId"],
- yunikorn.States().Application.Running,
- 10)
- Ω(timeoutErr).NotTo(HaveOccurred())
-
- err := kClient.DeleteJob(job.Name, ns)
- Ω(err).NotTo(gomega.HaveOccurred())
+ checkAppStatus(appID, yunikorn.States().Application.Running)
})
// Test to verify task group with more than min members
// 1. Deploy 1 job with more taskGroup pods than minMembers
// 2. Verify all pods running
It("Verify_TG_with_More_Than_minMembers", func() {
- podResources := map[string]resource.Quantity{
- "cpu": resource.MustParse("10m"),
- "memory": resource.MustParse("10M"),
- }
- tgName := "group-" + common.RandSeq(5)
annotations := k8s.PodAnnotation{
- TaskGroupName: tgName,
+ TaskGroupName: groupA,
TaskGroups: []interfaces.TaskGroup{
- {Name: tgName, MinMember: int32(3),
MinResource: podResources},
+ {Name: groupA, MinMember: int32(3),
MinResource: minResource},
},
}
- podConf := k8s.TestPodConfig{
- Labels: map[string]string{
- "app": "sleep-" + common.RandSeq(5),
- "applicationId": "appid-" + common.RandSeq(5),
- },
- Annotations: &annotations,
- Resources: &v1.ResourceRequirements{
- Requests: v1.ResourceList{
- "cpu": podResources["cpu"],
- "memory": podResources["memory"],
- },
- },
- }
- jobConf := k8s.JobConfig{
- Name: "gangjob-" + common.RandSeq(5),
- Namespace: ns,
- Parallelism: 2 * annotations.TaskGroups[0].MinMember,
- PodConfig: podConf,
- }
-
- job, jobErr := k8s.InitJobConfig(jobConf)
- Ω(jobErr).NotTo(HaveOccurred())
- taskGroupsMap, annErr :=
k8s.PodAnnotationToMap(podConf.Annotations)
- Ω(annErr).NotTo(HaveOccurred())
- By(fmt.Sprintf("Deploy job %s with task-groups: %+v",
jobConf.Name, taskGroupsMap[k8s.TaskGroups]))
- _, jobErr = kClient.CreateJob(job, ns)
- Ω(jobErr).NotTo(HaveOccurred())
- createErr := kClient.WaitForJobPodsCreated(ns, job.Name,
int(*job.Spec.Parallelism), 30*time.Second)
- Ω(createErr).NotTo(HaveOccurred())
+ createJob(appID, minResource, annotations, 6)
- By("Verify all job pods are running")
- jobRunErr := kClient.WaitForJobPods(ns, jobConf.Name,
int(jobConf.Parallelism), 2*time.Minute)
- Ω(jobRunErr).NotTo(HaveOccurred())
+ checkAppStatus(appID, yunikorn.States().Application.Running)
- By("Verify appStatus = Running")
- timeoutErr :=
restClient.WaitForAppStateTransition(defaultPartition, nsQueue,
podConf.Labels["applicationId"],
- yunikorn.States().Application.Running,
- 120)
- Ω(timeoutErr).NotTo(HaveOccurred())
+ By("Wait for all placeholders terminated")
+ phTermErr := kClient.WaitForPlaceholders(ns, "tg-"+appID+"-",
0, 3*time.Minute, nil)
+ Ω(phTermErr).NotTo(HaveOccurred())
// Ensure placeholders are replaced and allocations count is
correct
- appDaoInfo, appDaoInfoErr :=
restClient.GetAppInfo(defaultPartition, nsQueue,
podConf.Labels["applicationId"])
+ appDaoInfo, appDaoInfoErr :=
restClient.GetAppInfo(configmanager.DefaultPartition, nsQueue, appID)
Ω(appDaoInfoErr).NotTo(HaveOccurred())
- Ω(len(appDaoInfo.PlaceholderData)).To(Equal(1), "Placeholder
count is not correct")
- Ω(int(appDaoInfo.PlaceholderData[0].Count)).To(Equal(int(3)),
"Placeholder count is not correct")
-
Ω(int(appDaoInfo.PlaceholderData[0].Replaced)).To(Equal(int(3)), "Placeholder
replacement count is not correct")
+ checkPlaceholderData(appDaoInfo, groupA, 3, 3, 0)
Ω(len(appDaoInfo.Allocations)).To(Equal(int(6)), "Allocations
count is not correct")
-
- err := kClient.DeleteJob(job.Name, ns)
- Ω(err).NotTo(gomega.HaveOccurred())
})
// Test to verify soft GS style behaviour
@@ -370,307 +229,148 @@ var _ = Describe("", func() {
// b) gangb - 1 placeholder, unsatisfiable nodeSelector
// 2. After placeholder timeout, real pods should be scheduled.
It("Verify_Default_GS_Style", func() {
- appID := "appid-" + common.RandSeq(5)
- podResources := map[string]resource.Quantity{
- "cpu": resource.MustParse("10m"),
- "memory": resource.MustParse("10M"),
- }
-
pdTimeout := 20
- placeholderTimeoutStr := fmt.Sprintf("%s=%d",
"placeholderTimeoutInSeconds", pdTimeout)
annotations := k8s.PodAnnotation{
- SchedulingPolicyParams: placeholderTimeoutStr,
+ SchedulingPolicyParams: fmt.Sprintf("%s=%d",
constants.SchedulingPolicyTimeoutParam, pdTimeout),
TaskGroups: []interfaces.TaskGroup{
- {
- Name: groupA,
- MinMember: int32(3),
- MinResource: podResources,
- },
- {
- Name: groupB,
- MinMember: int32(1),
- MinResource: podResources,
- NodeSelector:
map[string]string{"kubernetes.io/hostname": "unsatisfiable_node"},
- },
- },
- }
-
- podConf := k8s.TestPodConfig{
- Labels: map[string]string{
- "app": "sleep-" + common.RandSeq(5),
- "applicationId": appID,
+ {Name: groupA, MinMember: int32(3),
MinResource: minResource},
+ {Name: groupB, MinMember: int32(1),
MinResource: minResource, NodeSelector: unsatisfiableNodeSelector},
},
- Annotations: &annotations,
- Resources: &v1.ResourceRequirements{
- Requests: v1.ResourceList{
- "cpu": podResources["cpu"],
- "memory": podResources["memory"],
- },
- },
- }
- jobConf := k8s.JobConfig{
- Name: "gangjob-" + common.RandSeq(5),
- Namespace: ns,
- Parallelism: 3,
- PodConfig: podConf,
}
-
- // Create gang job
- job, jobErr := k8s.InitJobConfig(jobConf)
- Ω(jobErr).NotTo(HaveOccurred())
- taskGroupsMap, annErr :=
k8s.PodAnnotationToMap(podConf.Annotations)
- Ω(annErr).NotTo(HaveOccurred())
- By(fmt.Sprintf("[%s] Deploy job %s with task-groups: %+v",
- appID, jobConf.Name, taskGroupsMap[k8s.TaskGroups]))
- _, jobErr = kClient.CreateJob(job, ns)
- Ω(jobErr).NotTo(HaveOccurred())
- createErr := kClient.WaitForJobPodsCreated(ns, job.Name,
int(*job.Spec.Parallelism), 30*time.Second)
- Ω(createErr).NotTo(HaveOccurred())
+ job := createJob(appID, minResource, annotations, 3)
// Wait for placeholder timeout
time.Sleep(time.Duration(pdTimeout) * time.Second)
By(fmt.Sprintf("[%s] Verify pods are scheduled", appID))
- jobRunErr := kClient.WaitForJobPods(ns, jobConf.Name,
int(jobConf.Parallelism), 2*time.Minute)
+ jobRunErr := kClient.WaitForJobPods(ns, job.Name,
int(*job.Spec.Parallelism), 2*time.Minute)
Ω(jobRunErr).NotTo(HaveOccurred())
- By("Verify appStatus = Running")
- timeoutErr :=
restClient.WaitForAppStateTransition(defaultPartition, nsQueue,
podConf.Labels["applicationId"],
- yunikorn.States().Application.Running,
- 10)
- Ω(timeoutErr).NotTo(HaveOccurred())
+ checkAppStatus(appID, yunikorn.States().Application.Running)
// Ensure placeholders are timed out and allocations count is
correct as app started running normal because of 'soft' gang style
- appDaoInfo, appDaoInfoErr :=
restClient.GetAppInfo(defaultPartition, nsQueue,
podConf.Labels["applicationId"])
+ appDaoInfo, appDaoInfoErr :=
restClient.GetAppInfo(configmanager.DefaultPartition, nsQueue, appID)
Ω(appDaoInfoErr).NotTo(HaveOccurred())
Ω(len(appDaoInfo.PlaceholderData)).To(Equal(2), "Placeholder
count is not correct")
- if appDaoInfo.PlaceholderData[0].TaskGroupName == groupA {
-
Ω(int(appDaoInfo.PlaceholderData[0].Count)).To(Equal(int(3)), "Placeholder
count is not correct")
-
Ω(int(appDaoInfo.PlaceholderData[0].TimedOut)).To(Equal(int(3)), "Placeholder
timed out is not correct")
-
Ω(int(appDaoInfo.PlaceholderData[0].Replaced)).To(Equal(int(0)), "Placeholder
replacement count is not correct")
- } else if appDaoInfo.PlaceholderData[1].TaskGroupName == groupB
{
-
Ω(int(appDaoInfo.PlaceholderData[1].Count)).To(Equal(int(0)), "Placeholder
count is not correct")
- }
+ checkPlaceholderData(appDaoInfo, groupA, 3, 0, 3)
+ checkPlaceholderData(appDaoInfo, groupB, 1, 0, 1)
Ω(len(appDaoInfo.Allocations)).To(Equal(int(3)), "Allocations
count is not correct")
- Ω(appDaoInfo.Allocations[0].Placeholder).To(Equal(false),
"Allocation should be non placeholder")
- Ω(appDaoInfo.Allocations[0].PlaceholderUsed).To(Equal(false),
"Allocation should not be replacement of ph")
- Ω(appDaoInfo.Allocations[1].Placeholder).To(Equal(false),
"Allocation should be non placeholder")
- Ω(appDaoInfo.Allocations[1].PlaceholderUsed).To(Equal(false),
"Allocation should not be replacement of ph")
- Ω(appDaoInfo.Allocations[2].Placeholder).To(Equal(false),
"Allocation should be non placeholder")
- Ω(appDaoInfo.Allocations[2].PlaceholderUsed).To(Equal(false),
"Allocation should not be replacement of ph")
-
- err := kClient.DeleteJob(job.Name, ns)
- Ω(err).NotTo(gomega.HaveOccurred())
+ for _, alloc := range appDaoInfo.Allocations {
+ Ω(alloc.Placeholder).To(Equal(false), "Allocation
should be non placeholder")
+ Ω(alloc.PlaceholderUsed).To(Equal(false), "Allocation
should not be replacement of ph")
+ }
})
- // Test to verify soft GS style behaviour
+ // Test to verify Hard GS style behaviour
// 1. Deploy 1 job with 2 task group's:
- // a. 1 tg with un runnable placeholders
- // b. 1 tg with runnable placeholders
+ // a) 1 tg with un runnable placeholders
+ // b) 1 tg with runnable placeholders
// 2. Verify appState = Accepted
// 3. Once ph's are timed out, app should move to failed state
It("Verify_Hard_GS_Failed_State", func() {
pdTimeout := 20
gsStyle := "Hard"
- placeholderTimeoutStr := fmt.Sprintf("%s=%d",
"placeholderTimeoutInSeconds", pdTimeout)
- gsStyleStr := fmt.Sprintf("%s=%s", "gangSchedulingStyle",
gsStyle)
- groupA = groupA + "-" + common.RandSeq(5)
- groupB = groupB + "-" + common.RandSeq(5)
-
- podResources := map[string]resource.Quantity{
- "cpu": resource.MustParse("10m"),
- "memory": resource.MustParse("10M"),
- }
- podConf := k8s.TestPodConfig{
- Labels: map[string]string{
- "app": "sleep-" + common.RandSeq(5),
- "applicationId": "appid-" + common.RandSeq(5),
- },
- Annotations: &k8s.PodAnnotation{
- TaskGroups: []interfaces.TaskGroup{
- {
- Name: groupA,
- MinMember: int32(3),
- MinResource: podResources,
- NodeSelector:
map[string]string{"kubernetes.io/hostname": "unsatisfiable_node"},
- },
- {
- Name: groupB,
- MinMember: int32(3),
- MinResource: podResources,
- },
- },
- SchedulingPolicyParams: fmt.Sprintf("%s %s",
placeholderTimeoutStr, gsStyleStr),
- },
- Resources: &v1.ResourceRequirements{
- Requests: v1.ResourceList{
- "cpu": podResources["cpu"],
- "memory": podResources["memory"],
- },
+ placeholderTimeoutStr := fmt.Sprintf("%s=%d",
constants.SchedulingPolicyTimeoutParam, pdTimeout)
+ gsStyleStr := fmt.Sprintf("%s=%s",
constants.SchedulingPolicyStyleParam, gsStyle)
+
+ annotations := k8s.PodAnnotation{
+ TaskGroups: []interfaces.TaskGroup{
+ {Name: groupA, MinMember: int32(3),
MinResource: minResource, NodeSelector: unsatisfiableNodeSelector},
+ {Name: groupB, MinMember: int32(3),
MinResource: minResource},
},
+ SchedulingPolicyParams: fmt.Sprintf("%s %s",
placeholderTimeoutStr, gsStyleStr),
}
- jobConf := k8s.JobConfig{
- Name: "gangjob-" + common.RandSeq(5),
- Namespace: ns,
- Parallelism: int32(1),
- PodConfig: podConf,
- }
- job, jobErr := k8s.InitJobConfig(jobConf)
- Ω(jobErr).NotTo(HaveOccurred())
- taskGroupsMap, annErr :=
k8s.PodAnnotationToMap(podConf.Annotations)
- Ω(annErr).NotTo(HaveOccurred())
- By(fmt.Sprintf("Deploy job %s with task-groups: %+v",
jobConf.Name, taskGroupsMap[k8s.TaskGroups]))
- _, jobErr = kClient.CreateJob(job, ns)
- Ω(jobErr).NotTo(HaveOccurred())
- createErr := kClient.WaitForJobPodsCreated(ns, job.Name,
int(*job.Spec.Parallelism), 30*time.Second)
- Ω(createErr).NotTo(HaveOccurred())
+ createJob(appID, minResource, annotations, 1)
// Wait for placeholder timeout
time.Sleep(time.Duration(pdTimeout) * time.Second)
- By("Verify appStatus = Failing")
- timeoutErr :=
restClient.WaitForAppStateTransition(defaultPartition, nsQueue,
podConf.Labels["applicationId"],
- yunikorn.States().Application.Failing,
- 30)
- Ω(timeoutErr).NotTo(HaveOccurred())
+ checkAppStatus(appID, yunikorn.States().Application.Failing)
// Ensure placeholders are timed out and allocations count is
correct as app started running normal because of 'soft' gang style
- appDaoInfo, appDaoInfoErr :=
restClient.GetAppInfo(defaultPartition, nsQueue,
podConf.Labels["applicationId"])
+ appDaoInfo, appDaoInfoErr :=
restClient.GetAppInfo(configmanager.DefaultPartition, nsQueue, appID)
Ω(appDaoInfoErr).NotTo(HaveOccurred())
Ω(len(appDaoInfo.PlaceholderData)).To(Equal(2), "Placeholder
count is not correct")
- if appDaoInfo.PlaceholderData[0].TaskGroupName == groupB {
-
Ω(int(appDaoInfo.PlaceholderData[0].Count)).To(Equal(int(3)), "Placeholder
count is not correct")
-
Ω(int(appDaoInfo.PlaceholderData[0].TimedOut)).To(Equal(int(3)), "Placeholder
timed out is not correct")
-
Ω(int(appDaoInfo.PlaceholderData[0].Replaced)).To(Equal(int(0)), "Placeholder
replacement count is not correct")
- }
- err := kClient.DeleteJob(job.Name, ns)
- Ω(err).NotTo(gomega.HaveOccurred())
+ checkPlaceholderData(appDaoInfo, groupA, 3, 0, 3)
+ checkPlaceholderData(appDaoInfo, groupB, 3, 0, 3)
})
// Test to verify Gang Apps FIFO order
- // Create FIFO queue with quota of 300m and 300M
+ // Update namespace with quota of 300m and 300M
// 1. Deploy appA with 1 pod
// 2. Deploy appB with gang of 3 pods
// 3. Deploy appC with 1 pod
// 4. Delete appA
// 5. appA = Completing, appB = Running, appC = Accepted
It("Verify_GangApp_FIFO_Order", func() {
- By(fmt.Sprintf("Creating namespace: %s", fifoQName))
- fifoQ, nsErr := kClient.CreateNamespace(fifoQName,
map[string]string{
+ By(fmt.Sprintf("Update namespace %s with quota cpu 300m and
memory 300M", ns))
+ namespace, nsErr := kClient.UpdateNamespace(ns,
map[string]string{
constants.NamespaceQuota: "{\"cpu\": \"300m\",
\"memory\": \"300M\"}"})
Ω(nsErr).NotTo(HaveOccurred())
- Ω(fifoQ.Status.Phase).To(Equal(v1.NamespaceActive))
- defer func() {
Ω(kClient.DeleteNamespace(fifoQName)).NotTo(HaveOccurred()) }()
-
- // Create appIDs
- var apps []string
- for j := 0; j < 3; j++ {
- id := fmt.Sprintf("app%d-%s", j, common.RandSeq(5))
- apps = append(apps, id)
- }
+ Ω(namespace.Status.Phase).To(Equal(v1.NamespaceActive))
- // Initial allocation to fill ns quota
- appAllocs := map[string]map[string]int{
- apps[0]: {"pods": 1, "minMembers": 0},
- apps[1]: {"pods": 3, "minMembers": 3},
- apps[2]: {"pods": 1, "minMembers": 0},
- }
- // Expected appState progression
- appStates := map[string][]string{
- apps[0]: {"Completing"},
- apps[1]: {"Running"},
- apps[2]: {"Accepted"},
- }
+ appIDA := "app-a-" + common.RandSeq(5)
+ appIDB := "app-b-" + common.RandSeq(5)
+ appIDC := "app-c-" + common.RandSeq(5)
- // Base pod conf
- taskGroupName := groupA + "-" + common.RandSeq(5)
- podResources := map[string]resource.Quantity{
- "cpu": resource.MustParse("100m"),
- "memory": resource.MustParse("100M"),
- }
- podConf := k8s.TestPodConfig{
- Annotations: &k8s.PodAnnotation{
- TaskGroupName: taskGroupName,
- TaskGroups: []interfaces.TaskGroup{
- {Name: taskGroupName, MinResource:
podResources},
- },
+ minResource[v1.ResourceCPU.String()] =
resource.MustParse("100m")
+ minResource[v1.ResourceMemory.String()] =
resource.MustParse("100M")
+
+ annotationsA := k8s.PodAnnotation{
+ TaskGroupName: groupA,
+ TaskGroups: []interfaces.TaskGroup{
+ {Name: groupA, MinMember: int32(0),
MinResource: minResource},
},
- Resources: &v1.ResourceRequirements{
- Requests: v1.ResourceList{
- "cpu": podResources["cpu"],
- "memory": podResources["memory"],
- },
+ }
+ annotationsB := k8s.PodAnnotation{
+ TaskGroupName: groupB,
+ TaskGroups: []interfaces.TaskGroup{
+ {Name: groupB, MinMember: int32(3),
MinResource: minResource},
},
}
-
- // Deploy 3 apps in that order
- for _, appID := range apps {
- req := appAllocs[appID]
- podConf.Annotations.TaskGroups[0].MinMember =
int32(req["minMembers"])
- podConf.Labels = map[string]string{
- "app": "sleep-" + common.RandSeq(5),
- "applicationId": appID,
- }
- jobConf := k8s.JobConfig{
- Name: appID,
- Namespace: fifoQName,
- Parallelism: int32(req["pods"]),
- PodConfig: podConf,
- }
-
- By(fmt.Sprintf("[%s] Deploy %d pods", appID,
req["pods"]))
- job, jobErr := k8s.InitJobConfig(jobConf)
- Ω(jobErr).NotTo(HaveOccurred())
- taskGroupsMap, annErr :=
k8s.PodAnnotationToMap(podConf.Annotations)
- Ω(annErr).NotTo(HaveOccurred())
- By(fmt.Sprintf("Deploy job %s with task-groups: %+v",
jobConf.Name, taskGroupsMap[k8s.TaskGroups]))
- _, jobErr = kClient.CreateJob(job, fifoQName)
- Ω(jobErr).NotTo(HaveOccurred())
- createErr := kClient.WaitForJobPodsCreated(fifoQName,
job.Name, int(*job.Spec.Parallelism), 30*time.Second)
- Ω(createErr).NotTo(HaveOccurred())
-
- // To ensure there is minor gap between applications
- time.Sleep(1 * time.Second)
+ annotationsC := k8s.PodAnnotation{
+ TaskGroupName: groupC,
+ TaskGroups: []interfaces.TaskGroup{
+ {Name: groupC, MinMember: int32(0),
MinResource: minResource},
+ },
}
+ jobA := createJob(appIDA, minResource, annotationsA, 1)
+ time.Sleep(1 * time.Second) // To ensure there is minor gap
between applications
+ createJob(appIDB, minResource, annotationsB, 3)
+ time.Sleep(1 * time.Second) // To ensure there is minor gap
between applications
+ createJob(appIDC, minResource, annotationsC, 1)
- // App1 should have 2/3 placeholders running
- podConf.Annotations.TaskGroups[0].MinMember =
int32(appAllocs[apps[1]]["minMembers"])
+ // AppB should have 2/3 placeholders running
+ By("Wait for 2 placeholders running in app " + appIDB)
statusRunning := v1.PodRunning
- phErr := kClient.WaitForPlaceholders(fifoQName,
"tg-"+apps[1]+"-", 2, 30*time.Second, &statusRunning)
+ phErr := kClient.WaitForPlaceholders(ns, "tg-"+appIDB+"-", 2,
30*time.Second, &statusRunning)
Ω(phErr).NotTo(HaveOccurred())
- // Delete app0
- deleteErr := kClient.DeleteJob(apps[0], fifoQName)
+ // Delete appA
+ deleteErr := kClient.DeleteJob(jobA.Name, ns)
Ω(deleteErr).NotTo(HaveOccurred())
+ jobNames = jobNames[1:] // remove jobA, so we don't delete
again in AfterEach
- // Now, app0=Completed, app1=Running, app2=Accepted
- for appID, states := range appStates {
- By(fmt.Sprintf("[%s] Verify appStatus = %s", appID,
states[0]))
- timeoutErr :=
restClient.WaitForAppStateTransition(defaultPartition, "root."+fifoQName,
appID, states[0], 120)
- Ω(timeoutErr).NotTo(HaveOccurred())
- }
+ // Now, appA=Completing, appB=Running, appC=Accepted
+ checkAppStatus(appIDA, yunikorn.States().Application.Completing)
+ checkAppStatus(appIDB, yunikorn.States().Application.Running)
+ checkAppStatus(appIDC, yunikorn.States().Application.Accepted)
- appDaoInfo, appDaoInfoErr :=
restClient.GetAppInfo(defaultPartition, "root."+fifoQName, apps[0])
+ appDaoInfo, appDaoInfoErr :=
restClient.GetAppInfo(configmanager.DefaultPartition, nsQueue, appIDA)
Ω(appDaoInfoErr).NotTo(HaveOccurred())
Ω(len(appDaoInfo.Allocations)).To(Equal(0), "Allocations count
is not correct")
Ω(len(appDaoInfo.PlaceholderData)).To(Equal(0), "Placeholder
count is not correct")
- appDaoInfo, appDaoInfoErr =
restClient.GetAppInfo(defaultPartition, "root."+fifoQName, apps[1])
+ appDaoInfo, appDaoInfoErr =
restClient.GetAppInfo(configmanager.DefaultPartition, nsQueue, appIDB)
Ω(appDaoInfoErr).NotTo(HaveOccurred())
Ω(len(appDaoInfo.Allocations)).To(Equal(3), "Allocations count
is not correct")
Ω(len(appDaoInfo.PlaceholderData)).To(Equal(1), "Placeholder
count is not correct")
Ω(int(appDaoInfo.PlaceholderData[0].Count)).To(Equal(int(3)),
"Placeholder count is not correct")
- appDaoInfo, appDaoInfoErr =
restClient.GetAppInfo(defaultPartition, "root."+fifoQName, apps[2])
+ appDaoInfo, appDaoInfoErr =
restClient.GetAppInfo(configmanager.DefaultPartition, nsQueue, appIDC)
Ω(appDaoInfoErr).NotTo(HaveOccurred())
Ω(len(appDaoInfo.Allocations)).To(Equal(0), "Allocations count
is not correct")
Ω(len(appDaoInfo.PlaceholderData)).To(Equal(0), "Placeholder
count is not correct")
-
- deleteErr = kClient.DeleteJob(apps[1], fifoQName)
- Ω(deleteErr).NotTo(HaveOccurred())
-
- deleteErr = kClient.DeleteJob(apps[2], fifoQName)
- Ω(deleteErr).NotTo(HaveOccurred())
})
// Test validates that lost placeholders resources are decremented by
Yunikorn.
@@ -687,74 +387,38 @@ var _ = Describe("", func() {
nodes, err := kClient.GetNodes()
Ω(err).NotTo(HaveOccurred())
workerNodes := k8s.GetWorkerNodes(*nodes)
-
- podResources := map[string]resource.Quantity{
- "cpu": resource.MustParse("10m"),
- "memory": resource.MustParse("10M"),
- }
+ Ω(len(workerNodes)).NotTo(Equal(0))
pdTimeout := 60
- placeholderTimeoutStr := fmt.Sprintf("%s=%d",
"placeholderTimeoutInSeconds", pdTimeout)
annotations := k8s.PodAnnotation{
- SchedulingPolicyParams: placeholderTimeoutStr,
+ SchedulingPolicyParams: fmt.Sprintf("%s=%d",
constants.SchedulingPolicyTimeoutParam, pdTimeout),
TaskGroups: []interfaces.TaskGroup{
{
Name: groupA,
MinMember: int32(3),
- MinResource: podResources,
+ MinResource: minResource,
NodeSelector:
map[string]string{"kubernetes.io/hostname": workerNodes[0].Name},
},
{
Name: groupB,
MinMember: int32(1),
- MinResource: podResources,
- NodeSelector:
map[string]string{"kubernetes.io/hostname": "unsatisfiable"},
- },
- },
- }
-
- podConf := k8s.TestPodConfig{
- Labels: map[string]string{
- "app": "sleep-" + common.RandSeq(5),
- "applicationId": "appid-" + common.RandSeq(5),
- },
- Annotations: &annotations,
- Resources: &v1.ResourceRequirements{
- Requests: v1.ResourceList{
- "cpu": podResources["cpu"],
- "memory": podResources["memory"],
+ MinResource: minResource,
+ NodeSelector: unsatisfiableNodeSelector,
},
},
}
- jobConf := k8s.JobConfig{
- Name: "gangjob-" + common.RandSeq(5),
- Namespace: ns,
- Parallelism: 3,
- PodConfig: podConf,
- }
+ createJob(appID, minResource, annotations, 3)
- // Create gang job
- job, jobErr := k8s.InitJobConfig(jobConf)
- Ω(jobErr).NotTo(HaveOccurred())
- taskGroupsMap, annErr :=
k8s.PodAnnotationToMap(podConf.Annotations)
- Ω(annErr).NotTo(HaveOccurred())
- By(fmt.Sprintf("[%s] Deploy job %s with task-groups: %+v",
podConf.Labels["applicationId"], jobConf.Name, taskGroupsMap[k8s.TaskGroups]))
- _, jobErr = kClient.CreateJob(job, ns)
- Ω(jobErr).NotTo(HaveOccurred())
- createErr := kClient.WaitForJobPodsCreated(ns, job.Name,
int(*job.Spec.Parallelism), 30*time.Second)
- Ω(createErr).NotTo(HaveOccurred())
-
- By(fmt.Sprintf("[%s] Verify appStatus = Accepted",
podConf.Labels["applicationId"]))
- timeoutErr :=
restClient.WaitForAppStateTransition(defaultPartition, nsQueue,
podConf.Labels["applicationId"], yunikorn.States().Application.Accepted, 120)
- Ω(timeoutErr).NotTo(HaveOccurred())
+ checkAppStatus(appID, yunikorn.States().Application.Accepted)
// Wait for groupa placeholder pods running
+ By(fmt.Sprintf("Wait for %s placeholders running", groupA))
stateRunning := v1.PodRunning
- runErr := kClient.WaitForPlaceholders(ns,
"tg-"+podConf.Labels["applicationId"]+"-"+groupA+"-", 3, 30*time.Second,
&stateRunning)
+ runErr := kClient.WaitForPlaceholders(ns,
"tg-"+appID+"-"+groupA+"-", 3, 30*time.Second, &stateRunning)
Ω(runErr).NotTo(HaveOccurred())
// Delete all groupa placeholder pods
- phPods, listErr := kClient.ListPlaceholders(ns,
"tg-"+podConf.Labels["applicationId"]+"-"+groupA+"-")
+ phPods, listErr := kClient.ListPlaceholders(ns,
"tg-"+appID+"-"+groupA+"-")
Ω(listErr).NotTo(HaveOccurred())
for _, ph := range phPods {
By(fmt.Sprintf("Delete placeholder %s", ph.Name))
@@ -766,21 +430,21 @@ var _ = Describe("", func() {
time.Sleep(5 * time.Second)
// Verify app allocations correctly decremented
- appInfo, appErr := restClient.GetAppInfo(defaultPartition,
nsQueue, podConf.Labels["applicationId"])
+ appInfo, appErr :=
restClient.GetAppInfo(configmanager.DefaultPartition, nsQueue, appID)
Ω(appErr).NotTo(HaveOccurred())
Ω(len(appInfo.Allocations)).To(Equal(0), "Placeholder
allocation not removed from app")
// Verify no app allocation in nodeA
- ykNodes, nodeErr := restClient.GetNodes(defaultPartition)
+ ykNodes, nodeErr :=
restClient.GetNodes(configmanager.DefaultPartition)
Ω(nodeErr).NotTo(HaveOccurred())
for _, node := range *ykNodes {
for _, alloc := range node.Allocations {
-
Ω(alloc.ApplicationID).NotTo(Equal(podConf.Labels["applicationId"]),
"Placeholder allocation not removed from node")
+ Ω(alloc.ApplicationID).NotTo(Equal(appID),
"Placeholder allocation not removed from node")
}
}
// Verify queue resources = 0
- qInfo, qErr := restClient.GetQueue(defaultPartition, nsQueue)
+ qInfo, qErr :=
restClient.GetQueue(configmanager.DefaultPartition, nsQueue)
Ω(qErr).NotTo(HaveOccurred())
var usedResource yunikorn.ResourceUsage
var usedPercentageResource yunikorn.ResourceUsage
@@ -790,85 +454,42 @@ var _ = Describe("", func() {
usedPercentageResource.ParseResourceUsage(qInfo.AbsUsedCapacity)
Ω(usedPercentageResource.GetResourceValue(siCommon.CPU)).Should(Equal(int64(0)),
"Placeholder allocation not removed from queue")
Ω(usedPercentageResource.GetResourceValue(siCommon.Memory)).Should(Equal(int64(0)),
"Placeholder allocation not removed from queue")
-
- err = kClient.DeleteJob(job.Name, ns)
- Ω(err).NotTo(gomega.HaveOccurred())
})
// Test to verify completed placeholders cleanup
// 1. Deploy 1 job with 2 task group's:
- // a. 1 tg with un runnable placeholders
- // b. 1 tg with runnable placeholders
+ // a) 1 tg with un runnable placeholders
+ // b) 1 tg with runnable placeholders
// 2. Delete job
// 3. Verify app is completing
// 4. Verify placeholders deleted
// 5. Verify app allocation is empty
It("Verify_Completed_Job_Placeholders_Cleanup", func() {
- podResources := map[string]resource.Quantity{
- "cpu": resource.MustParse("10m"),
- "memory": resource.MustParse("10M"),
- }
- podConf := k8s.TestPodConfig{
- Labels: map[string]string{
- "app": "sleep-" + common.RandSeq(5),
- "applicationId": "appid-" + common.RandSeq(5),
- },
- Annotations: &k8s.PodAnnotation{
- TaskGroups: []interfaces.TaskGroup{
- {
- Name: groupA + "-" +
common.RandSeq(5),
- MinMember: int32(3),
- MinResource: podResources,
- NodeSelector:
map[string]string{"kubernetes.io/hostname": "unsatisfiable"},
- },
- {
- Name: groupB + "-" +
common.RandSeq(5),
- MinMember: int32(3),
- MinResource: podResources,
- },
- },
- },
- Resources: &v1.ResourceRequirements{
- Requests: v1.ResourceList{
- "cpu": podResources["cpu"],
- "memory": podResources["memory"],
- },
+ annotations := k8s.PodAnnotation{
+ TaskGroups: []interfaces.TaskGroup{
+ {Name: groupA, MinMember: int32(3),
MinResource: minResource, NodeSelector: unsatisfiableNodeSelector},
+ {Name: groupB, MinMember: int32(3),
MinResource: minResource},
},
}
- jobConf := k8s.JobConfig{
- Name: "gangjob-" + common.RandSeq(5),
- Namespace: ns,
- Parallelism: int32(1),
- PodConfig: podConf,
- }
- job, jobErr := k8s.InitJobConfig(jobConf)
- Ω(jobErr).NotTo(HaveOccurred())
- taskGroupsMap, annErr :=
k8s.PodAnnotationToMap(podConf.Annotations)
- Ω(annErr).NotTo(HaveOccurred())
- By(fmt.Sprintf("Deploy job %s with task-groups: %+v",
jobConf.Name, taskGroupsMap[k8s.TaskGroups]))
- _, jobErr = kClient.CreateJob(job, ns)
- Ω(jobErr).NotTo(HaveOccurred())
- createErr := kClient.WaitForJobPodsCreated(ns, job.Name,
int(*job.Spec.Parallelism), 30*time.Second)
- Ω(createErr).NotTo(HaveOccurred())
-
- By("Verify appState = Accepted")
- timeoutErr :=
restClient.WaitForAppStateTransition(defaultPartition, nsQueue,
podConf.Labels["applicationId"], yunikorn.States().Application.Accepted, 10)
- Ω(timeoutErr).NotTo(HaveOccurred())
+ job := createJob(appID, minResource, annotations, 1)
+
+ checkAppStatus(appID, yunikorn.States().Application.Accepted)
By("Wait for groupB placeholders running")
stateRunning := v1.PodRunning
- runErr := kClient.WaitForPlaceholders(ns,
"tg-"+podConf.Labels["applicationId"]+"-"+groupB+"-", 3, 30*time.Second,
&stateRunning)
+ runErr := kClient.WaitForPlaceholders(ns,
"tg-"+appID+"-"+groupB+"-", 3, 30*time.Second, &stateRunning)
Ω(runErr).NotTo(HaveOccurred())
By("List placeholders")
- tgPods, listErr := kClient.ListPlaceholders(ns,
"tg-"+podConf.Labels["applicationId"]+"-")
+ tgPods, listErr := kClient.ListPlaceholders(ns, "tg-"+appID+"-")
Ω(listErr).NotTo(HaveOccurred())
By("Delete job pods")
- deleteErr := kClient.DeleteJob(jobConf.Name, ns)
+ deleteErr := kClient.DeleteJob(job.Name, ns)
Ω(deleteErr).NotTo(HaveOccurred())
- timeoutErr =
restClient.WaitForAppStateTransition(defaultPartition, nsQueue,
podConf.Labels["applicationId"], yunikorn.States().Application.Completing, 30)
- Ω(timeoutErr).NotTo(HaveOccurred())
+ jobNames = []string{} // remove job names to prevent delete job
again in AfterEach
+
+ checkAppStatus(appID, yunikorn.States().Application.Completing)
By("Verify placeholders deleted")
for _, ph := range tgPods {
@@ -877,9 +498,9 @@ var _ = Describe("", func() {
}
By("Verify app allocation is empty")
- appInfo, restErr := restClient.GetAppInfo(defaultPartition,
nsQueue, podConf.Labels["applicationId"])
+ appInfo, restErr :=
restClient.GetAppInfo(configmanager.DefaultPartition, nsQueue, appID)
Ω(restErr).NotTo(HaveOccurred())
- Ω(len(appInfo.Allocations)).To(BeNumerically("==", 0))
+ Ω(len(appInfo.Allocations)).To(Equal(0))
})
// Test to verify originator deletion will trigger placeholders cleanup
@@ -890,118 +511,189 @@ var _ = Describe("", func() {
// 5. Verify app allocation is empty
It("Verify_OriginatorDeletion_Trigger_Placeholders_Cleanup", func() {
// case 1: originator pod without ownerreference
- podResources := map[string]resource.Quantity{
- "cpu": resource.MustParse("10m"),
- "memory": resource.MustParse("10M"),
+ verifyOriginatorDeletionCase(false)
+ })
+
+ // Test to verify originator deletion will trigger placeholders cleanup
+ // 1. Create an originator pod
+ // 2. Set pod ownerreference with ownerreference, take configmap for
example
+ // 3. Delete originator pod to trigger placeholders deletion
+ // 4. Verify placeholders deleted
+ // 5. Verify app allocation is empty
+
It("Verify_OriginatorDeletionWithOwnerreference_Trigger_Placeholders_Cleanup",
func() {
+ // case 2: originator pod with ownerreference
+ verifyOriginatorDeletionCase(true)
+ })
+
+ // Test placeholder with hugepages
+ // 1. Deploy 1 job with hugepages-2Mi
+ // 2. Verify all pods running
+ It("Verify_HugePage", func() {
+ hugepageKey := fmt.Sprintf("%s2Mi", v1.ResourceHugePagesPrefix)
+ nodes, err := kClient.GetNodes()
+ Ω(err).NotTo(HaveOccurred())
+ hasHugePages := false
+ for _, node := range nodes.Items {
+ if v, ok :=
node.Status.Capacity[v1.ResourceName(hugepageKey)]; ok {
+ if v.Value() != 0 {
+ hasHugePages = true
+ break
+ }
+ }
}
- podConf := k8s.TestPodConfig{
- Name: "gang-driver-pod" + common.RandSeq(5),
- Labels: map[string]string{
- "app": "sleep-" + common.RandSeq(5),
- "applicationId": "appid-" + common.RandSeq(5),
- },
- Annotations: &k8s.PodAnnotation{
- TaskGroups: []interfaces.TaskGroup{
- {
- Name: groupA + "-" +
common.RandSeq(5),
- MinMember: int32(3),
- MinResource: podResources,
- NodeSelector:
map[string]string{"kubernetes.io/hostname": "unsatisfiable"},
- },
- {
- Name: groupB + "-" +
common.RandSeq(5),
- MinMember: int32(3),
- MinResource: podResources,
- },
- },
- },
- Resources: &v1.ResourceRequirements{
- Requests: v1.ResourceList{
- "cpu": podResources["cpu"],
- "memory": podResources["memory"],
- },
- },
+ if !hasHugePages {
+ ginkgo.Skip("Skip hugepages test as no node has
hugepages")
}
- podTest, err := k8s.InitTestPod(podConf)
- Ω(err).NotTo(HaveOccurred())
- taskGroupsMap, annErr :=
k8s.PodAnnotationToMap(podConf.Annotations)
- Ω(annErr).NotTo(HaveOccurred())
- By(fmt.Sprintf("Deploy pod %s with task-groups: %+v",
podTest.Name, taskGroupsMap[k8s.TaskGroups]))
- originator, err := kClient.CreatePod(podTest, ns)
- Ω(err).NotTo(HaveOccurred())
+ // add hugepages to request
+ minResource[hugepageKey] = resource.MustParse("100Mi")
+ annotations := k8s.PodAnnotation{
+ TaskGroupName: groupA,
+ TaskGroups: []interfaces.TaskGroup{
+ {Name: groupA, MinMember: int32(3),
MinResource: minResource},
+ },
+ }
+ job := createJob(appID, minResource, annotations, 3)
- By("Verify appState = Accepted")
- timeoutErr :=
restClient.WaitForAppStateTransition(defaultPartition, nsQueue,
podConf.Labels["applicationId"], yunikorn.States().Application.Accepted, 20)
- Ω(timeoutErr).NotTo(HaveOccurred())
+ By("Verify all job pods are running")
+ jobRunErr := kClient.WaitForJobPods(ns, job.Name,
int(*job.Spec.Parallelism), 2*time.Minute)
+ Ω(jobRunErr).NotTo(HaveOccurred())
- By("Wait for groupB placeholders running")
- stateRunning := v1.PodRunning
- runErr := kClient.WaitForPlaceholders(ns,
"tg-"+podConf.Labels["applicationId"]+"-"+groupB+"-", 3, 30*time.Second,
&stateRunning)
- Ω(runErr).NotTo(HaveOccurred())
+ checkAppStatus(appID, yunikorn.States().Application.Running)
- By("List placeholders")
- tgPods, listErr := kClient.ListPlaceholders(ns,
"tg-"+podConf.Labels["applicationId"]+"-")
- Ω(listErr).NotTo(HaveOccurred())
+ // Ensure placeholders are replaced and allocations count is
correct
+ appDaoInfo, appDaoInfoErr :=
restClient.GetAppInfo(configmanager.DefaultPartition, nsQueue, appID)
+ Ω(appDaoInfoErr).NotTo(HaveOccurred())
+ Ω(len(appDaoInfo.PlaceholderData)).To(Equal(1), "Placeholder
count is not correct")
+ checkPlaceholderData(appDaoInfo, groupA, 3, 3, 0)
+ Ω(len(appDaoInfo.Allocations)).To(Equal(int(3)), "Allocations
count is not correct")
+
Ω(appDaoInfo.UsedResource[hugepageKey]).To(Equal(int64(314572800)), "Used huge
page resource is not correct")
+ })
- By("Delete originator pod")
- deleteErr := kClient.DeletePod(originator.Name, ns)
- Ω(deleteErr).NotTo(HaveOccurred())
+ AfterEach(func() {
+ testDescription := ginkgo.CurrentSpecReport()
+ if testDescription.Failed() {
+
tests.LogTestClusterInfoWrapper(testDescription.FailureMessage(), []string{ns})
+
tests.LogYunikornContainer(testDescription.FailureMessage())
+ }
- By("Verify placeholders deleted")
- for _, ph := range tgPods {
- deleteErr = kClient.WaitForPodTerminated(ns, ph.Name,
30*time.Second)
- Ω(deleteErr).NotTo(HaveOccurred(), "Placeholder %s
still running", ph)
+ By(fmt.Sprintf("Cleanup jobs: %v", jobNames))
+ for _, jobName := range jobNames {
+ err := kClient.DeleteJob(jobName, ns)
+ Ω(err).NotTo(gomega.HaveOccurred())
}
- By("Verify app allocation is empty")
- appInfo, restErr := restClient.GetAppInfo(defaultPartition,
nsQueue, podConf.Labels["applicationId"])
- Ω(restErr).NotTo(HaveOccurred())
- Ω(len(appInfo.Allocations)).To(BeNumerically("==", 0))
+ By("Tear down namespace: " + ns)
+ err := kClient.TearDownNamespace(ns)
+ Ω(err).NotTo(HaveOccurred())
})
- // Test to verify originator deletion will trigger placeholders cleanup
- // 1. Create an originator pod
- // 2. Set pod ownerreference with ownerreference, take configmap for
example
- // 3. Delete originator pod to trigger placeholders deletion
- // 4. Verify placeholders deleted
- // 5. Verify app allocation is empty
-
It("Verify_OriginatorDeletionWithOwnerreference_Trigger_Placeholders_Cleanup",
func() {
- // case 2: originator pod with ownerreference
- podResources := map[string]resource.Quantity{
- "cpu": resource.MustParse("10m"),
- "memory": resource.MustParse("10M"),
- }
- podConf := k8s.TestPodConfig{
- Name: "gang-driver-pod" + common.RandSeq(5),
- Labels: map[string]string{
- "app": "sleep-" + common.RandSeq(5),
- "applicationId": "appid-" + common.RandSeq(5),
- },
- Annotations: &k8s.PodAnnotation{
- TaskGroups: []interfaces.TaskGroup{
- {
- Name: groupA + "-" +
common.RandSeq(5),
- MinMember: int32(3),
- MinResource: podResources,
- NodeSelector:
map[string]string{"kubernetes.io/hostname": "unsatisfiable"},
- },
- {
- Name: groupB + "-" +
common.RandSeq(5),
- MinMember: int32(3),
- MinResource: podResources,
- },
+})
+
+func createJob(applicationID string, minResource map[string]resource.Quantity,
annotations k8s.PodAnnotation, parallelism int32) (job *batchv1.Job) {
+ var (
+ err error
+ requests = v1.ResourceList{}
+ limits = v1.ResourceList{}
+ )
+ for k, v := range minResource {
+ key := v1.ResourceName(k)
+ requests[key] = v
+ if strings.HasPrefix(k, v1.ResourceHugePagesPrefix) {
+ limits[key] = v
+ }
+ }
+
+ podConf := k8s.TestPodConfig{
+ Labels: map[string]string{
+ "app": "sleep-" + common.RandSeq(5),
+ "applicationId": applicationID,
+ },
+ Annotations: &annotations,
+ Resources: &v1.ResourceRequirements{
+ Requests: requests,
+ Limits: limits,
+ },
+ }
+ jobConf := k8s.JobConfig{
+ Name: fmt.Sprintf("gangjob-%s-%s", applicationID,
common.RandSeq(5)),
+ Namespace: ns,
+ Parallelism: parallelism,
+ PodConfig: podConf,
+ }
+
+ job, err = k8s.InitJobConfig(jobConf)
+ Ω(err).NotTo(HaveOccurred())
+
+ taskGroupsMap, err := k8s.PodAnnotationToMap(podConf.Annotations)
+ Ω(err).NotTo(HaveOccurred())
+
+ By(fmt.Sprintf("[%s] Deploy job %s with task-groups: %+v",
applicationID, jobConf.Name, taskGroupsMap[k8s.TaskGroups]))
+ job, err = kClient.CreateJob(job, ns)
+ Ω(err).NotTo(HaveOccurred())
+
+ err = kClient.WaitForJobPodsCreated(ns, job.Name,
int(*job.Spec.Parallelism), 30*time.Second)
+ Ω(err).NotTo(HaveOccurred())
+
+ jobNames = append(jobNames, job.Name) // for cleanup in afterEach
function
+ return job
+}
+
+func checkAppStatus(applicationID, state string) {
+ By(fmt.Sprintf("Verify application %s status is %s", applicationID,
state))
+ timeoutErr :=
restClient.WaitForAppStateTransition(configmanager.DefaultPartition, nsQueue,
applicationID, state, 120)
+ Ω(timeoutErr).NotTo(HaveOccurred())
+}
+
+func checkPlaceholderData(appDaoInfo *dao.ApplicationDAOInfo, tgName string,
count, replaced, timeout int) {
+ verified := false
+ for _, placeholderData := range appDaoInfo.PlaceholderData {
+ if tgName == placeholderData.TaskGroupName {
+ Ω(int(placeholderData.Count)).To(Equal(count),
"Placeholder count is not correct")
+ Ω(int(placeholderData.Replaced)).To(Equal(replaced),
"Placeholder replaced is not correct")
+ Ω(int(placeholderData.TimedOut)).To(Equal(timeout),
"Placeholder timeout is not correct")
+ verified = true
+ break
+ }
+ }
+ Ω(verified).To(Equal(true), fmt.Sprintf("Can't find task group %s in
app info", tgName))
+}
+
+func verifyOriginatorDeletionCase(withOwnerRef bool) {
+ podConf := k8s.TestPodConfig{
+ Name: "gang-driver-pod" + common.RandSeq(5),
+ Labels: map[string]string{
+ "app": "sleep-" + common.RandSeq(5),
+ "applicationId": appID,
+ },
+ Annotations: &k8s.PodAnnotation{
+ TaskGroups: []interfaces.TaskGroup{
+ {
+ Name: groupA,
+ MinMember: int32(3),
+ MinResource: minResource,
+ NodeSelector: unsatisfiableNodeSelector,
},
- },
- Resources: &v1.ResourceRequirements{
- Requests: v1.ResourceList{
- "cpu": podResources["cpu"],
- "memory": podResources["memory"],
+ {
+ Name: groupB,
+ MinMember: int32(3),
+ MinResource: minResource,
},
},
- }
+ },
+ Resources: &v1.ResourceRequirements{
+ Requests: v1.ResourceList{
+ "cpu": minResource["cpu"],
+ "memory": minResource["memory"],
+ },
+ },
+ }
+
+ podTest, err := k8s.InitTestPod(podConf)
+ Ω(err).NotTo(HaveOccurred())
+ if withOwnerRef {
// create a configmap as ownerreference
testConfigmap := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
@@ -1020,9 +712,6 @@ var _ = Describe("", func() {
testConfigmap, err := kClient.CreateConfigMap(testConfigmap, ns)
Ω(err).NotTo(HaveOccurred())
- podTest, err := k8s.InitTestPod(podConf)
- Ω(err).NotTo(HaveOccurred())
-
podTest.OwnerReferences = []metav1.OwnerReference{
{
APIVersion: "v1",
@@ -1031,52 +720,37 @@ var _ = Describe("", func() {
UID: testConfigmap.UID,
},
}
+ }
- taskGroupsMap, annErr :=
k8s.PodAnnotationToMap(podConf.Annotations)
- Ω(annErr).NotTo(HaveOccurred())
- By(fmt.Sprintf("Deploy pod %s with task-groups: %+v",
podTest.Name, taskGroupsMap[k8s.TaskGroups]))
- originator, err := kClient.CreatePod(podTest, ns)
- Ω(err).NotTo(HaveOccurred())
-
- By("Verify appState = Accepted")
- timeoutErr :=
restClient.WaitForAppStateTransition(defaultPartition, nsQueue,
podConf.Labels["applicationId"], yunikorn.States().Application.Accepted, 20)
- Ω(timeoutErr).NotTo(HaveOccurred())
-
- By("Wait for groupB placeholders running")
- stateRunning := v1.PodRunning
- runErr := kClient.WaitForPlaceholders(ns,
"tg-"+podConf.Labels["applicationId"]+"-"+groupB+"-", 3, 30*time.Second,
&stateRunning)
- Ω(runErr).NotTo(HaveOccurred())
-
- By("List placeholders")
- tgPods, listErr := kClient.ListPlaceholders(ns,
"tg-"+podConf.Labels["applicationId"]+"-")
- Ω(listErr).NotTo(HaveOccurred())
+ taskGroupsMap, annErr := k8s.PodAnnotationToMap(podConf.Annotations)
+ Ω(annErr).NotTo(HaveOccurred())
+ By(fmt.Sprintf("Deploy pod %s with task-groups: %+v", podTest.Name,
taskGroupsMap[k8s.TaskGroups]))
+ originator, err := kClient.CreatePod(podTest, ns)
+ Ω(err).NotTo(HaveOccurred())
- By("Delete originator pod")
- deleteErr := kClient.DeletePod(originator.Name, ns)
- Ω(deleteErr).NotTo(HaveOccurred())
+ checkAppStatus(appID, yunikorn.States().Application.Accepted)
- By("Verify placeholders deleted")
- for _, ph := range tgPods {
- deleteErr = kClient.WaitForPodTerminated(ns, ph.Name,
30*time.Second)
- Ω(deleteErr).NotTo(HaveOccurred(), "Placeholder %s
still running", ph)
- }
+ By("Wait for groupB placeholders running")
+ stateRunning := v1.PodRunning
+ runErr := kClient.WaitForPlaceholders(ns, "tg-"+appID+"-"+groupB+"-",
3, 30*time.Second, &stateRunning)
+ Ω(runErr).NotTo(HaveOccurred())
- By("Verify app allocation is empty")
- appInfo, restErr := restClient.GetAppInfo(defaultPartition,
nsQueue, podConf.Labels["applicationId"])
- Ω(restErr).NotTo(HaveOccurred())
- Ω(len(appInfo.Allocations)).To(BeNumerically("==", 0))
- })
+ By("List placeholders")
+ tgPods, listErr := kClient.ListPlaceholders(ns, "tg-"+appID+"-")
+ Ω(listErr).NotTo(HaveOccurred())
- AfterEach(func() {
- testDescription := ginkgo.CurrentSpecReport()
- if testDescription.Failed() {
-
tests.LogTestClusterInfoWrapper(testDescription.FailureMessage(), []string{ns})
-
tests.LogYunikornContainer(testDescription.FailureMessage())
- }
- By("Tear down namespace: " + ns)
- err := kClient.TearDownNamespace(ns)
+ By("Delete originator pod")
+ deleteErr := kClient.DeletePod(originator.Name, ns)
+ Ω(deleteErr).NotTo(HaveOccurred())
- Ω(err).NotTo(HaveOccurred())
- })
+ By("Verify placeholders deleted")
+ for _, ph := range tgPods {
+ deleteErr = kClient.WaitForPodTerminated(ns, ph.Name,
30*time.Second)
+ Ω(deleteErr).NotTo(HaveOccurred(), "Placeholder %s still
running", ph)
+ }
-})
+ By("Verify app allocation is empty")
+ appInfo, restErr :=
restClient.GetAppInfo(configmanager.DefaultPartition, nsQueue, appID)
+ Ω(restErr).NotTo(HaveOccurred())
+ Ω(len(appInfo.Allocations)).To(BeNumerically("==", 0))
+}
diff --git a/test/e2e/priority_scheduling/priority_scheduling_test.go
b/test/e2e/priority_scheduling/priority_scheduling_test.go
index 40dd8ce1..d4f3ca57 100644
--- a/test/e2e/priority_scheduling/priority_scheduling_test.go
+++ b/test/e2e/priority_scheduling/priority_scheduling_test.go
@@ -348,14 +348,8 @@ var _ = ginkgo.Describe("PriorityScheduling", func() {
Ω(err).NotTo(gomega.HaveOccurred())
By("Wait for high-priority placeholders terminated")
- var tgPlaceHolders map[string][]string
- tgPlaceHolders =
yunikorn.GetPlaceholderNames(highPodConf.Annotations,
highPodConf.Labels["applicationId"])
- for _, phNames := range tgPlaceHolders {
- for _, ph := range phNames {
- phTermErr :=
kubeClient.WaitForPodTerminated(ns, ph, 1*time.Minute)
- Ω(phTermErr).NotTo(HaveOccurred())
- }
- }
+ err = kubeClient.WaitForPlaceholders(ns,
"tg-"+highPodConf.Labels["applicationId"]+"-", 0, 30*time.Second, nil)
+ Ω(err).NotTo(HaveOccurred())
By("Wait for high-priority pod to begin running")
err = kubeClient.WaitForPodRunning(ns, highPod.Name,
1*time.Minute)
@@ -369,13 +363,8 @@ var _ = ginkgo.Describe("PriorityScheduling", func() {
Ω(err).NotTo(gomega.HaveOccurred())
By("Wait for normal-priority placeholders terminated")
- tgPlaceHolders =
yunikorn.GetPlaceholderNames(normalPodConf.Annotations,
normalPodConf.Labels["applicationId"])
- for _, phNames := range tgPlaceHolders {
- for _, ph := range phNames {
- phTermErr :=
kubeClient.WaitForPodTerminated(ns, ph, 1*time.Minute)
- Ω(phTermErr).NotTo(HaveOccurred())
- }
- }
+ err = kubeClient.WaitForPlaceholders(ns,
"tg-"+normalPodConf.Labels["applicationId"]+"-", 0, 30*time.Second, nil)
+ Ω(err).NotTo(HaveOccurred())
By("Wait for normal-priority pod to begin running")
err = kubeClient.WaitForPodRunning(ns, normalPod.Name,
1*time.Minute)
@@ -389,13 +378,8 @@ var _ = ginkgo.Describe("PriorityScheduling", func() {
Ω(err).NotTo(gomega.HaveOccurred())
By("Wait for low-priority placeholders terminated")
- tgPlaceHolders =
yunikorn.GetPlaceholderNames(lowPodConf.Annotations,
lowPodConf.Labels["applicationId"])
- for _, phNames := range tgPlaceHolders {
- for _, ph := range phNames {
- phTermErr :=
kubeClient.WaitForPodTerminated(ns, ph, 1*time.Minute)
- Ω(phTermErr).NotTo(HaveOccurred())
- }
- }
+ err = kubeClient.WaitForPlaceholders(ns,
"tg-"+lowPodConf.Labels["applicationId"]+"-", 0, 30*time.Second, nil)
+ Ω(err).NotTo(HaveOccurred())
By("Wait for low-priority pod to begin running")
err = kubeClient.WaitForPodRunning(ns, lowPod.Name,
1*time.Minute)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]