This is an automated email from the ASF dual-hosted git repository.

wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new 36a1be26 [YUNIKORN-2100] Merge functionality of node coordinator into 
context (#716)
36a1be26 is described below

commit 36a1be26427370d72f0e4575501b9873fd95d8b3
Author: Craig Condit <[email protected]>
AuthorDate: Thu Nov 2 18:30:02 2023 +1100

    [YUNIKORN-2100] Merge functionality of node coordinator into context (#716)
    
    Merge the node_coordinator.go pod event handler logic into the Context's pod
    event handlers. Also, rename addPodToCache() / updatePodInCache() /
    removePodFromCache() to addPod() / updatePod() / deletePod() for 
consistency.
    
    Closes: #716
    
    Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
 pkg/cache/context.go               | 170 +++++++++++-----
 pkg/cache/context_test.go          | 361 +++++++++++++++++++++++++++-------
 pkg/cache/node_coordinator.go      | 140 -------------
 pkg/cache/node_coordinator_test.go | 392 -------------------------------------
 4 files changed, 418 insertions(+), 645 deletions(-)

diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index a255e78d..f452c6ab 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -108,18 +108,9 @@ func (ctx *Context) AddSchedulingEventHandlers() {
 
        ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
                Type:     client.PodInformerHandlers,
-               FilterFn: ctx.filterPods,
-               AddFn:    ctx.addPodToCache,
-               UpdateFn: ctx.updatePodInCache,
-               DeleteFn: ctx.removePodFromCache,
-       })
-
-       nodeCoordinator := newNodeResourceCoordinator(ctx.nodes)
-       ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
-               Type:     client.PodInformerHandlers,
-               FilterFn: nodeCoordinator.filterPods,
-               UpdateFn: nodeCoordinator.updatePod,
-               DeleteFn: nodeCoordinator.deletePod,
+               AddFn:    ctx.addPod,
+               UpdateFn: ctx.updatePod,
+               DeleteFn: ctx.deletePod,
        })
 
        ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
