This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e59ddcd [YUNIKORN-2309] Add pod status updater logic to the 
MockScheduler performance test (#760)
3e59ddcd is described below

commit 3e59ddcdd917c292635c6be0ddb8170356ae6511
Author: Peter Bacsko <[email protected]>
AuthorDate: Fri Jan 5 11:42:37 2024 -0600

    [YUNIKORN-2309] Add pod status updater logic to the MockScheduler 
performance test (#760)
    
    Closes: #760
    
    Signed-off-by: Craig Condit <[email protected]>
---
 pkg/client/apifactory_mock.go   |   4 ++
 pkg/client/kubeclient_mock.go   |  75 +++++++++++-------------
 pkg/shim/scheduler_mock_test.go |   4 ++
 pkg/shim/scheduler_perf_test.go | 122 ++++++++++++++++++++++++++++++++--------
 4 files changed, 139 insertions(+), 66 deletions(-)

diff --git a/pkg/client/apifactory_mock.go b/pkg/client/apifactory_mock.go
index ae35e7fd..827d2626 100644
--- a/pkg/client/apifactory_mock.go
+++ b/pkg/client/apifactory_mock.go
@@ -410,6 +410,10 @@ func (m *MockedAPIProvider) GetPodBindStats() BindStats {
        return m.clients.KubeClient.(*KubeClientMock).GetBindStats()
 }
 
+func (m *MockedAPIProvider) GetBoundPods(clear bool) []BoundPod {
+       return m.clients.KubeClient.(*KubeClientMock).GetBoundPods(clear)
+}
+
 // MockedPersistentVolumeInformer implements PersistentVolumeInformer interface
 type MockedPersistentVolumeInformer struct{}
 
diff --git a/pkg/client/kubeclient_mock.go b/pkg/client/kubeclient_mock.go
index f4df7356..bc890df2 100644
--- a/pkg/client/kubeclient_mock.go
+++ b/pkg/client/kubeclient_mock.go
@@ -43,39 +43,23 @@ type KubeClientMock struct {
        clientSet      kubernetes.Interface
        pods           map[string]*v1.Pod
        lock           sync.RWMutex
-       bindStats      *BindStats
+       bindStats      BindStats
+       boundPods      []BoundPod
 }
 
 // BindStats statistics about KubeClientMock.Bind() calls
 type BindStats struct {
-       First        time.Time
-       Last         time.Time
-       FirstPod     *v1.Pod
-       LastPod      *v1.Pod
-       Success      int64
-       Errors       int64
-       HostBindings []HostBinding
-}
-
-type HostBinding struct {
-       pod  *v1.Pod
-       host string
-       time time.Time
-}
-
-func (b *BindStats) copy() BindStats {
-       bindings := make([]HostBinding, len(b.HostBindings))
-       copy(bindings, b.HostBindings)
-
-       return BindStats{
-               First:        b.First,
-               Last:         b.Last,
-               FirstPod:     b.FirstPod,
-               LastPod:      b.LastPod,
-               Success:      b.Success,
-               Errors:       b.Errors,
-               HostBindings: bindings,
-       }
+       First    time.Time
+       Last     time.Time
+       FirstPod string
+       LastPod  string
+       Success  int64
+       Errors   int64
+}
+
+type BoundPod struct {
+       Pod  string
+       Host string
 }
 
 func NewKubeClientMock(err bool) *KubeClientMock {
@@ -124,13 +108,13 @@ func NewKubeClientMock(err bool) *KubeClientMock {
                clientSet: fake.NewSimpleClientset(),
                pods:      make(map[string]*v1.Pod),
                lock:      sync.RWMutex{},
-               bindStats: &BindStats{
-                       HostBindings: make([]HostBinding, 0, 1024),
-               },
+               boundPods: make([]BoundPod, 0, 1024),
        }
 
        kubeMock.bindFn = func(pod *v1.Pod, hostID string) error {
-               stats := kubeMock.bindStats
+               // kubeMock must be locked for this
+               stats := &kubeMock.bindStats
+
                if err {
                        stats.Errors++
                        return fmt.Errorf("binding error")
@@ -139,16 +123,15 @@ func NewKubeClientMock(err bool) *KubeClientMock {
                        zap.String("PodName", pod.Name))
 
                now := time.Now()
-               if stats.FirstPod == nil {
-                       stats.FirstPod = pod
+               if stats.FirstPod == "" {
+                       stats.FirstPod = pod.Name
                        stats.First = now
                }
                stats.Last = now
-               stats.LastPod = pod
-               stats.HostBindings = append(stats.HostBindings, HostBinding{
-                       pod:  pod,
-                       time: now,
-                       host: hostID,
+               stats.LastPod = pod.Name
+               kubeMock.boundPods = append(kubeMock.boundPods, BoundPod{
+                       Pod:  pod.Name,
+                       Host: hostID,
                })
                stats.Success++
 
@@ -236,7 +219,17 @@ func (c *KubeClientMock) GetConfigMap(namespace string, 
name string) (*v1.Config
 func (c *KubeClientMock) GetBindStats() BindStats {
        c.lock.RLock()
        defer c.lock.RUnlock()
-       return c.bindStats.copy()
+       return c.bindStats
+}
+
+func (c *KubeClientMock) GetBoundPods(clear bool) []BoundPod {
+       c.lock.Lock()
+       defer c.lock.Unlock()
+       boundPods := c.boundPods
+       if clear {
+               c.boundPods = nil
+       }
+       return boundPods
 }
 
 func getPodKey(pod *v1.Pod) string {
diff --git a/pkg/shim/scheduler_mock_test.go b/pkg/shim/scheduler_mock_test.go
index d267e5ee..54c2f75b 100644
--- a/pkg/shim/scheduler_mock_test.go
+++ b/pkg/shim/scheduler_mock_test.go
@@ -301,6 +301,10 @@ func (fc *MockScheduler) GetPodBindStats() 
client.BindStats {
        return fc.apiProvider.GetPodBindStats()
 }
 
+func (fc *MockScheduler) GetBoundPods(clear bool) []client.BoundPod {
+       return fc.apiProvider.GetBoundPods(clear)
+}
+
 func (fc *MockScheduler) ensureStarted() {
        if !fc.started.Load() {
                panic("mock scheduler is not started - call start() first")
diff --git a/pkg/shim/scheduler_perf_test.go b/pkg/shim/scheduler_perf_test.go
index 9c1afcae..940a1b2c 100644
--- a/pkg/shim/scheduler_perf_test.go
+++ b/pkg/shim/scheduler_perf_test.go
@@ -27,6 +27,7 @@ import (
        "strconv"
        "strings"
        "sync"
+       "sync/atomic"
        "testing"
        "time"
 
@@ -59,7 +60,7 @@ partitions:
        profileCpu      = true
        profileHeap     = true
        numNodes        = 5000
-       totalPods       = int64(50_000)
+       totalPods       = uint64(50_000)
        nodeCpuMilli    = int64(16_000)
        nodeMemGiB      = int64(16)
        nodeNumPods     = int64(110)
@@ -117,11 +118,20 @@ func BenchmarkSchedulingThroughPut(b *testing.B) {
        assert.NilError(b, err, "node initialization did not finish in time")
 
        // add pods, begin collecting allocation metrics & wait until all pods 
are bound
-       addPodsToCluster(cluster)
+       pods := addPodsToCluster(cluster)
        collector := &metricsCollector{}
        go collector.collectData()
+
+       // update bound pods to Running then to Succeeded
+       ps := newPodStatusUpdater(cluster, pods)
+       go ps.updatePodStatus()
+       defer ps.stop()
+
+       // await binding of pods
        err = wait.PollUntilContextTimeout(context.Background(), time.Second, 
time.Second*60, true, func(ctx context.Context) (done bool, err error) {
-               return cluster.GetPodBindStats().Success == totalPods, nil
+               c := ps.getCompletedPodsCount()
+               fmt.Printf("Number of completed pods: %d\n", c)
+               return c == totalPods, nil
        })
        assert.NilError(b, err, "scheduling did not finish in time")
 
@@ -136,28 +146,26 @@ func BenchmarkSchedulingThroughPut(b *testing.B) {
        }
 }
 
-func addPodsToCluster(cluster *MockScheduler) {
+func addPodsToCluster(cluster *MockScheduler) map[string]*v1.Pod {
+       podsMap := make(map[string]*v1.Pod, totalPods)
+
+       var queues [5]string
+       queues[0] = "root.a"
+       queues[1] = "root.b"
+       queues[2] = "root.c"
+       queues[3] = "root.d"
+       queues[4] = "root.e"
+
        // make sure that total number of pods == totalPods
-       pods := getTestPods(80, 125, "root.a")
-       for _, pod := range pods {
-               cluster.AddPod(pod)
-       }
-       pods = getTestPods(80, 125, "root.b")
-       for _, pod := range pods {
-               cluster.AddPod(pod)
-       }
-       pods = getTestPods(80, 125, "root.c")
-       for _, pod := range pods {
-               cluster.AddPod(pod)
-       }
-       pods = getTestPods(80, 125, "root.d")
-       for _, pod := range pods {
-               cluster.AddPod(pod)
-       }
-       pods = getTestPods(80, 125, "root.e")
-       for _, pod := range pods {
-               cluster.AddPod(pod)
+       for _, queue := range queues {
+               pods := getTestPods(80, 125, queue)
+               for _, pod := range pods {
+                       cluster.AddPod(pod)
+                       podsMap[pod.Name] = pod
+               }
        }
+
+       return podsMap
 }
 
 type metricsCollector struct {
@@ -210,6 +218,66 @@ func (m *metricsCollector) getData() []float64 {
        return data
 }
 
+func newPodStatusUpdater(cluster *MockScheduler, pods map[string]*v1.Pod) 
*podStatusUpdater {
+       ps := &podStatusUpdater{
+               stopCh:  make(chan struct{}),
+               cluster: cluster,
+       }
+       podsCopy := make(map[string]*v1.Pod, len(pods))
+       for name, pod := range pods {
+               podsCopy[name] = pod
+       }
+       ps.pods = podsCopy
+
+       return ps
+}
+
+// updates the status of bound pods to Running then to Completed
+type podStatusUpdater struct {
+       completedPods atomic.Uint64
+       stopCh        chan struct{}
+       cluster       *MockScheduler
+       pods          map[string]*v1.Pod
+}
+
+func (p *podStatusUpdater) updatePodStatus() {
+       for {
+               select {
+               case <-time.After(time.Second):
+                       boundPods := p.cluster.GetBoundPods(true)
+                       for _, boundPod := range boundPods {
+                               podName := boundPod.Pod
+
+                               if pod, ok := p.pods[podName]; ok {
+                                       podRunning := pod.DeepCopy()
+                                       // update to Running
+                                       podRunning.Status.Phase = v1.PodRunning
+                                       p.cluster.UpdatePod(pod, podRunning)
+
+                                       podSucceeded := pod.DeepCopy()
+                                       // update to Completed
+                                       podSucceeded.Status.Phase = 
v1.PodSucceeded
+                                       p.cluster.UpdatePod(podRunning, 
podSucceeded)
+
+                                       p.completedPods.Add(1)
+                                       continue
+                               }
+                               panic("BUG: test pod not found: " + podName)
+                       }
+               case <-p.stopCh:
+                       return
+               }
+       }
+}
+
+func (p *podStatusUpdater) getCompletedPodsCount() uint64 {
+       return p.completedPods.Load()
+}
+
+func (p *podStatusUpdater) stop() {
+       close(p.stopCh)
+}
+
 func getTestPods(noApps, noTasksPerApp int, queue string) []*v1.Pod {
        podCount := noApps * noTasksPerApp
        pods := make([]*v1.Pod, 0, podCount)
@@ -221,14 +289,15 @@ func getTestPods(noApps, noTasksPerApp int, queue string) 
[]*v1.Pod {
                appId := "app000" + strconv.Itoa(i) + "-" + 
strconv.FormatInt(time.Now().UnixMilli(), 10)
                for j := 0; j < noTasksPerApp; j++ {
                        taskName := "task000" + strconv.Itoa(j)
+                       podName := appId + "-" + taskName
                        pod := &v1.Pod{
                                TypeMeta: metav1.TypeMeta{
                                        Kind:       "Pod",
                                        APIVersion: "v1",
                                },
                                ObjectMeta: metav1.ObjectMeta{
-                                       Name: taskName,
-                                       UID:  types.UID("UID-" + appId + "-" + 
taskName),
+                                       Name: podName,
+                                       UID:  types.UID("UID-" + podName),
                                        Annotations: map[string]string{
                                                
constants.AnnotationApplicationID: appId,
                                                constants.AnnotationQueueName:  
   queue,
@@ -245,6 +314,9 @@ func getTestPods(noApps, noTasksPerApp int, queue string) 
[]*v1.Pod {
                                        },
                                        SchedulerName: constants.SchedulerName,
                                },
+                               Status: v1.PodStatus{
+                                       Phase: v1.PodPending,
+                               },
                        }
                        pods = append(pods, pod)
                }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to