This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new a15922a9 [YUNIKORN-1931] Improve placeholder name generation (#664)
a15922a9 is described below
commit a15922a94280de01d92bb631da0229bee1bf5d45
Author: Craig Condit <[email protected]>
AuthorDate: Wed Sep 6 13:18:10 2023 -0500
[YUNIKORN-1931] Improve placeholder name generation (#664)
Closes: #664
Signed-off-by: Craig Condit <[email protected]>
---
pkg/cache/placeholder_manager.go | 19 +-
pkg/cache/placeholder_manager_test.go | 38 ++--
pkg/common/utils/gang_utils.go | 26 ++-
pkg/common/utils/gang_utils_test.go | 33 +--
test/e2e/framework/helpers/k8s/k8s_utils.go | 20 +-
test/e2e/framework/helpers/yunikorn/yk_utils.go | 15 --
test/e2e/gang_scheduling/gang_scheduling_test.go | 245 ++++++---------------
.../recovery_and_restart_test.go | 16 +-
8 files changed, 154 insertions(+), 258 deletions(-)
diff --git a/pkg/cache/placeholder_manager.go b/pkg/cache/placeholder_manager.go
index cdfc57a0..dc938bf9 100644
--- a/pkg/cache/placeholder_manager.go
+++ b/pkg/cache/placeholder_manager.go
@@ -77,21 +77,18 @@ func (mgr *PlaceholderManager) createAppPlaceholders(app *Application) error {
mgr.Lock()
defer mgr.Unlock()
- existingPlaceHolders := map[string]struct{}{}
- for _, phTasks := range app.GetPlaceHolderTasks() {
- existingPlaceHolders[phTasks.GetTaskPod().GetName()] =
struct{}{}
+ // map task group to count of already created placeholders
+ tgCounts := make(map[string]int32)
+ for _, ph := range app.getPlaceHolderTasks() {
+ tgCounts[ph.getTaskGroupName()]++
}
// iterate all task groups, create placeholders for all the min members
for _, tg := range app.getTaskGroups() {
- for i := int32(0); i < tg.MinMember; i++ {
- placeholderName :=
utils.GeneratePlaceholderName(tg.Name, app.GetApplicationID(), i)
- // when performing recovery, do not create pods that
are already running
- if _, ok := existingPlaceHolders[placeholderName]; ok {
-
log.Log(log.ShimCachePlaceholder).Info("Placeholder pod already exists",
- zap.String("name", placeholderName))
- continue
- }
+ count := tgCounts[tg.Name]
+ // only create missing pods for each task group
+ for i := count; i < tg.MinMember; i++ {
+ placeholderName :=
utils.GeneratePlaceholderName(tg.Name, app.GetApplicationID())
placeholder := newPlaceholder(placeholderName, app, tg)
// create the placeholder on K8s
_, err := mgr.clients.KubeClient.Create(placeholder.pod)
diff --git a/pkg/cache/placeholder_manager_test.go b/pkg/cache/placeholder_manager_test.go
index dfba30b0..2d4d6003 100644
--- a/pkg/cache/placeholder_manager_test.go
+++ b/pkg/cache/placeholder_manager_test.go
@@ -20,7 +20,7 @@ package cache
import (
"fmt"
- "strconv"
+ "strings"
"testing"
"time"
@@ -65,14 +65,16 @@ func TestCreateAppPlaceholders(t *testing.T) {
// simulate placeholder creation failures
// failed to create one placeholder
+ var failed string
mockedAPIProvider.MockCreateFn(func(pod *v1.Pod) (*v1.Pod, error) {
- if pod.Name == "tg-test-group-2-app01-15" {
+ if failed == "" && strings.HasPrefix(pod.Name,
"tg-app01-test-group-2-") {
+ failed = pod.Name
return nil, fmt.Errorf("failed to create pod %s",
pod.Name)
}
return pod, nil
})
err := placeholderMgr.createAppPlaceholders(app)
- assert.Error(t, err, "failed to create pod tg-test-group-2-app01-15")
+ assert.Error(t, err, fmt.Sprintf("failed to create pod %s", failed))
}
func TestCreateAppPlaceholdersWithExistingPods(t *testing.T) {
@@ -88,16 +90,8 @@ func TestCreateAppPlaceholdersWithExistingPods(t *testing.T) {
err := placeholderMgr.createAppPlaceholders(app)
assert.NilError(t, err)
assert.Equal(t, 27, len(createdPods))
- assert.Equal(t, (*v1.Pod)(nil), createdPods["tg-test-group-1-app01-0"],
"Pod should not have been created")
- assert.Equal(t, (*v1.Pod)(nil), createdPods["tg-test-group-1-app01-1"],
"Pod should not have been created")
- assert.Equal(t, (*v1.Pod)(nil), createdPods["tg-test-group-1-app02-0"],
"Pod should not have been created")
- var priority *int32
- for _, tg := range []string{"tg-test-group-1-app01-",
"tg-test-group-2-app01-"} {
- for i := 2; i <= 9; i++ {
- podName := tg + strconv.Itoa(i)
- assert.Equal(t, priority,
createdPods[podName].Spec.Priority, "Priority should not be set")
- assert.Equal(t, priorityClassName,
createdPods[podName].Spec.PriorityClassName, "priority class name should be
set")
- }
+ for _, pod := range createdPods {
+ assert.Equal(t, pod.Spec.PriorityClassName,
"test-priority-class", "Pod should have PriorityClassName of
test-priority-class")
}
}
@@ -112,13 +106,8 @@ func createAndCheckPlaceholderCreate(mockedAPIProvider *client.MockedAPIProvider
err := placeholderMgr.createAppPlaceholders(app)
assert.NilError(t, err, "create app placeholders should be successful")
assert.Equal(t, len(createdPods), 30)
- var priority *int32
- for _, tg := range []string{"tg-test-group-1-app01-",
"tg-test-group-2-app01-"} {
- for i := 2; i <= 9; i++ {
- podName := tg + strconv.Itoa(i)
- assert.Equal(t, priority,
createdPods[podName].Spec.Priority, "Priority should not be set")
- assert.Equal(t, "",
createdPods[podName].Spec.PriorityClassName, "Priority class name should be
empty")
- }
+ for _, pod := range createdPods {
+ assert.Equal(t, "", pod.Spec.PriorityClassName,
"PriorityClassName should be empty")
}
return createdPods
}
@@ -175,7 +164,7 @@ func createAppWIthTaskGroupAndPodsForTest() *Application {
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
- Name: "tg-test-group-1-app01-0",
+ Name: "tg-app01-test-group-1-0",
UID: "UID-01",
},
Spec: v1.PodSpec{
@@ -188,7 +177,7 @@ func createAppWIthTaskGroupAndPodsForTest() *Application {
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
- Name: "tg-test-group-1-app01-1",
+ Name: "tg-app01-test-group-1-1",
UID: "UID-02",
},
}
@@ -198,7 +187,7 @@ func createAppWIthTaskGroupAndPodsForTest() *Application {
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
- Name: "tg-test-group-2-app01-0",
+ Name: "tg-app01-test-group-2-0",
UID: "UID-03",
},
}
@@ -208,6 +197,7 @@ func createAppWIthTaskGroupAndPodsForTest() *Application {
task1.placeholder = true
task1.pod = pod1
task1.originator = true
+ task1.setTaskGroupName("test-group-1")
app.taskMap[taskID1] = task1
app.setOriginatingTask(task1)
@@ -215,12 +205,14 @@ func createAppWIthTaskGroupAndPodsForTest() *Application {
task2 := NewTask(taskID2, app, mockedContext, pod2)
task2.placeholder = true
task2.pod = pod2
+ task2.setTaskGroupName("test-group-1")
app.taskMap[taskID2] = task2
taskID3 := "task2-01"
task3 := NewTask(taskID3, app, mockedContext, pod3)
task3.placeholder = true
task3.pod = pod3
+ task3.setTaskGroupName("test-group-2")
app.taskMap[taskID3] = task3
return app
diff --git a/pkg/common/utils/gang_utils.go b/pkg/common/utils/gang_utils.go
index 796aeddb..9f876a3d 100644
--- a/pkg/common/utils/gang_utils.go
+++ b/pkg/common/utils/gang_utils.go
@@ -20,6 +20,7 @@ package utils
import (
"fmt"
+ "math/rand"
"strconv"
"strings"
@@ -55,13 +56,26 @@ func FindAppTaskGroup(appTaskGroups []*interfaces.TaskGroup, groupName string) (
}
// GeneratePlaceholderName creates the placeholder name for a pod, pod name can not be longer than 63 chars,
-// taskGroup name and appID will be truncated if they go over 20/28 chars respectively,
-// each taskGroup is assigned with an incremental index starting from 0.
-func GeneratePlaceholderName(taskGroupName, appID string, index int32) string {
- // taskGroup name no longer than 20 chars
+// appID and taskGroupName will be truncated if they go over 28/20 chars respectively,
+// and each name will be suffixed with a randomly generated 10-character nonce.
+func GeneratePlaceholderName(taskGroupName, appID string) string {
// appID no longer than 28 chars
- // total length no longer than 20 + 28 + 5 + 10 = 63
- return fmt.Sprintf("tg-%.20s-%.28s-%d", taskGroupName, appID, index)
+ // taskGroup name no longer than 20 chars
+ // nonce equal to 10 chars
+ // total length no longer than 28 + 20 + 5 + 10 = 63
+ return fmt.Sprintf("tg-%.28s-%.20s-%s", appID, taskGroupName,
generateNonce(10))
+}
+
+const letterBytes = "abcdefghijklmnopqrstuvwxyz0123456789"
+
+// generateNonce generates an alphanumeric string (i.e. base-36) of the given length
+func generateNonce(length int) string {
+ buf := make([]byte, length)
+ for i := range buf {
+ // this uses the built-in rand package as it does not need to be hardened
+ buf[i] = letterBytes[rand.Intn(len(letterBytes))] //nolint:gosec
+ }
+ return string(buf)
}
// GetPlaceholderResourceRequests converts the map of resources requested into a list of resources for the request
diff --git a/pkg/common/utils/gang_utils_test.go b/pkg/common/utils/gang_utils_test.go
index 7db36dd2..8096df24 100644
--- a/pkg/common/utils/gang_utils_test.go
+++ b/pkg/common/utils/gang_utils_test.go
@@ -19,7 +19,7 @@
package utils
import (
- "math"
+ "fmt"
"reflect"
"testing"
@@ -73,23 +73,28 @@ func TestFindAppTaskGroup(t *testing.T) {
}
func TestGeneratePlaceholderName(t *testing.T) {
- name := GeneratePlaceholderName("my-group", "app0001", 100)
- assert.Equal(t, name, "tg-my-group-app0001-100")
+ name := GeneratePlaceholderName("my-group", "app0001")
+ prefix := name[0 : len(name)-11]
+ nonce := name[len(name)-10:]
+ assert.Equal(t, prefix, "tg-app0001-my-group")
+ assert.Equal(t, name, fmt.Sprintf("%s-%s", prefix, nonce))
+ assert.Equal(t, len(name), 30)
name = GeneratePlaceholderName("my-group",
- "app00000000000000000000000000000000000000000001", 100)
- assert.Equal(t, name, "tg-my-group-app0000000000000000000000000-100")
- assert.Assert(t, len(name) < 63)
+ "app00000000000000000000000000000000000000000001")
+ prefix = name[0 : len(name)-11]
+ nonce = name[len(name)-10:]
+ assert.Equal(t, prefix, "tg-app0000000000000000000000000-my-group")
+ assert.Equal(t, name, fmt.Sprintf("%s-%s", prefix, nonce))
+ assert.Equal(t, len(name), 51)
name =
GeneratePlaceholderName("a-very-long-task-group-name------------------------------------------",
-
"a-very-long-app-ID-----------------------------------------------------------------",
100)
- assert.Equal(t, name,
"tg-a-very-long-task-gro-a-very-long-app-ID-----------100")
- assert.Assert(t, len(name) < 63)
-
- name =
GeneratePlaceholderName("a-very-long-task-group-name------------------------------------------",
-
"a-very-long-app-ID-----------------------------------------------------------------",
math.MaxInt32)
- assert.Equal(t, name,
"tg-a-very-long-task-gro-a-very-long-app-ID-----------2147483647")
- assert.Assert(t, len(name) == 63)
+
"a-very-long-app-ID-----------------------------------------------------------------")
+ prefix = name[0 : len(name)-11]
+ nonce = name[len(name)-10:]
+ assert.Equal(t, prefix,
"tg-a-very-long-app-ID-----------a-very-long-task-gro")
+ assert.Equal(t, name, fmt.Sprintf("%s-%s", prefix, nonce))
+ assert.Equal(t, len(name), 63)
}
func TestGetSchedulingPolicyParams(t *testing.T) {
diff --git a/test/e2e/framework/helpers/k8s/k8s_utils.go b/test/e2e/framework/helpers/k8s/k8s_utils.go
index 95e3267e..39af7274 100644
--- a/test/e2e/framework/helpers/k8s/k8s_utils.go
+++ b/test/e2e/framework/helpers/k8s/k8s_utils.go
@@ -965,10 +965,24 @@ func (k *KubeCtl) WaitForJobPodsSucceeded(namespace string, jobName string, numP
return wait.PollImmediate(time.Millisecond*100, timeout,
k.isNumJobPodsInDesiredState(jobName, namespace, numPods, v1.PodSucceeded))
}
-func (k *KubeCtl) WaitForPlaceholders(namespace string, podPrefix string,
numPods int, timeout time.Duration, podPhase v1.PodPhase) error {
+func (k *KubeCtl) WaitForPlaceholders(namespace string, podPrefix string,
numPods int, timeout time.Duration, podPhase *v1.PodPhase) error {
return wait.PollImmediate(time.Millisecond*100, timeout,
k.isNumPlaceholdersRunning(namespace, podPrefix, numPods, podPhase))
}
+func (k *KubeCtl) ListPlaceholders(namespace string, podPrefix string)
([]v1.Pod, error) {
+ pods := make([]v1.Pod, 0)
+ podList, lstErr := k.ListPods(namespace, "placeholder=true")
+ if lstErr != nil {
+ return pods, lstErr
+ }
+ for _, pod := range podList.Items {
+ if strings.HasPrefix(pod.Name, podPrefix) {
+ pods = append(pods, pod)
+ }
+ }
+ return pods, nil
+}
+
// WaitForPlaceholdersStableState used when the expected state of the placeholders cannot be properly determined in advance or not needed.
// Returns when the phase of pods does not change three times in a row.
func (k *KubeCtl) WaitForPlaceholdersStableState(namespace string, podPrefix
string, timeout time.Duration) error {
@@ -977,7 +991,7 @@ func (k *KubeCtl) WaitForPlaceholdersStableState(namespace string, podPrefix str
return wait.PollImmediate(time.Second, timeout,
k.arePlaceholdersStable(namespace, podPrefix, &samePhases, 3, podPhases))
}
-func (k *KubeCtl) isNumPlaceholdersRunning(namespace string, podPrefix string,
num int, podPhase v1.PodPhase) wait.ConditionFunc {
+func (k *KubeCtl) isNumPlaceholdersRunning(namespace string, podPrefix string,
num int, podPhase *v1.PodPhase) wait.ConditionFunc {
return func() (bool, error) {
jobPods, lstErr := k.ListPods(namespace, "placeholder=true")
if lstErr != nil {
@@ -986,7 +1000,7 @@ func (k *KubeCtl) isNumPlaceholdersRunning(namespace string, podPrefix string, n
var count int
for _, pod := range jobPods.Items {
- if strings.HasPrefix(pod.Name, podPrefix) &&
pod.Status.Phase == podPhase {
+ if strings.HasPrefix(pod.Name, podPrefix) && (podPhase
== nil || *podPhase == pod.Status.Phase) {
count++
}
}
diff --git a/test/e2e/framework/helpers/yunikorn/yk_utils.go b/test/e2e/framework/helpers/yunikorn/yk_utils.go
index f2004b2e..79956738 100644
--- a/test/e2e/framework/helpers/yunikorn/yk_utils.go
+++ b/test/e2e/framework/helpers/yunikorn/yk_utils.go
@@ -89,21 +89,6 @@ func CreateDefaultConfigMap() *v1.ConfigMap {
return cm
}
-// Generates all placeholder names for all taskGroups using format:
-// [taskGroup]-[taskGroupName]-[applicationID]-[Index]
-func GetPlaceholderNames(ann *k8s.PodAnnotation, appID string)
map[string][]string {
- phMap := make(map[string][]string)
- phName := "tg-%s-%s-%d"
- for _, tg := range ann.TaskGroups {
- phMap[tg.Name] = []string{}
- for i := 0; i < int(tg.MinMember); i++ {
- phMap[tg.Name] = append(phMap[tg.Name],
fmt.Sprintf(phName, tg.Name, appID, i))
- }
- }
-
- return phMap
-}
-
func GetSchedulerPodName(kClient k8s.KubeCtl) (string, error) {
ykNS := configmanager.YuniKornTestConfig.YkNamespace
schedComponent := fmt.Sprintf("component=%s", configmanager.YKScheduler)
diff --git a/test/e2e/gang_scheduling/gang_scheduling_test.go b/test/e2e/gang_scheduling/gang_scheduling_test.go
index 09d6f696..d3aada44 100644
--- a/test/e2e/gang_scheduling/gang_scheduling_test.go
+++ b/test/e2e/gang_scheduling/gang_scheduling_test.go
@@ -142,13 +142,8 @@ var _ = Describe("", func() {
// Check all placeholders deleted.
By("Wait for all placeholders terminated")
- tgPlaceHolders :=
yunikorn.GetPlaceholderNames(podConf.Annotations,
podConf.Labels["applicationId"])
- for _, phNames := range tgPlaceHolders {
- for _, ph := range phNames {
- phTermErr := kClient.WaitForPodTerminated(ns,
ph, 3*time.Minute)
- Ω(phTermErr).NotTo(HaveOccurred())
- }
- }
+ phTermErr := kClient.WaitForPlaceholders(ns,
"tg-"+podConf.Labels["applicationId"]+"-"+taskGroupName+"-", 0, 3*time.Minute,
nil)
+ Ω(phTermErr).NotTo(gomega.HaveOccurred())
// Check real gang members now running
By("Wait for all gang members running")
@@ -228,18 +223,27 @@ var _ = Describe("", func() {
30)
Ω(timeoutErr).NotTo(HaveOccurred())
+ // Wait for placeholders to become running
+ stateRunning := v1.PodRunning
+ phErr := kClient.WaitForPlaceholders(ns,
"tg-"+podConf.Labels["applicationId"]+"-", 15, 2*time.Minute, &stateRunning)
+ Ω(phErr).NotTo(HaveOccurred())
+
// Check placeholder node distribution is same as real pods'
- tgPlaceHolders :=
yunikorn.GetPlaceholderNames(podConf.Annotations,
podConf.Labels["applicationId"])
+ phPods, phListErr := kClient.ListPods(ns, "placeholder=true")
+ Ω(phListErr).NotTo(HaveOccurred())
taskGroupNodes := map[string]map[string]int{}
- for tg, phNames := range tgPlaceHolders {
- taskGroupNodes[tg] = map[string]int{}
- for _, name := range phNames {
- podRunErr := kClient.WaitForPodRunning(ns,
name, time.Second*120)
- Ω(podRunErr).NotTo(HaveOccurred())
- ph, phErr := kClient.GetPod(name, ns)
- Ω(phErr).NotTo(HaveOccurred())
- taskGroupNodes[tg][ph.Spec.NodeName]++
+ for _, ph := range phPods.Items {
+ tg, ok :=
ph.Annotations[constants.AnnotationTaskGroupName]
+ if !ok {
+ continue
+ }
+ if _, ok = taskGroupNodes[tg]; !ok {
+ taskGroupNodes[tg] = map[string]int{}
}
+ if _, ok = taskGroupNodes[tg][ph.Spec.NodeName]; !ok {
+ taskGroupNodes[tg][ph.Spec.NodeName] = 0
+ }
+ taskGroupNodes[tg][ph.Spec.NodeName]++
}
// Deploy real pods for each taskGroup
@@ -262,12 +266,8 @@ var _ = Describe("", func() {
}
By("Wait for all placeholders terminated")
- for _, phNames := range tgPlaceHolders {
- for _, ph := range phNames {
- phTermErr := kClient.WaitForPodTerminated(ns,
ph, 3*time.Minute)
- Ω(phTermErr).NotTo(HaveOccurred())
- }
- }
+ phTermErr := kClient.WaitForPlaceholders(ns,
"tg-"+podConf.Labels["applicationId"]+"-", 0, 3*time.Minute, nil)
+ Ω(phTermErr).NotTo(HaveOccurred())
// Check real gang members now running on same node distribution
By("Verify task group node distribution")
@@ -635,17 +635,9 @@ var _ = Describe("", func() {
// App1 should have 2/3 placeholders running
podConf.Annotations.TaskGroups[0].MinMember =
int32(appAllocs[apps[1]]["minMembers"])
- app1Phs := yunikorn.GetPlaceholderNames(podConf.Annotations,
apps[1])
- numRunningPhs := 0
- for _, placeholders := range app1Phs {
- for _, ph := range placeholders {
- runErr := kClient.WaitForPodRunning(fifoQName,
ph, 30*time.Second)
- if runErr == nil {
- numRunningPhs++
- }
- }
- }
- Ω(numRunningPhs).Should(BeNumerically("==", 2))
+ statusRunning := v1.PodRunning
+ phErr := kClient.WaitForPlaceholders(fifoQName,
"tg-"+apps[1]+"-", 2, 30*time.Second, &statusRunning)
+ Ω(phErr).NotTo(HaveOccurred())
// Delete app0
deleteErr := kClient.DeleteJob(apps[0], fifoQName)
@@ -757,16 +749,16 @@ var _ = Describe("", func() {
Ω(timeoutErr).NotTo(HaveOccurred())
// Wait for groupa placeholder pods running
- phNames := yunikorn.GetPlaceholderNames(podConf.Annotations,
podConf.Labels["applicationId"])
- for _, ph := range phNames[groupA] {
- runErr := kClient.WaitForPodRunning(ns, ph,
30*time.Second)
- Ω(runErr).NotTo(HaveOccurred())
- }
+ stateRunning := v1.PodRunning
+ runErr := kClient.WaitForPlaceholders(ns,
"tg-"+podConf.Labels["applicationId"]+"-"+groupA+"-", 3, 30*time.Second,
&stateRunning)
+ Ω(runErr).NotTo(HaveOccurred())
// Delete all groupa placeholder pods
- for i, ph := range phNames[groupA] {
- By(fmt.Sprintf("Iteration-%d: Delete placeholder %s",
i, ph))
- deleteErr := kClient.DeletePod(ph, ns)
+ phPods, listErr := kClient.ListPlaceholders(ns,
"tg-"+podConf.Labels["applicationId"]+"-"+groupA+"-")
+ Ω(listErr).NotTo(HaveOccurred())
+ for _, ph := range phPods {
+ By(fmt.Sprintf("Delete placeholder %s", ph.Name))
+ deleteErr := kClient.DeletePod(ph.Name, ns)
Ω(deleteErr).NotTo(HaveOccurred())
}
@@ -863,13 +855,14 @@ var _ = Describe("", func() {
timeoutErr :=
restClient.WaitForAppStateTransition(defaultPartition, nsQueue,
podConf.Labels["applicationId"], yunikorn.States().Application.Accepted, 10)
Ω(timeoutErr).NotTo(HaveOccurred())
- By("Wait for placeholders running")
- phNames := yunikorn.GetPlaceholderNames(podConf.Annotations,
podConf.Labels["applicationId"])
- tgBNames := phNames[podConf.Annotations.TaskGroups[1].Name]
- for _, ph := range tgBNames {
- runErr := kClient.WaitForPodRunning(ns, ph,
30*time.Second)
- Ω(runErr).NotTo(HaveOccurred())
- }
+ By("Wait for groupB placeholders running")
+ stateRunning := v1.PodRunning
+ runErr := kClient.WaitForPlaceholders(ns,
"tg-"+podConf.Labels["applicationId"]+"-"+groupB+"-", 3, 30*time.Second,
&stateRunning)
+ Ω(runErr).NotTo(HaveOccurred())
+
+ By("List placeholders")
+ tgPods, listErr := kClient.ListPlaceholders(ns,
"tg-"+podConf.Labels["applicationId"]+"-")
+ Ω(listErr).NotTo(HaveOccurred())
By("Delete job pods")
deleteErr := kClient.DeleteJob(jobConf.Name, ns)
@@ -878,11 +871,9 @@ var _ = Describe("", func() {
Ω(timeoutErr).NotTo(HaveOccurred())
By("Verify placeholders deleted")
- for _, placeholders := range phNames {
- for _, ph := range placeholders {
- deleteErr = kClient.WaitForPodTerminated(ns,
ph, 30*time.Second)
- Ω(deleteErr).NotTo(HaveOccurred(), "Placeholder
%s still running", ph)
- }
+ for _, ph := range tgPods {
+ deleteErr = kClient.WaitForPodTerminated(ns, ph.Name,
30*time.Second)
+ Ω(deleteErr).NotTo(HaveOccurred(), "Placeholder %s
still running", ph)
}
By("Verify app allocation is empty")
@@ -944,24 +935,23 @@ var _ = Describe("", func() {
timeoutErr :=
restClient.WaitForAppStateTransition(defaultPartition, nsQueue,
podConf.Labels["applicationId"], yunikorn.States().Application.Accepted, 20)
Ω(timeoutErr).NotTo(HaveOccurred())
- By("Wait for placeholders running")
- phNames := yunikorn.GetPlaceholderNames(podConf.Annotations,
podConf.Labels["applicationId"])
- tgBNames := phNames[podConf.Annotations.TaskGroups[1].Name]
- for _, ph := range tgBNames {
- runErr := kClient.WaitForPodRunning(ns, ph,
30*time.Second)
- Ω(runErr).NotTo(HaveOccurred())
- }
+ By("Wait for groupB placeholders running")
+ stateRunning := v1.PodRunning
+ runErr := kClient.WaitForPlaceholders(ns,
"tg-"+podConf.Labels["applicationId"]+"-"+groupB+"-", 3, 30*time.Second,
&stateRunning)
+ Ω(runErr).NotTo(HaveOccurred())
+
+ By("List placeholders")
+ tgPods, listErr := kClient.ListPlaceholders(ns,
"tg-"+podConf.Labels["applicationId"]+"-")
+ Ω(listErr).NotTo(HaveOccurred())
By("Delete originator pod")
deleteErr := kClient.DeletePod(originator.Name, ns)
Ω(deleteErr).NotTo(HaveOccurred())
By("Verify placeholders deleted")
- for _, placeholders := range phNames {
- for _, ph := range placeholders {
- deleteErr = kClient.WaitForPodTerminated(ns,
ph, 30*time.Second)
- Ω(deleteErr).NotTo(HaveOccurred(), "Placeholder
%s still running", ph)
- }
+ for _, ph := range tgPods {
+ deleteErr = kClient.WaitForPodTerminated(ns, ph.Name,
30*time.Second)
+ Ω(deleteErr).NotTo(HaveOccurred(), "Placeholder %s
still running", ph)
}
By("Verify app allocation is empty")
@@ -1052,24 +1042,23 @@ var _ = Describe("", func() {
timeoutErr :=
restClient.WaitForAppStateTransition(defaultPartition, nsQueue,
podConf.Labels["applicationId"], yunikorn.States().Application.Accepted, 20)
Ω(timeoutErr).NotTo(HaveOccurred())
- By("Wait for placeholders running")
- phNames := yunikorn.GetPlaceholderNames(podConf.Annotations,
podConf.Labels["applicationId"])
- tgBNames := phNames[podConf.Annotations.TaskGroups[1].Name]
- for _, ph := range tgBNames {
- runErr := kClient.WaitForPodRunning(ns, ph,
30*time.Second)
- Ω(runErr).NotTo(HaveOccurred())
- }
+ By("Wait for groupB placeholders running")
+ stateRunning := v1.PodRunning
+ runErr := kClient.WaitForPlaceholders(ns,
"tg-"+podConf.Labels["applicationId"]+"-"+groupB+"-", 3, 30*time.Second,
&stateRunning)
+ Ω(runErr).NotTo(HaveOccurred())
+
+ By("List placeholders")
+ tgPods, listErr := kClient.ListPlaceholders(ns,
"tg-"+podConf.Labels["applicationId"]+"-")
+ Ω(listErr).NotTo(HaveOccurred())
By("Delete originator pod")
deleteErr := kClient.DeletePod(originator.Name, ns)
Ω(deleteErr).NotTo(HaveOccurred())
By("Verify placeholders deleted")
- for _, placeholders := range phNames {
- for _, ph := range placeholders {
- deleteErr = kClient.WaitForPodTerminated(ns,
ph, 30*time.Second)
- Ω(deleteErr).NotTo(HaveOccurred(), "Placeholder
%s still running", ph)
- }
+ for _, ph := range tgPods {
+ deleteErr = kClient.WaitForPodTerminated(ns, ph.Name,
30*time.Second)
+ Ω(deleteErr).NotTo(HaveOccurred(), "Placeholder %s
still running", ph)
}
By("Verify app allocation is empty")
@@ -1078,108 +1067,6 @@ var _ = Describe("", func() {
Ω(len(appInfo.Allocations)).To(BeNumerically("==", 0))
})
- ginkgo.DescribeTable("", func(annotations k8s.PodAnnotation) {
- podConf := k8s.TestPodConfig{
- Labels: map[string]string{
- "app": "sleep-" + common.RandSeq(5),
- "applicationId": "appid-" + common.RandSeq(5),
- },
- Annotations: &annotations,
- Resources: &v1.ResourceRequirements{
- Requests: v1.ResourceList{
- "cpu": resource.MustParse("10m"),
- "memory": resource.MustParse("10M"),
- },
- },
- }
- jobConf := k8s.JobConfig{
- Name: "gangjob-" + common.RandSeq(5),
- Namespace: ns,
- Parallelism: int32(2),
- PodConfig: podConf,
- }
-
- job, jobErr := k8s.InitJobConfig(jobConf)
- Ω(jobErr).NotTo(HaveOccurred())
-
- // Deploy job
- taskGroupsMap, annErr :=
k8s.PodAnnotationToMap(podConf.Annotations)
- Ω(annErr).NotTo(HaveOccurred())
- By(fmt.Sprintf("Deploy job %s with task-groups: %+v",
jobConf.Name, taskGroupsMap[k8s.TaskGroups]))
- _, jobErr = kClient.CreateJob(job, ns)
- Ω(jobErr).NotTo(HaveOccurred())
- createErr := kClient.WaitForJobPodsCreated(ns, job.Name,
int(*job.Spec.Parallelism), 30*time.Second)
- Ω(createErr).NotTo(HaveOccurred())
-
- // Validate placeholders deleted.
- tgPlaceHolders :=
yunikorn.GetPlaceholderNames(podConf.Annotations,
podConf.Labels["applicationId"])
- for _, phNames := range tgPlaceHolders {
- for _, name := range phNames {
- phErr := kClient.WaitForPodTerminated(ns, name,
time.Minute)
- Ω(phErr).NotTo(HaveOccurred())
- }
- }
-
- // Validate incorrect task-group definition ignored
- timeoutErr := kClient.WaitForJobPods(ns, jobConf.Name,
int(jobConf.Parallelism), 30*time.Second)
- Ω(timeoutErr).NotTo(HaveOccurred())
- appPods, getErr := kClient.ListPods(ns,
fmt.Sprintf("applicationId=%s", podConf.Labels["applicationId"]))
- Ω(getErr).NotTo(HaveOccurred())
- Ω(len(appPods.Items)).To(BeNumerically("==",
jobConf.Parallelism))
- },
- ginkgo.Entry("Verify_TG_With_Duplicate_Group",
k8s.PodAnnotation{
- TaskGroups: []interfaces.TaskGroup{
- {
- Name: "groupdup",
- MinMember: int32(3),
- MinResource:
map[string]resource.Quantity{
- "cpu":
resource.MustParse("10m"),
- "memory":
resource.MustParse("10M"),
- },
- },
- {
- Name: "groupdup",
- MinMember: int32(5),
- MinResource:
map[string]resource.Quantity{
- "cpu":
resource.MustParse("10m"),
- "memory":
resource.MustParse("10M"),
- },
- },
- {
- Name: groupA,
- MinMember: int32(7),
- MinResource:
map[string]resource.Quantity{
- "cpu":
resource.MustParse("10m"),
- "memory":
resource.MustParse("10M"),
- },
- },
- },
- }),
- ginkgo.Entry("Verify_TG_With_Invalid_Chars", k8s.PodAnnotation{
- TaskGroups: []interfaces.TaskGroup{
- {
- Name: "GROUPCAPS",
- MinMember: int32(3),
- MinResource:
map[string]resource.Quantity{
- "cpu":
resource.MustParse("10m"),
- "memory":
resource.MustParse("10M"),
- },
- },
- },
- }),
- ginkgo.Entry("Verify_TG_With_Invalid_MinMember",
k8s.PodAnnotation{
- TaskGroups: []interfaces.TaskGroup{
- {
- Name: groupA,
- MinMember: int32(-1),
- MinResource:
map[string]resource.Quantity{
- "cpu":
resource.MustParse("10m"),
- "memory":
resource.MustParse("10M"),
- },
- },
- },
- }),
- )
AfterEach(func() {
testDescription := ginkgo.CurrentSpecReport()
if testDescription.Failed() {
diff --git a/test/e2e/recovery_and_restart/recovery_and_restart_test.go b/test/e2e/recovery_and_restart/recovery_and_restart_test.go
index c9d0fcdd..b31d50a3 100644
--- a/test/e2e/recovery_and_restart/recovery_and_restart_test.go
+++ b/test/e2e/recovery_and_restart/recovery_and_restart_test.go
@@ -39,10 +39,8 @@ const (
normalSleepJobPrefix = "normal-sleep-job"
taskGroupA = "groupa"
taskGroupB = "groupb"
- taskGroupAprefix = "tg-" + taskGroupA + "-" + gangSleepJobPrefix
- taskGroupBprefix = "tg-" + taskGroupB + "-" + gangSleepJobPrefix
taskGroupE2E = "e2e-task-group"
- taskGroupE2EPrefix = "tg-" + taskGroupE2E
+ taskGroupE2EPrefix = "tg-" + gangSleepJobPrefix + "-" + taskGroupE2E
parallelism = 3
taintKey = "e2e_test"
)
@@ -216,11 +214,15 @@ var _ = ginkgo.Describe("", func() {
Ω(createErr).NotTo(gomega.HaveOccurred())
ginkgo.By("Waiting for placeholders in task group A (expected
state: Running)")
- err = kClient.WaitForPlaceholders(dev, taskGroupAprefix,
parallelism, 30*time.Second, v1.PodRunning)
+ groupAPrefix := "tg-" + appID + "-" + taskGroupA + "-"
+ stateRunning := v1.PodRunning
+ err = kClient.WaitForPlaceholders(dev, groupAPrefix,
parallelism, 30*time.Second, &stateRunning)
Ω(err).NotTo(gomega.HaveOccurred())
ginkgo.By("Waiting for placeholders in task group B (expected
state: Pending)")
- err = kClient.WaitForPlaceholders(dev, taskGroupBprefix,
parallelism+1, 30*time.Second, v1.PodPending)
+ groupBPrefix := "tg-" + appID + "-" + taskGroupB + "-"
+ statePending := v1.PodPending
+ err = kClient.WaitForPlaceholders(dev, groupBPrefix,
parallelism+1, 30*time.Second, &statePending)
Ω(err).NotTo(gomega.HaveOccurred())
ginkgo.By("Restart the scheduler pod")
@@ -252,12 +254,12 @@ var _ = ginkgo.Describe("", func() {
podPhase := pod.Status.Phase
podName := pod.GetName()
fmt.Fprintf(ginkgo.GinkgoWriter, "Pod name:
%-40s\tStatus: %s\n", podName, podPhase)
- if strings.HasPrefix(podName, taskGroupAprefix) {
+ if strings.HasPrefix(podName, groupAPrefix) {
groupAPlaceholderCount++
Ω(podPhase).To(gomega.Equal(v1.PodRunning))
continue
}
- if strings.HasPrefix(podName, taskGroupBprefix) {
+ if strings.HasPrefix(podName, groupBPrefix) {
groupBPlaceholderCount++
Ω(podPhase).To(gomega.Equal(v1.PodPending))
continue
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]