@@ -147,6 +138,9 @@ func (ctx *Context) SetPluginMode(pluginMode bool) {
 }
 
 func (ctx *Context) addNode(obj interface{}) {
+       ctx.lock.Lock()
+       defer ctx.lock.Unlock()
+
        node, err := convertToNode(obj)
        if err != nil {
                log.Log(log.ShimContext).Error("node conversion failed", 
zap.Error(err))
@@ -166,6 +160,9 @@ func (ctx *Context) addNode(obj interface{}) {
 }
 
 func (ctx *Context) updateNode(oldObj, newObj interface{}) {
+       ctx.lock.Lock()
+       defer ctx.lock.Unlock()
+
        // we only trigger update when resource changes
        oldNode, err := convertToNode(oldObj)
        if err != nil {
@@ -189,6 +186,9 @@ func (ctx *Context) updateNode(oldObj, newObj interface{}) {
 }
 
 func (ctx *Context) deleteNode(obj interface{}) {
+       ctx.lock.Lock()
+       defer ctx.lock.Unlock()
+
        var node *v1.Node
        switch t := obj.(type) {
        case *v1.Node:
@@ -217,25 +217,90 @@ func (ctx *Context) deleteNode(obj interface{}) {
                fmt.Sprintf("node %s is deleted from the scheduler", node.Name))
 }
 
-func (ctx *Context) addPodToCache(obj interface{}) {
+func (ctx *Context) addPod(obj interface{}) {
        pod, err := utils.Convert2Pod(obj)
        if err != nil {
-               log.Log(log.ShimContext).Error("failed to add pod to cache", 
zap.Error(err))
+               log.Log(log.ShimContext).Error("failed to add pod", 
zap.Error(err))
+               return
+       }
+       if utils.GetApplicationIDFromPod(pod) == "" {
+               ctx.updateForeignPod(pod)
+       } else {
+               ctx.updateYuniKornPod(pod)
+       }
+}
+
+func (ctx *Context) updatePod(_, newObj interface{}) {
+       pod, err := utils.Convert2Pod(newObj)
+       if err != nil {
+               log.Log(log.ShimContext).Error("failed to update pod", 
zap.Error(err))
                return
        }
+       if utils.GetApplicationIDFromPod(pod) == "" {
+               ctx.updateForeignPod(pod)
+       } else {
+               ctx.updateYuniKornPod(pod)
+       }
+}
 
-       // treat a terminated pod like a removal
+func (ctx *Context) updateYuniKornPod(pod *v1.Pod) {
+       ctx.lock.Lock()
+       defer ctx.lock.Unlock()
+
+       // treat terminated pods like a remove
        if utils.IsPodTerminated(pod) {
-               log.Log(log.ShimContext).Debug("Request to add terminated pod, 
removing from cache", zap.String("podName", pod.Name))
+               log.Log(log.ShimContext).Debug("Request to update terminated 
pod, removing from cache", zap.String("podName", pod.Name))
                ctx.schedulerCache.RemovePod(pod)
                return
        }
+       ctx.schedulerCache.UpdatePod(pod)
+}
 
-       log.Log(log.ShimContext).Debug("adding pod to cache", 
zap.String("podName", pod.Name))
-       ctx.schedulerCache.AddPod(pod)
+func (ctx *Context) updateForeignPod(pod *v1.Pod) {
+       ctx.lock.Lock()
+       defer ctx.lock.Unlock()
+
+       podStatusBefore := ""
+       oldPod, ok := ctx.schedulerCache.GetPod(string(pod.UID))
+       if ok {
+               podStatusBefore = string(oldPod.Status.Phase)
+       }
+
+       // conditions for allocate:
+       //   1. pod was not previously tracked (no entry in the scheduler cache)
+       //   2. pod is now assigned
+       //   3. pod is not in terminated state
+       if oldPod == nil && utils.IsAssignedPod(pod) && 
!utils.IsPodTerminated(pod) {
+               log.Log(log.ShimContext).Debug("pod is assigned to a node, 
trigger occupied resource update",
+                       zap.String("namespace", pod.Namespace),
+                       zap.String("podName", pod.Name),
+                       zap.String("podStatusBefore", podStatusBefore),
+                       zap.String("podStatusCurrent", 
string(pod.Status.Phase)))
+               podResource := common.GetPodResource(pod)
+               ctx.nodes.updateNodeOccupiedResources(pod.Spec.NodeName, 
podResource, AddOccupiedResource)
+               ctx.schedulerCache.AddPod(pod)
+               return
+       }
+
+       // conditions for release:
+       //   1. pod was previously assigned
+       //   2. pod is now in a terminated state
+       if oldPod != nil && utils.IsPodTerminated(pod) {
+               log.Log(log.ShimContext).Debug("pod terminated, trigger 
occupied resource update",
+                       zap.String("namespace", pod.Namespace),
+                       zap.String("podName", pod.Name),
+                       zap.String("podStatusBefore", podStatusBefore),
+                       zap.String("podStatusCurrent", 
string(pod.Status.Phase)))
+               // this means pod is terminated
+               // we need to subtract the occupied resource and re-sync with the 
scheduler-core
+               podResource := common.GetPodResource(pod)
+               ctx.nodes.updateNodeOccupiedResources(pod.Spec.NodeName, 
podResource, SubOccupiedResource)
+               ctx.schedulerCache.RemovePod(pod)
+               return
+       }
 }
 
-func (ctx *Context) removePodFromCache(obj interface{}) {
+func (ctx *Context) deletePod(obj interface{}) {
        var pod *v1.Pod
        switch t := obj.(type) {
        case *v1.Pod:
@@ -252,39 +317,47 @@ func (ctx *Context) removePodFromCache(obj interface{}) {
                return
        }
 
+       if utils.GetApplicationIDFromPod(pod) == "" {
+               ctx.deleteForeignPod(pod)
+       } else {
+               ctx.deleteYuniKornPod(pod)
+       }
+}
+
+func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) {
+       ctx.lock.Lock()
+       defer ctx.lock.Unlock()
+
        log.Log(log.ShimContext).Debug("removing pod from cache", 
zap.String("podName", pod.Name))
        ctx.schedulerCache.RemovePod(pod)
 }
 
-func (ctx *Context) updatePodInCache(oldObj, newObj interface{}) {
-       _, err := utils.Convert2Pod(oldObj)
-       if err != nil {
-               log.Log(log.ShimContext).Error("failed to update pod in cache", 
zap.Error(err))
-               return
-       }
-       newPod, err := utils.Convert2Pod(newObj)
-       if err != nil {
-               log.Log(log.ShimContext).Error("failed to update pod in cache", 
zap.Error(err))
-               return
-       }
+func (ctx *Context) deleteForeignPod(pod *v1.Pod) {
+       ctx.lock.Lock()
+       defer ctx.lock.Unlock()
 
-       // treat terminated pods like a remove
-       if utils.IsPodTerminated(newPod) {
-               log.Log(log.ShimContext).Debug("Request to update terminated 
pod, removing from cache", zap.String("podName", newPod.Name))
-               ctx.schedulerCache.RemovePod(newPod)
+       oldPod, ok := ctx.schedulerCache.GetPod(string(pod.UID))
+       if !ok {
+               // if pod is not in scheduler cache, no node updates are needed
+               log.Log(log.ShimContext).Debug("unknown foreign pod deleted, no 
resource updated needed",
+                       zap.String("namespace", pod.Namespace),
+                       zap.String("podName", pod.Name))
                return
        }
 
-       ctx.schedulerCache.UpdatePod(newPod)
-}
-
-// filter pods by scheduler name and state
-func (ctx *Context) filterPods(obj interface{}) bool {
-       switch obj := obj.(type) {
-       case *v1.Pod:
-               return utils.GetApplicationIDFromPod(obj) != ""
-       default:
-               return false
+       // conditions for release:
+       //   1. pod is already assigned to a node
+       if oldPod != nil {
+               log.Log(log.ShimContext).Debug("foreign pod deleted, triggering 
occupied resource update",
+                       zap.String("namespace", pod.Namespace),
+                       zap.String("podName", pod.Name),
+                       zap.String("podStatusBefore", 
string(oldPod.Status.Phase)),
+                       zap.String("podStatusCurrent", 
string(pod.Status.Phase)))
+               // this means a pod still tracked in the cache was deleted
+               // we need to subtract the occupied resource and re-sync with the 
scheduler-core
+               podResource := common.GetPodResource(pod)
+               ctx.nodes.updateNodeOccupiedResources(pod.Spec.NodeName, 
podResource, SubOccupiedResource)
+               ctx.schedulerCache.RemovePod(pod)
        }
 }
 
@@ -367,6 +440,9 @@ func (ctx *Context) filterPriorityClasses(obj interface{}) 
bool {
 }
 
 func (ctx *Context) addPriorityClass(obj interface{}) {
+       ctx.lock.Lock()
+       defer ctx.lock.Unlock()
+
        log.Log(log.ShimContext).Debug("priority class added")
        priorityClass := utils.Convert2PriorityClass(obj)
        if priorityClass != nil {
@@ -375,6 +451,9 @@ func (ctx *Context) addPriorityClass(obj interface{}) {
 }
 
 func (ctx *Context) updatePriorityClass(_, newObj interface{}) {
+       ctx.lock.Lock()
+       defer ctx.lock.Unlock()
+
        log.Log(log.ShimContext).Debug("priority class updated")
        priorityClass := utils.Convert2PriorityClass(newObj)
        if priorityClass != nil {
@@ -383,6 +462,9 @@ func (ctx *Context) updatePriorityClass(_, newObj 
interface{}) {
 }
 
 func (ctx *Context) deletePriorityClass(obj interface{}) {
+       ctx.lock.Lock()
+       defer ctx.lock.Unlock()
+
        log.Log(log.ShimContext).Debug("priorityClass deleted")
        var priorityClass *schedulingv1.PriorityClass
        switch t := obj.(type) {
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index 599be918..a97aced1 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -48,6 +48,10 @@ import (
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
 
+const (
+       Host1 = "HOST1"
+)
+
 var (
        testGroups = []string{"dev", "yunikorn"}
 )
@@ -334,49 +338,7 @@ func TestRemoveApplicationInternal(t *testing.T) {
        assert.Equal(t, ok, false)
 }
 
-func TestFilterPods(t *testing.T) {
-       context := initContextForTest()
-       pod1 := &v1.Pod{
-               TypeMeta: apis.TypeMeta{
-                       Kind:       "Pod",
-                       APIVersion: "v1",
-               },
-               ObjectMeta: apis.ObjectMeta{
-                       Name: "yunikorn-test-00001",
-                       UID:  "UID-00001",
-               },
-               Spec: v1.PodSpec{SchedulerName: "yunikorn"},
-       }
-       pod2 := &v1.Pod{
-               TypeMeta: apis.TypeMeta{
-                       Kind:       "Pod",
-                       APIVersion: "v1",
-               },
-               ObjectMeta: apis.ObjectMeta{
-                       Name: "yunikorn-test-00002",
-                       UID:  "UID-00002",
-               },
-               Spec: v1.PodSpec{SchedulerName: "default-scheduler"},
-       }
-       pod3 := &v1.Pod{
-               TypeMeta: apis.TypeMeta{
-                       Kind:       "Pod",
-                       APIVersion: "v1",
-               },
-               ObjectMeta: apis.ObjectMeta{
-                       Name:   "yunikorn-test-00003",
-                       UID:    "UID-00003",
-                       Labels: map[string]string{"applicationId": 
"test-00003"},
-               },
-               Spec: v1.PodSpec{SchedulerName: "yunikorn"},
-       }
-       assert.Check(t, !context.filterPods(nil), "nil object was allowed")
-       assert.Check(t, !context.filterPods(pod1), "yunikorn-managed pod with 
no app id was allowed")
-       assert.Check(t, !context.filterPods(pod2), "non-yunikorn-managed pod 
was allowed")
-       assert.Check(t, context.filterPods(pod3), "yunikorn-managed pod was 
filtered")
-}
-
-func TestAddPodToCache(t *testing.T) {
+func TestAddPod(t *testing.T) {
        context := initContextForTest()
 
        pod1 := &v1.Pod{
@@ -387,6 +349,9 @@ func TestAddPodToCache(t *testing.T) {
                ObjectMeta: apis.ObjectMeta{
                        Name: "yunikorn-test-00001",
                        UID:  "UID-00001",
+                       Annotations: map[string]string{
+                               constants.AnnotationApplicationID: 
"yunikorn-test-00001",
+                       },
                },
                Spec: v1.PodSpec{SchedulerName: "yunikorn"},
        }
@@ -398,6 +363,9 @@ func TestAddPodToCache(t *testing.T) {
                ObjectMeta: apis.ObjectMeta{
                        Name: "yunikorn-test-00002",
                        UID:  "UID-00002",
+                       Annotations: map[string]string{
+                               constants.AnnotationApplicationID: 
"yunikorn-test-00002",
+                       },
                },
                Spec: v1.PodSpec{SchedulerName: "yunikorn"},
                Status: v1.PodStatus{
@@ -405,9 +373,9 @@ func TestAddPodToCache(t *testing.T) {
                },
        }
 
-       context.addPodToCache(nil)  // no-op, but should not crash
-       context.addPodToCache(pod1) // should be added
-       context.addPodToCache(pod2) // should skip as pod is terminated
+       context.addPod(nil)  // no-op, but should not crash
+       context.addPod(pod1) // should be added
+       context.addPod(pod2) // should skip as pod is terminated
 
        _, ok := context.schedulerCache.GetPod("UID-00001")
        assert.Check(t, ok, "active pod was not added")
@@ -415,7 +383,7 @@ func TestAddPodToCache(t *testing.T) {
        assert.Check(t, !ok, "terminated pod was added")
 }
 
-func TestUpdatePodInCache(t *testing.T) {
+func TestUpdatePod(t *testing.T) {
        context := initContextForTest()
 
        pod1 := &v1.Pod{
@@ -424,9 +392,12 @@ func TestUpdatePodInCache(t *testing.T) {
                        APIVersion: "v1",
                },
                ObjectMeta: apis.ObjectMeta{
-                       Name:        "yunikorn-test-00001",
-                       UID:         "UID-00001",
-                       Annotations: map[string]string{"test.state": "new"},
+                       Name: "yunikorn-test-00001",
+                       UID:  "UID-00001",
+                       Annotations: map[string]string{
+                               constants.AnnotationApplicationID: 
"yunikorn-test-00001",
+                               "test.state":                      "new",
+                       },
                },
                Spec: v1.PodSpec{SchedulerName: "yunikorn"},
        }
@@ -436,9 +407,12 @@ func TestUpdatePodInCache(t *testing.T) {
                        APIVersion: "v1",
                },
                ObjectMeta: apis.ObjectMeta{
-                       Name:        "yunikorn-test-00001",
-                       UID:         "UID-00001",
-                       Annotations: map[string]string{"test.state": "updated"},
+                       Name: "yunikorn-test-00001",
+                       UID:  "UID-00001",
+                       Annotations: map[string]string{
+                               constants.AnnotationApplicationID: 
"yunikorn-test-00001",
+                               "test.state":                      "updated",
+                       },
                },
                Spec: v1.PodSpec{SchedulerName: "yunikorn"},
        }
@@ -450,6 +424,9 @@ func TestUpdatePodInCache(t *testing.T) {
                ObjectMeta: apis.ObjectMeta{
                        Name: "yunikorn-test-00001",
                        UID:  "UID-00001",
+                       Annotations: map[string]string{
+                               constants.AnnotationApplicationID: 
"yunikorn-test-00001",
+                       },
                },
                Spec: v1.PodSpec{SchedulerName: "yunikorn"},
                Status: v1.PodStatus{
@@ -457,29 +434,29 @@ func TestUpdatePodInCache(t *testing.T) {
                },
        }
 
-       context.addPodToCache(pod1)
+       context.addPod(pod1)
        _, ok := context.schedulerCache.GetPod("UID-00001")
        assert.Assert(t, ok, "pod1 is not present after adding")
 
        // these should not fail, but are no-ops
-       context.updatePodInCache(nil, nil)
-       context.updatePodInCache(nil, pod1)
-       context.updatePodInCache(pod1, nil)
+       context.updatePod(nil, nil)
+       context.updatePod(nil, pod1)
+       context.updatePod(pod1, nil)
 
        // ensure a terminated pod is removed
-       context.updatePodInCache(pod1, pod3)
+       context.updatePod(pod1, pod3)
        _, ok = context.schedulerCache.GetPod("UID-00001")
        assert.Check(t, !ok, "pod still found after termination")
 
        // ensure a non-terminated pod is updated
-       context.updatePodInCache(pod1, pod2)
+       context.updatePod(pod1, pod2)
        found, ok := context.schedulerCache.GetPod("UID-00001")
        if assert.Check(t, ok, "pod not found after update") {
                assert.Check(t, found.GetAnnotations()["test.state"] == 
"updated", "pod state not updated")
        }
 }
 
-func TestRemovePodFromCache(t *testing.T) {
+func TestDeletePod(t *testing.T) {
        context := initContextForTest()
 
        pod1 := &v1.Pod{
@@ -490,6 +467,9 @@ func TestRemovePodFromCache(t *testing.T) {
                ObjectMeta: apis.ObjectMeta{
                        Name: "yunikorn-test-00001",
                        UID:  "UID-00001",
+                       Annotations: map[string]string{
+                               constants.AnnotationApplicationID: 
"yunikorn-test-00001",
+                       },
                },
                Spec: v1.PodSpec{SchedulerName: "yunikorn"},
        }
@@ -501,30 +481,222 @@ func TestRemovePodFromCache(t *testing.T) {
                ObjectMeta: apis.ObjectMeta{
                        Name: "yunikorn-test-00002",
                        UID:  "UID-00002",
+                       Annotations: map[string]string{
+                               constants.AnnotationApplicationID: 
"yunikorn-test-00002",
+                       },
                },
                Spec: v1.PodSpec{SchedulerName: "yunikorn"},
        }
 
-       context.addPodToCache(pod1)
-       context.addPodToCache(pod2)
+       context.addPod(pod1)
+       context.addPod(pod2)
        _, ok := context.schedulerCache.GetPod("UID-00001")
        assert.Assert(t, ok, "pod1 is not present after adding")
        _, ok = context.schedulerCache.GetPod("UID-00002")
        assert.Assert(t, ok, "pod2 is not present after adding")
 
        // these should not fail, but here for completeness
-       context.removePodFromCache(nil)
-       context.removePodFromCache(cache.DeletedFinalStateUnknown{Key: 
"UID-00000", Obj: nil})
+       context.deletePod(nil)
+       context.deletePod(cache.DeletedFinalStateUnknown{Key: "UID-00000", Obj: 
nil})
 
-       context.removePodFromCache(pod1)
+       context.deletePod(pod1)
        _, ok = context.schedulerCache.GetPod("UID-00001")
        assert.Check(t, !ok, "pod1 is still present")
 
-       context.removePodFromCache(cache.DeletedFinalStateUnknown{Key: 
"UID-00002", Obj: pod2})
+       context.deletePod(cache.DeletedFinalStateUnknown{Key: "UID-00002", Obj: 
pod2})
        _, ok = context.schedulerCache.GetPod("UID-00002")
        assert.Check(t, !ok, "pod2 is still present")
 }
 
+//nolint:funlen
+func TestAddUpdatePodForeign(t *testing.T) {
+       mockedSchedulerApi := newMockSchedulerAPI()
+       context := initContextForTest()
+       context.nodes = newSchedulerNodes(mockedSchedulerApi, 
NewTestSchedulerCache())
+       host1 := nodeForTest(Host1, "10G", "10")
+       context.nodes.addNode(host1)
+
+       executed := false
+       expectAdd := false
+       expectRemove := false
+       tc := ""
+
+       mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
+               executed = true
+               assert.Equal(t, len(request.Nodes), 1, "%s: wrong node count", 
tc)
+               updatedNode := request.Nodes[0]
+               assert.Equal(t, updatedNode.NodeID, Host1, "%s: wrong nodeID", 
tc)
+               assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE, "%s: 
wrong action", tc)
+               assert.Equal(t, 
updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, 
int64(10000*1000*1000), "%s: wrong schedulable memory", tc)
+               assert.Equal(t, 
updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000), 
"%s: wrong schedulable cpu", tc)
+               if expectAdd {
+                       assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, 
int64(1000*1000*1000), "%s: wrong occupied memory (add)", tc)
+                       assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500), "%s: 
wrong occupied cpu (add)", tc)
+               }
+               if expectRemove {
+                       assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0), "%s: 
wrong occupied memory (remove)", tc)
+                       assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0), "%s: 
wrong occupied cpu (remove)", tc)
+               }
+               return nil
+       }
+
+       // pod is not assigned to any node
+       pod1 := foreignPod("pod1", "1G", "500m")
+       pod1.Status.Phase = v1.PodPending
+       pod1.Spec.NodeName = ""
+
+       // validate add
+       tc = "add-pod1"
+       executed = false
+       expectAdd = false
+       expectRemove = false
+       context.addPod(pod1)
+       assert.Assert(t, !executed, "unexpected update")
+       _, ok := context.schedulerCache.GetPod(string(pod1.UID))
+       assert.Assert(t, !ok, "unassigned pod found in cache")
+
+       // validate update
+       tc = "update-pod1"
+       executed = false
+       expectAdd = false
+       expectRemove = false
+       context.updatePod(nil, pod1)
+       assert.Assert(t, !executed, "unexpected update")
+       _, ok = context.schedulerCache.GetPod(string(pod1.UID))
+       assert.Assert(t, !ok, "unassigned pod found in cache")
+
+       // pod is assigned to a node but still in pending state, should update
+       pod2 := foreignPod("pod2", "1G", "500m")
+       pod2.Status.Phase = v1.PodPending
+       pod2.Spec.NodeName = Host1
+
+       // validate add
+       tc = "add-pod2"
+       executed = false
+       expectAdd = true
+       expectRemove = false
+       context.addPod(pod2)
+       assert.Assert(t, executed, "update not executed")
+       _, ok = context.schedulerCache.GetPod(string(pod2.UID))
+       assert.Assert(t, ok, "pod not found in cache")
+
+       // validate update
+       tc = "update-pod2"
+       executed = false
+       expectAdd = false
+       expectRemove = false
+       context.updatePod(nil, pod2)
+       assert.Assert(t, !executed, "unexpected update")
+       _, ok = context.schedulerCache.GetPod(string(pod2.UID))
+       assert.Assert(t, ok, "pod not found in cache")
+
+       // validate update when not already in cache
+       tc = "update-pod2-nocache-pre"
+       executed = false
+       expectAdd = false
+       expectRemove = true
+       context.deletePod(pod2)
+       tc = "update-pod2-nocache"
+       executed = false
+       expectAdd = true
+       expectRemove = false
+       context.updatePod(nil, pod2)
+       assert.Assert(t, executed, "update not executed")
+       _, ok = context.schedulerCache.GetPod(string(pod2.UID))
+       assert.Assert(t, ok, "pod not found in cache")
+
+       // pod is failed, should trigger update if already in cache
+       pod3 := pod2.DeepCopy()
+       pod3.Status.Phase = v1.PodFailed
+
+       // validate add
+       tc = "add-pod3"
+       executed = false
+       expectAdd = false
+       expectRemove = true
+       context.addPod(pod3)
+       assert.Assert(t, executed, "update not executed")
+       _, ok = context.schedulerCache.GetPod(string(pod3.UID))
+       assert.Assert(t, !ok, "failed pod found in cache")
+
+       // validate update when already in cache (pod2 is re-added first)
+       tc = "update-pod3-pre"
+       executed = false
+       expectAdd = true
+       expectRemove = false
+       context.addPod(pod2)
+       tc = "update-pod3"
+       executed = false
+       expectAdd = false
+       expectRemove = true
+       context.updatePod(nil, pod3)
+       assert.Assert(t, executed, "update not executed")
+       _, ok = context.schedulerCache.GetPod(string(pod3.UID))
+       assert.Assert(t, !ok, "failed pod found in cache")
+}
+
+func TestDeletePodForeign(t *testing.T) {
+       mockedSchedulerApi := newMockSchedulerAPI()
+       context := initContextForTest()
+       context.nodes = newSchedulerNodes(mockedSchedulerApi, 
NewTestSchedulerCache())
+       host1 := nodeForTest(Host1, "10G", "10")
+       context.nodes.addNode(host1)
+
+       executed := false
+       expectAdd := false
+       expectRemove := false
+       tc := ""
+
+       mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
+               executed = true
+               assert.Equal(t, len(request.Nodes), 1, "%s: wrong node count", 
tc)
+               updatedNode := request.Nodes[0]
+               assert.Equal(t, updatedNode.NodeID, Host1, "%s: wrong nodeID", 
tc)
+               assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE, "%s: 
wrong action", tc)
+               assert.Equal(t, 
updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, 
int64(10000*1000*1000), "%s: wrong schedulable memory", tc)
+               assert.Equal(t, 
updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000), 
"%s: wrong schedulable cpu", tc)
+               if expectAdd {
+                       assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, 
int64(1000*1000*1000), "%s: wrong occupied memory (add)", tc)
+                       assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500), "%s: 
wrong occupied cpu (add)", tc)
+               }
+               if expectRemove {
+                       assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0), "%s: 
wrong occupied memory (remove)", tc)
+                       assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0), "%s: 
wrong occupied cpu (remove)", tc)
+               }
+               return nil
+       }
+
+       // add existing pod
+       pod1 := foreignPod("pod1", "1G", "500m")
+       pod1.Status.Phase = v1.PodRunning
+       pod1.Spec.NodeName = Host1
+
+       // validate deletion of existing assigned pod
+       tc = "delete-pod1-pre"
+       executed = false
+       expectAdd = true
+       expectRemove = false
+       context.addPod(pod1)
+       tc = "delete-pod1"
+       executed = false
+       expectAdd = false
+       expectRemove = true
+       context.deletePod(pod1)
+       assert.Assert(t, executed, "update not executed")
+       _, ok := context.schedulerCache.GetPod(string(pod1.UID))
+       assert.Assert(t, !ok, "deleted pod found in cache")
+
+       // validate delete when not already found
+       tc = "delete-pod1-again"
+       executed = false
+       expectAdd = false
+       expectRemove = false
+       context.deletePod(pod1)
+       assert.Assert(t, !executed, "unexpected update")
+       _, ok = context.schedulerCache.GetPod(string(pod1.UID))
+       assert.Assert(t, !ok, "deleted pod found in cache")
+}
+
 func TestAddTask(t *testing.T) {
        context := initContextForTest()
 
@@ -1174,9 +1346,9 @@ func TestAddApplicationsWithTags(t *testing.T) {
                ObjectMeta: apis.ObjectMeta{
                        Name: "test2",
                        Annotations: map[string]string{
-                               constants.NamespaceQuota:          "{\"cpu\": 
\"1\", \"memory\": \"256M\", \"nvidia.com/gpu\": \"1\"}",
+                               constants.NamespaceQuota:                 
"{\"cpu\": \"1\", \"memory\": \"256M\", \"nvidia.com/gpu\": \"1\"}",
                                constants.DomainYuniKorn + "parentqueue": 
"root.test",
-                               constants.NamespaceGuaranteed:     "{\"cpu\": 
\"1\", \"memory\": \"256M\", \"nvidia.com/gpu\": \"1\"}",
+                               constants.NamespaceGuaranteed:            
"{\"cpu\": \"1\", \"memory\": \"256M\", \"nvidia.com/gpu\": \"1\"}",
                        },
                },
        }
@@ -1412,10 +1584,13 @@ func TestGetStateDump(t *testing.T) {
                        Namespace: "default",
                        Name:      "yunikorn-test-00001",
                        UID:       "UID-00001",
+                       Annotations: map[string]string{
+                               constants.AnnotationApplicationID: 
"yunikorn-test-00001",
+                       },
                },
                Spec: v1.PodSpec{SchedulerName: "yunikorn"},
        }
-       context.addPodToCache(pod1)
+       context.addPod(pod1)
 
        stateDumpStr, err := context.GetStateDump()
        assert.NilError(t, err, "error during state dump")
@@ -1597,3 +1772,51 @@ func waitForNodeAcceptedEvent(recorder *k8sEvents.FakeRecorder) error {
        }, 10*time.Millisecond, time.Second)
        return err
 }
+
+func nodeForTest(nodeID, memory, cpu string) *v1.Node {
+       resourceList := make(map[v1.ResourceName]resource.Quantity)
+       resourceList[v1.ResourceName("memory")] = resource.MustParse(memory)
+       resourceList[v1.ResourceName("cpu")] = resource.MustParse(cpu)
+       return &v1.Node{
+               TypeMeta: apis.TypeMeta{
+                       Kind:       "Node",
+                       APIVersion: "v1",
+               },
+               ObjectMeta: apis.ObjectMeta{
+                       Name:      nodeID,
+                       Namespace: "default",
+                       UID:       "uid_0001",
+               },
+               Spec: v1.NodeSpec{},
+               Status: v1.NodeStatus{
+                       Allocatable: resourceList,
+               },
+       }
+}
+
+func foreignPod(podName, memory, cpu string) *v1.Pod {
+       containers := make([]v1.Container, 0)
+       c1Resources := make(map[v1.ResourceName]resource.Quantity)
+       c1Resources[v1.ResourceMemory] = resource.MustParse(memory)
+       c1Resources[v1.ResourceCPU] = resource.MustParse(cpu)
+       containers = append(containers, v1.Container{
+               Name: "container-01",
+               Resources: v1.ResourceRequirements{
+                       Requests: c1Resources,
+               },
+       })
+
+       return &v1.Pod{
+               TypeMeta: apis.TypeMeta{
+                       Kind:       "Pod",
+                       APIVersion: "v1",
+               },
+               ObjectMeta: apis.ObjectMeta{
+                       Name: podName,
+                       UID:  types.UID(podName),
+               },
+               Spec: v1.PodSpec{
+                       Containers: containers,
+               },
+       }
+}
diff --git a/pkg/cache/node_coordinator.go b/pkg/cache/node_coordinator.go
deleted file mode 100644
index 7dbd20fc..00000000
--- a/pkg/cache/node_coordinator.go
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-*/
-
-package cache
-
-import (
-       "go.uber.org/zap"
-       v1 "k8s.io/api/core/v1"
-       k8sCache "k8s.io/client-go/tools/cache"
-
-       "github.com/apache/yunikorn-k8shim/pkg/common"
-       "github.com/apache/yunikorn-k8shim/pkg/common/utils"
-       "github.com/apache/yunikorn-k8shim/pkg/log"
-)
-
-// nodeResourceCoordinator looks at the resources that are not allocated by yunikorn,
-// and refresh scheduler cache to keep nodes' capacity in-sync.
-// this coordinator only looks after the pods that are not scheduled by yunikorn,
-// and it registers update/delete handler to the pod informer. It ensures that the
-// following operations are done
-//  1. when a pod is becoming Running, add occupied node resource
-//  2. when a pod is terminated, sub the occupied node resource
-//  3. when a pod is deleted, sub the occupied node resource
-//
-// each of these updates will trigger a node UPDATE action to update the occupied
-// resource in the scheduler-core.
-type nodeResourceCoordinator struct {
-       nodes *schedulerNodes
-}
-
-func newNodeResourceCoordinator(nodes *schedulerNodes) *nodeResourceCoordinator {
-       return &nodeResourceCoordinator{nodes}
-}
-
-// filter pods that not scheduled by us
-func (c *nodeResourceCoordinator) filterPods(obj interface{}) bool {
-       switch obj := obj.(type) {
-       case *v1.Pod:
-               return utils.GetApplicationIDFromPod(obj) == ""
-       default:
-               return false
-       }
-}
-
-func (c *nodeResourceCoordinator) updatePod(old, new interface{}) {
-       oldPod, err := utils.Convert2Pod(old)
-       if err != nil {
-               log.Log(log.ShimCacheNode).Error("expecting a pod object", zap.Error(err))
-               return
-       }
-
-       newPod, err := utils.Convert2Pod(new)
-       if err != nil {
-               log.Log(log.ShimCacheNode).Error("expecting a pod object", zap.Error(err))
-               return
-       }
-
-       // this handles the allocate and release of a pod that not scheduled by yunikorn
-       // the check is triggered when a pod status changes
-       // conditions for allocate:
-       //   1. pod got assigned to a node
-       //   2. pod is not in terminated state
-       if !utils.IsAssignedPod(oldPod) && utils.IsAssignedPod(newPod) && !utils.IsPodTerminated(newPod) {
-               log.Log(log.ShimCacheNode).Debug("pod is assigned to a node, trigger occupied resource update",
-                       zap.String("namespace", newPod.Namespace),
-                       zap.String("podName", newPod.Name),
-                       zap.String("podStatusBefore", string(oldPod.Status.Phase)),
-                       zap.String("podStatusCurrent", string(newPod.Status.Phase)))
-               // if pod is running but not scheduled by us,
-               // we need to notify scheduler-core to re-sync the node resource
-               podResource := common.GetPodResource(newPod)
-               c.nodes.updateNodeOccupiedResources(newPod.Spec.NodeName, podResource, AddOccupiedResource)
-               c.nodes.cache.AddPod(newPod)
-               return
-       }
-
-       // conditions for release:
-       //   1. pod is already assigned to a node
-       //   2. pod status changes from non-terminated to terminated state
-       if utils.IsAssignedPod(newPod) && oldPod.Status.Phase != newPod.Status.Phase && utils.IsPodTerminated(newPod) {
-               log.Log(log.ShimCacheNode).Debug("pod terminated, trigger occupied resource update",
-                       zap.String("namespace", newPod.Namespace),
-                       zap.String("podName", newPod.Name),
-                       zap.String("podStatusBefore", string(oldPod.Status.Phase)),
-                       zap.String("podStatusCurrent", string(newPod.Status.Phase)))
-               // this means pod is terminated
-               // we need sub the occupied resource and re-sync with the scheduler-core
-               podResource := common.GetPodResource(newPod)
-               c.nodes.updateNodeOccupiedResources(newPod.Spec.NodeName, podResource, SubOccupiedResource)
-               c.nodes.cache.RemovePod(newPod)
-               return
-       }
-}
-
-func (c *nodeResourceCoordinator) deletePod(obj interface{}) {
-       var pod *v1.Pod
-       switch t := obj.(type) {
-       case *v1.Pod:
-               pod = t
-       case k8sCache.DeletedFinalStateUnknown:
-               var err error
-               pod, err = utils.Convert2Pod(t.Obj)
-               if err != nil {
-                       log.Log(log.ShimCacheNode).Error(err.Error())
-                       return
-               }
-       default:
-               log.Log(log.ShimCacheNode).Error("cannot convert to pod")
-               return
-       }
-
-       // if pod is already terminated, that means the updates have already done
-       if utils.IsPodTerminated(pod) {
-               log.Log(log.ShimCacheNode).Debug("pod is already terminated, occupied resource updated should have already been done")
-               return
-       }
-
-       log.Log(log.ShimCacheNode).Info("deleting pod that scheduled by other schedulers",
-               zap.String("namespace", pod.Namespace),
-               zap.String("podName", pod.Name))
-
-       podResource := common.GetPodResource(pod)
-       c.nodes.updateNodeOccupiedResources(pod.Spec.NodeName, podResource, SubOccupiedResource)
-       c.nodes.cache.RemovePod(pod)
-}
diff --git a/pkg/cache/node_coordinator_test.go b/pkg/cache/node_coordinator_test.go
deleted file mode 100644
index 68a03f7a..00000000
--- a/pkg/cache/node_coordinator_test.go
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-*/
-
-package cache
-
-import (
-       "testing"
-
-       "gotest.tools/v3/assert"
-       v1 "k8s.io/api/core/v1"
-       "k8s.io/apimachinery/pkg/api/resource"
-       apis "k8s.io/apimachinery/pkg/apis/meta/v1"
-
-       siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
-       "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
-)
-
-const (
-       Host1     = "HOST1"
-       Host2     = "HOST2"
-       HostEmpty = ""
-)
-
-func PodForTest(podName, memory, cpu string) *v1.Pod {
-       containers := make([]v1.Container, 0)
-       c1Resources := make(map[v1.ResourceName]resource.Quantity)
-       c1Resources[v1.ResourceMemory] = resource.MustParse(memory)
-       c1Resources[v1.ResourceCPU] = resource.MustParse(cpu)
-       containers = append(containers, v1.Container{
-               Name: "container-01",
-               Resources: v1.ResourceRequirements{
-                       Requests: c1Resources,
-               },
-       })
-
-       return &v1.Pod{
-               TypeMeta: apis.TypeMeta{
-                       Kind:       "Pod",
-                       APIVersion: "v1",
-               },
-               ObjectMeta: apis.ObjectMeta{
-                       Name: podName,
-               },
-               Spec: v1.PodSpec{
-                       Containers: containers,
-               },
-       }
-}
-
-func NodeForTest(nodeID, memory, cpu string) *v1.Node {
-       resourceList := make(map[v1.ResourceName]resource.Quantity)
-       resourceList[v1.ResourceName("memory")] = resource.MustParse(memory)
-       resourceList[v1.ResourceName("cpu")] = resource.MustParse(cpu)
-       return &v1.Node{
-               TypeMeta: apis.TypeMeta{
-                       Kind:       "Node",
-                       APIVersion: "v1",
-               },
-               ObjectMeta: apis.ObjectMeta{
-                       Name:      nodeID,
-                       Namespace: "default",
-                       UID:       "uid_0001",
-               },
-               Spec: v1.NodeSpec{},
-               Status: v1.NodeStatus{
-                       Allocatable: resourceList,
-               },
-       }
-}
-
-func TestUpdatePod(t *testing.T) {
-       mockedSchedulerApi := newMockSchedulerAPI()
-       nodes := newSchedulerNodes(mockedSchedulerApi, NewTestSchedulerCache())
-       host1 := NodeForTest(Host1, "10G", "10")
-       host2 := NodeForTest(Host2, "10G", "10")
-       nodes.addNode(host1)
-       nodes.addNode(host2)
-       coordinator := newNodeResourceCoordinator(nodes)
-
-       // pod is not assigned to any node
-       // this won't trigger an update
-       pod1 := PodForTest("pod1", "1G", "500m")
-       pod2 := PodForTest("pod1", "1G", "500m")
-       pod1.Status.Phase = v1.PodPending
-       pod1.Status.Phase = v1.PodPending
-       pod1.Spec.NodeName = ""
-       pod2.Spec.NodeName = ""
-       mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
-               t.Fatalf("update should not run because state is not changed")
-               return nil
-       }
-       coordinator.updatePod(pod1, pod2)
-
-       // pod is already assigned to a node and state is running
-       // this won't trigger an update
-       pod1.Status.Phase = v1.PodRunning
-       pod1.Status.Phase = v1.PodRunning
-       pod1.Spec.NodeName = Host1
-       pod2.Spec.NodeName = Host1
-       mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
-               t.Fatalf("update should not run because state is not changed")
-               return nil
-       }
-       coordinator.updatePod(pod1, pod2)
-
-       // pod state remains in Pending, pod is assigned to a node
-       // this happens when the pod just gets allocated and started, but not 
in running state yet
-       // trigger an update
-       pod1.Status.Phase = v1.PodPending
-       pod2.Status.Phase = v1.PodPending
-       pod1.Spec.NodeName = HostEmpty
-       pod2.Spec.NodeName = Host1
-       executed := false
-       mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
-               executed = true
-               assert.Equal(t, len(request.Nodes), 1)
-               updatedNode := request.Nodes[0]
-               assert.Equal(t, updatedNode.NodeID, Host1)
-               assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE)
-               assert.Equal(t, 
updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, 
int64(10000*1000*1000))
-               assert.Equal(t, 
updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000))
-               assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, 
int64(1000*1000*1000))
-               assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500))
-               return nil
-       }
-       coordinator.updatePod(pod1, pod2)
-       assert.Assert(t, executed)
-
-       // pod state changed from running to failed, pod terminated
-       // trigger another update
-       pod1.Status.Phase = v1.PodRunning
-       pod2.Status.Phase = v1.PodFailed
-       pod1.Spec.NodeName = Host1
-       pod2.Spec.NodeName = Host1
-       executed = false
-       mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
-               executed = true
-               assert.Equal(t, len(request.Nodes), 1)
-               updatedNode := request.Nodes[0]
-               assert.Equal(t, updatedNode.NodeID, Host1)
-               assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE)
-               assert.Equal(t, 
updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, 
int64(10000*1000*1000))
-               assert.Equal(t, 
updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000))
-               assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0))
-               assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0))
-               return nil
-       }
-       coordinator.updatePod(pod1, pod2)
-       assert.Assert(t, executed)
-
-       // pod state changed from pending to running
-       // this is not triggering a new update because the pod was already 
allocated to a node
-       pod1.Status.Phase = v1.PodPending
-       pod2.Status.Phase = v1.PodRunning
-       pod1.Spec.NodeName = Host2
-       pod2.Spec.NodeName = Host2
-       mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
-               t.Fatalf("update should not run because pod is already 
allocated")
-               return nil
-       }
-       coordinator.updatePod(pod1, pod2)
-
-       // pod state Running, pod is assigned to a node
-       // trigger an update
-       pod1.Status.Phase = v1.PodRunning
-       pod2.Status.Phase = v1.PodRunning
-       pod1.Spec.NodeName = ""
-       pod2.Spec.NodeName = Host2
-       executed = false
-       mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
-               executed = true
-               assert.Equal(t, len(request.Nodes), 1)
-               updatedNode := request.Nodes[0]
-               assert.Equal(t, updatedNode.NodeID, Host2)
-               assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE)
-               assert.Equal(t, 
updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, 
int64(10000*1000*1000))
-               assert.Equal(t, 
updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000))
-               assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, 
int64(1000*1000*1000))
-               assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500))
-               return nil
-       }
-       coordinator.updatePod(pod1, pod2)
-       assert.Assert(t, executed)
-
-       // pod state Running to Succeed, pod terminated
-       // this should trigger an update
-       pod1.Status.Phase = v1.PodRunning
-       pod2.Status.Phase = v1.PodSucceeded
-       pod1.Spec.NodeName = Host2
-       pod2.Spec.NodeName = Host2
-       executed = false
-       mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
-               executed = true
-               assert.Equal(t, len(request.Nodes), 1)
-               updatedNode := request.Nodes[0]
-               assert.Equal(t, updatedNode.NodeID, Host2)
-               assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE)
-               assert.Equal(t, 
updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, 
int64(10000*1000*1000))
-               assert.Equal(t, 
updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000))
-               assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0))
-               assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0))
-               return nil
-       }
-       coordinator.updatePod(pod1, pod2)
-       assert.Assert(t, executed)
-
-       // pod gets assigned to a node, but pod status is already failed
-       // this should not trigger an update
-       pod1.Status.Phase = v1.PodFailed
-       pod2.Status.Phase = v1.PodFailed
-       pod1.Spec.NodeName = HostEmpty
-       pod2.Spec.NodeName = Host2
-       mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
-               t.Fatalf("update should not run because pod is already 
allocated")
-               return nil
-       }
-       coordinator.updatePod(pod1, pod2)
-}
-
-func TestDeletePod(t *testing.T) {
-       mockedSchedulerApi := newMockSchedulerAPI()
-       nodes := newSchedulerNodes(mockedSchedulerApi, NewTestSchedulerCache())
-       host1 := NodeForTest(Host1, "10G", "10")
-       nodes.addNode(host1)
-       coordinator := newNodeResourceCoordinator(nodes)
-
-       // pod from pending to running
-       // occupied resources should be added to the node
-       pod1 := PodForTest("pod1", "1G", "500m")
-       pod2 := PodForTest("pod1", "1G", "500m")
-       pod1.Status.Phase = v1.PodPending
-       pod2.Status.Phase = v1.PodRunning
-       pod1.Spec.NodeName = HostEmpty
-       pod2.Spec.NodeName = Host1
-       executed := false
-       mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
-               executed = true
-               assert.Equal(t, len(request.Nodes), 1)
-               updatedNode := request.Nodes[0]
-               assert.Equal(t, updatedNode.NodeID, Host1)
-               assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE)
-               assert.Equal(t, 
updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, 
int64(10000*1000*1000))
-               assert.Equal(t, 
updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000))
-               assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, 
int64(1000*1000*1000))
-               assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500))
-               return nil
-       }
-       coordinator.updatePod(pod1, pod2)
-       assert.Assert(t, executed)
-
-       // delete pod from the running state
-       executed = false
-       mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
-               executed = true
-               assert.Equal(t, len(request.Nodes), 1)
-               updatedNode := request.Nodes[0]
-               assert.Equal(t, updatedNode.NodeID, Host1)
-               assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE)
-               assert.Equal(t, 
updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, int64(10000))
-               assert.Equal(t, 
updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000))
-               assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0))
-               assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0))
-               return nil
-       }
-       coordinator.deletePod(pod1)
-}
-
-func TestDeleteTerminatedPod(t *testing.T) {
-       mockedSchedulerApi := newMockSchedulerAPI()
-       nodes := newSchedulerNodes(mockedSchedulerApi, NewTestSchedulerCache())
-       host1 := NodeForTest(Host1, "10G", "10")
-       nodes.addNode(host1)
-       coordinator := newNodeResourceCoordinator(nodes)
-
-       // pod from pending to running
-       // occupied resources should be added to the node
-       pod1 := PodForTest("pod1", "1G", "500m")
-       pod2 := PodForTest("pod1", "1G", "500m")
-       pod1.Status.Phase = v1.PodPending
-       pod2.Status.Phase = v1.PodRunning
-       pod1.Spec.NodeName = HostEmpty
-       pod2.Spec.NodeName = Host1
-       executed := false
-       mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
-               executed = true
-               assert.Equal(t, len(request.Nodes), 1)
-               updatedNode := request.Nodes[0]
-               assert.Equal(t, updatedNode.NodeID, Host1)
-               assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE)
-               assert.Equal(t, 
updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, 
int64(10000*1000*1000))
-               assert.Equal(t, 
updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000))
-               assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, 
int64(1000*1000*1000))
-               assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500))
-               return nil
-       }
-       coordinator.updatePod(pod1, pod2)
-       assert.Assert(t, executed)
-
-       // pod from running to succeed
-       // occupied resources should be added to the node
-       pod1.Status.Phase = v1.PodRunning
-       pod2.Status.Phase = v1.PodSucceeded
-       pod1.Spec.NodeName = Host1
-       pod2.Spec.NodeName = Host1
-       executed = false
-       mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
-               executed = true
-               assert.Equal(t, len(request.Nodes), 1)
-               updatedNode := request.Nodes[0]
-               assert.Equal(t, updatedNode.NodeID, Host1)
-               assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE)
-               assert.Equal(t, 
updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, 
int64(10000*1000*1000))
-               assert.Equal(t, 
updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000))
-               assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0))
-               assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0))
-               return nil
-       }
-       coordinator.updatePod(pod1, pod2)
-       assert.Assert(t, executed)
-
-       // delete pod from the succeed state
-       executed = false
-       mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
-               executed = true
-               t.Fatalf("update should not be triggered as it should have 
already done")
-               return nil
-       }
-       coordinator.deletePod(pod2)
-       assert.Equal(t, executed, false)
-}
-
-func TestNodeCoordinatorFilterPods(t *testing.T) {
-       mockedSchedulerAPI := newMockSchedulerAPI()
-       nodes := newSchedulerNodes(mockedSchedulerAPI, NewTestSchedulerCache())
-       host1 := NodeForTest(Host1, "10G", "10")
-       nodes.addNode(host1)
-       coordinator := newNodeResourceCoordinator(nodes)
-
-       pod1 := &v1.Pod{
-               TypeMeta: apis.TypeMeta{
-                       Kind:       "Pod",
-                       APIVersion: "v1",
-               },
-               ObjectMeta: apis.ObjectMeta{
-                       Name: "yunikorn-test-00001",
-                       UID:  "UID-00001",
-               },
-               Spec: v1.PodSpec{SchedulerName: "yunikorn"},
-       }
-       pod2 := &v1.Pod{
-               TypeMeta: apis.TypeMeta{
-                       Kind:       "Pod",
-                       APIVersion: "v1",
-               },
-               ObjectMeta: apis.ObjectMeta{
-                       Name: "yunikorn-test-00002",
-                       UID:  "UID-00002",
-               },
-               Spec: v1.PodSpec{SchedulerName: "default-scheduler"},
-       }
-       pod3 := &v1.Pod{
-               TypeMeta: apis.TypeMeta{
-                       Kind:       "Pod",
-                       APIVersion: "v1",
-               },
-               ObjectMeta: apis.ObjectMeta{
-                       Name:   "yunikorn-test-00003",
-                       UID:    "UID-00003",
-                       Labels: map[string]string{"applicationId": 
"test-00003"},
-               },
-               Spec: v1.PodSpec{SchedulerName: "yunikorn"},
-       }
-       assert.Check(t, !coordinator.filterPods(nil), "nil object was allowed")
-       assert.Check(t, coordinator.filterPods(pod1), "yunikorn-managed pod 
with no app id was filtered")
-       assert.Check(t, coordinator.filterPods(pod2), "non-yunikorn-managed pod 
was filtered")
-       assert.Check(t, !coordinator.filterPods(pod3), "yunikorn-managed pod 
was allowed")
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to