This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new 3e59ddcd [YUNIKORN-2309] Add pod status updater logic to the
MockScheduler performance test (#760)
3e59ddcd is described below
commit 3e59ddcdd917c292635c6be0ddb8170356ae6511
Author: Peter Bacsko <[email protected]>
AuthorDate: Fri Jan 5 11:42:37 2024 -0600
[YUNIKORN-2309] Add pod status updater logic to the MockScheduler
performance test (#760)
Closes: #760
Signed-off-by: Craig Condit <[email protected]>
---
pkg/client/apifactory_mock.go | 4 ++
pkg/client/kubeclient_mock.go | 75 +++++++++++-------------
pkg/shim/scheduler_mock_test.go | 4 ++
pkg/shim/scheduler_perf_test.go | 122 ++++++++++++++++++++++++++++++++--------
4 files changed, 139 insertions(+), 66 deletions(-)
diff --git a/pkg/client/apifactory_mock.go b/pkg/client/apifactory_mock.go
index ae35e7fd..827d2626 100644
--- a/pkg/client/apifactory_mock.go
+++ b/pkg/client/apifactory_mock.go
@@ -410,6 +410,10 @@ func (m *MockedAPIProvider) GetPodBindStats() BindStats {
return m.clients.KubeClient.(*KubeClientMock).GetBindStats()
}
+func (m *MockedAPIProvider) GetBoundPods(clear bool) []BoundPod {
+ return m.clients.KubeClient.(*KubeClientMock).GetBoundPods(clear)
+}
+
// MockedPersistentVolumeInformer implements PersistentVolumeInformer interface
type MockedPersistentVolumeInformer struct{}
diff --git a/pkg/client/kubeclient_mock.go b/pkg/client/kubeclient_mock.go
index f4df7356..bc890df2 100644
--- a/pkg/client/kubeclient_mock.go
+++ b/pkg/client/kubeclient_mock.go
@@ -43,39 +43,23 @@ type KubeClientMock struct {
clientSet kubernetes.Interface
pods map[string]*v1.Pod
lock sync.RWMutex
- bindStats *BindStats
+ bindStats BindStats
+ boundPods []BoundPod
}
// BindStats statistics about KubeClientMock.Bind() calls
type BindStats struct {
- First time.Time
- Last time.Time
- FirstPod *v1.Pod
- LastPod *v1.Pod
- Success int64
- Errors int64
- HostBindings []HostBinding
-}
-
-type HostBinding struct {
- pod *v1.Pod
- host string
- time time.Time
-}
-
-func (b *BindStats) copy() BindStats {
- bindings := make([]HostBinding, len(b.HostBindings))
- copy(bindings, b.HostBindings)
-
- return BindStats{
- First: b.First,
- Last: b.Last,
- FirstPod: b.FirstPod,
- LastPod: b.LastPod,
- Success: b.Success,
- Errors: b.Errors,
- HostBindings: bindings,
- }
+ First time.Time
+ Last time.Time
+ FirstPod string
+ LastPod string
+ Success int64
+ Errors int64
+}
+
+type BoundPod struct {
+ Pod string
+ Host string
}
func NewKubeClientMock(err bool) *KubeClientMock {
@@ -124,13 +108,13 @@ func NewKubeClientMock(err bool) *KubeClientMock {
clientSet: fake.NewSimpleClientset(),
pods: make(map[string]*v1.Pod),
lock: sync.RWMutex{},
- bindStats: &BindStats{
- HostBindings: make([]HostBinding, 0, 1024),
- },
+ boundPods: make([]BoundPod, 0, 1024),
}
kubeMock.bindFn = func(pod *v1.Pod, hostID string) error {
- stats := kubeMock.bindStats
+ // kubeMock must be locked for this
+ stats := &kubeMock.bindStats
+
if err {
stats.Errors++
return fmt.Errorf("binding error")
@@ -139,16 +123,15 @@ func NewKubeClientMock(err bool) *KubeClientMock {
zap.String("PodName", pod.Name))
now := time.Now()
- if stats.FirstPod == nil {
- stats.FirstPod = pod
+ if stats.FirstPod == "" {
+ stats.FirstPod = pod.Name
stats.First = now
}
stats.Last = now
- stats.LastPod = pod
- stats.HostBindings = append(stats.HostBindings, HostBinding{
- pod: pod,
- time: now,
- host: hostID,
+ stats.LastPod = pod.Name
+ kubeMock.boundPods = append(kubeMock.boundPods, BoundPod{
+ Pod: pod.Name,
+ Host: hostID,
})
stats.Success++
@@ -236,7 +219,17 @@ func (c *KubeClientMock) GetConfigMap(namespace string,
name string) (*v1.Config
func (c *KubeClientMock) GetBindStats() BindStats {
c.lock.RLock()
defer c.lock.RUnlock()
- return c.bindStats.copy()
+ return c.bindStats
+}
+
+func (c *KubeClientMock) GetBoundPods(clear bool) []BoundPod {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ boundPods := c.boundPods
+ if clear {
+ c.boundPods = nil
+ }
+ return boundPods
}
func getPodKey(pod *v1.Pod) string {
diff --git a/pkg/shim/scheduler_mock_test.go b/pkg/shim/scheduler_mock_test.go
index d267e5ee..54c2f75b 100644
--- a/pkg/shim/scheduler_mock_test.go
+++ b/pkg/shim/scheduler_mock_test.go
@@ -301,6 +301,10 @@ func (fc *MockScheduler) GetPodBindStats()
client.BindStats {
return fc.apiProvider.GetPodBindStats()
}
+func (fc *MockScheduler) GetBoundPods(clear bool) []client.BoundPod {
+ return fc.apiProvider.GetBoundPods(clear)
+}
+
func (fc *MockScheduler) ensureStarted() {
if !fc.started.Load() {
panic("mock scheduler is not started - call start() first")
diff --git a/pkg/shim/scheduler_perf_test.go b/pkg/shim/scheduler_perf_test.go
index 9c1afcae..940a1b2c 100644
--- a/pkg/shim/scheduler_perf_test.go
+++ b/pkg/shim/scheduler_perf_test.go
@@ -27,6 +27,7 @@ import (
"strconv"
"strings"
"sync"
+ "sync/atomic"
"testing"
"time"
@@ -59,7 +60,7 @@ partitions:
profileCpu = true
profileHeap = true
numNodes = 5000
- totalPods = int64(50_000)
+ totalPods = uint64(50_000)
nodeCpuMilli = int64(16_000)
nodeMemGiB = int64(16)
nodeNumPods = int64(110)
@@ -117,11 +118,20 @@ func BenchmarkSchedulingThroughPut(b *testing.B) {
assert.NilError(b, err, "node initialization did not finish in time")
// add pods, begin collecting allocation metrics & wait until all pods are bound
- addPodsToCluster(cluster)
+ pods := addPodsToCluster(cluster)
collector := &metricsCollector{}
go collector.collectData()
+
+ // update bound pods to Running then to Succeeded
+ ps := newPodStatusUpdater(cluster, pods)
+ go ps.updatePodStatus()
+ defer ps.stop()
+
+ // await binding of pods
err = wait.PollUntilContextTimeout(context.Background(), time.Second, time.Second*60, true, func(ctx context.Context) (done bool, err error) {
- return cluster.GetPodBindStats().Success == totalPods, nil
+ c := ps.getCompletedPodsCount()
+ fmt.Printf("Number of completed pods: %d\n", c)
+ return c == totalPods, nil
})
assert.NilError(b, err, "scheduling did not finish in time")
@@ -136,28 +146,26 @@ func BenchmarkSchedulingThroughPut(b *testing.B) {
}
}
-func addPodsToCluster(cluster *MockScheduler) {
+func addPodsToCluster(cluster *MockScheduler) map[string]*v1.Pod {
+ podsMap := make(map[string]*v1.Pod, totalPods)
+
+ var queues [5]string
+ queues[0] = "root.a"
+ queues[1] = "root.b"
+ queues[2] = "root.c"
+ queues[3] = "root.d"
+ queues[4] = "root.e"
+
// make sure that total number of pods == totalPods
- pods := getTestPods(80, 125, "root.a")
- for _, pod := range pods {
- cluster.AddPod(pod)
- }
- pods = getTestPods(80, 125, "root.b")
- for _, pod := range pods {
- cluster.AddPod(pod)
- }
- pods = getTestPods(80, 125, "root.c")
- for _, pod := range pods {
- cluster.AddPod(pod)
- }
- pods = getTestPods(80, 125, "root.d")
- for _, pod := range pods {
- cluster.AddPod(pod)
- }
- pods = getTestPods(80, 125, "root.e")
- for _, pod := range pods {
- cluster.AddPod(pod)
+ for _, queue := range queues {
+ pods := getTestPods(80, 125, queue)
+ for _, pod := range pods {
+ cluster.AddPod(pod)
+ podsMap[pod.Name] = pod
+ }
}
+
+ return podsMap
}
type metricsCollector struct {
@@ -210,6 +218,66 @@ func (m *metricsCollector) getData() []float64 {
return data
}
+func newPodStatusUpdater(cluster *MockScheduler, pods map[string]*v1.Pod) *podStatusUpdater {
+ ps := &podStatusUpdater{
+ stopCh: make(chan struct{}),
+ cluster: cluster,
+ }
+ podsCopy := make(map[string]*v1.Pod, len(pods))
+ for name, pod := range pods {
+ podsCopy[name] = pod
+ }
+ ps.pods = podsCopy
+
+ return ps
+}
+
+// updates the status of bound pods to Running then to Completed
+type podStatusUpdater struct {
+ completedPods atomic.Uint64
+ stopCh chan struct{}
+ cluster *MockScheduler
+ pods map[string]*v1.Pod
+}
+
+func (p *podStatusUpdater) updatePodStatus() {
+ for {
+ select {
+ case <-time.After(time.Second):
+ boundPods := p.cluster.GetBoundPods(true)
+ for _, boundPod := range boundPods {
+ podName := boundPod.Pod
+
+ if pod, ok := p.pods[podName]; ok {
+ podRunning := pod.DeepCopy()
+ // update to Running
+ podRunning.Status.Phase = v1.PodRunning
+ p.cluster.UpdatePod(pod, podRunning)
+
+ podSucceeded := pod.DeepCopy()
+ // update to Completed
+ podSucceeded.Status.Phase = v1.PodSucceeded
+ p.cluster.UpdatePod(podRunning, podSucceeded)
+
+ p.completedPods.Add(1)
+ continue
+ }
+ panic("BUG: test pod not found: " + podName)
+ }
+ case <-p.stopCh:
+ return
+ }
+ }
+}
+
+func (p *podStatusUpdater) getCompletedPodsCount() uint64 {
+ return p.completedPods.Load()
+}
+
+func (p *podStatusUpdater) stop() {
+ close(p.stopCh)
+}
+
func getTestPods(noApps, noTasksPerApp int, queue string) []*v1.Pod {
podCount := noApps * noTasksPerApp
pods := make([]*v1.Pod, 0, podCount)
@@ -221,14 +289,15 @@ func getTestPods(noApps, noTasksPerApp int, queue string)
[]*v1.Pod {
appId := "app000" + strconv.Itoa(i) + "-" +
strconv.FormatInt(time.Now().UnixMilli(), 10)
for j := 0; j < noTasksPerApp; j++ {
taskName := "task000" + strconv.Itoa(j)
+ podName := appId + "-" + taskName
pod := &v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
- Name: taskName,
- UID: types.UID("UID-" + appId + "-" + taskName),
+ Name: podName,
+ UID: types.UID("UID-" + podName),
Annotations: map[string]string{
constants.AnnotationApplicationID: appId,
constants.AnnotationQueueName: queue,
@@ -245,6 +314,9 @@ func getTestPods(noApps, noTasksPerApp int, queue string)
[]*v1.Pod {
},
SchedulerName: constants.SchedulerName,
},
+ Status: v1.PodStatus{
+ Phase: v1.PodPending,
+ },
}
pods = append(pods, pod)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]