This is an automated email from the ASF dual-hosted git repository.
wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new 36a1be26 [YUNIKORN-2100] Merge functionality of node coordinator into
context (#716)
36a1be26 is described below
commit 36a1be26427370d72f0e4575501b9873fd95d8b3
Author: Craig Condit <[email protected]>
AuthorDate: Thu Nov 2 18:30:02 2023 +1100
[YUNIKORN-2100] Merge functionality of node coordinator into context (#716)
Merge the node_coordinator.go pod event handler logic into the Context's pod
event handlers. Also, rename addPodToCache() / updatePodInCache() /
removePodFromCache() to addPod() / updatePod() / deletePod() for
consistency.
Closes: #716
Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
pkg/cache/context.go | 170 +++++++++++-----
pkg/cache/context_test.go | 361 +++++++++++++++++++++++++++-------
pkg/cache/node_coordinator.go | 140 -------------
pkg/cache/node_coordinator_test.go | 392 -------------------------------------
4 files changed, 418 insertions(+), 645 deletions(-)
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index a255e78d..f452c6ab 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -108,18 +108,9 @@ func (ctx *Context) AddSchedulingEventHandlers() {
ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
Type: client.PodInformerHandlers,
- FilterFn: ctx.filterPods,
- AddFn: ctx.addPodToCache,
- UpdateFn: ctx.updatePodInCache,
- DeleteFn: ctx.removePodFromCache,
- })
-
- nodeCoordinator := newNodeResourceCoordinator(ctx.nodes)
- ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
- Type: client.PodInformerHandlers,
- FilterFn: nodeCoordinator.filterPods,
- UpdateFn: nodeCoordinator.updatePod,
- DeleteFn: nodeCoordinator.deletePod,
+ AddFn: ctx.addPod,
+ UpdateFn: ctx.updatePod,
+ DeleteFn: ctx.deletePod,
})
ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
@@ -147,6 +138,9 @@ func (ctx *Context) SetPluginMode(pluginMode bool) {
}
func (ctx *Context) addNode(obj interface{}) {
+ ctx.lock.Lock()
+ defer ctx.lock.Unlock()
+
node, err := convertToNode(obj)
if err != nil {
log.Log(log.ShimContext).Error("node conversion failed",
zap.Error(err))
@@ -166,6 +160,9 @@ func (ctx *Context) addNode(obj interface{}) {
}
func (ctx *Context) updateNode(oldObj, newObj interface{}) {
+ ctx.lock.Lock()
+ defer ctx.lock.Unlock()
+
// we only trigger update when resource changes
oldNode, err := convertToNode(oldObj)
if err != nil {
@@ -189,6 +186,9 @@ func (ctx *Context) updateNode(oldObj, newObj interface{}) {
}
func (ctx *Context) deleteNode(obj interface{}) {
+ ctx.lock.Lock()
+ defer ctx.lock.Unlock()
+
var node *v1.Node
switch t := obj.(type) {
case *v1.Node:
@@ -217,25 +217,90 @@ func (ctx *Context) deleteNode(obj interface{}) {
fmt.Sprintf("node %s is deleted from the scheduler", node.Name))
}
-func (ctx *Context) addPodToCache(obj interface{}) {
+func (ctx *Context) addPod(obj interface{}) {
pod, err := utils.Convert2Pod(obj)
if err != nil {
- log.Log(log.ShimContext).Error("failed to add pod to cache",
zap.Error(err))
+ log.Log(log.ShimContext).Error("failed to add pod",
zap.Error(err))
+ return
+ }
+ if utils.GetApplicationIDFromPod(pod) == "" {
+ ctx.updateForeignPod(pod)
+ } else {
+ ctx.updateYuniKornPod(pod)
+ }
+}
+
+func (ctx *Context) updatePod(_, newObj interface{}) {
+ pod, err := utils.Convert2Pod(newObj)
+ if err != nil {
+ log.Log(log.ShimContext).Error("failed to update pod",
zap.Error(err))
return
}
+ if utils.GetApplicationIDFromPod(pod) == "" {
+ ctx.updateForeignPod(pod)
+ } else {
+ ctx.updateYuniKornPod(pod)
+ }
+}
- // treat a terminated pod like a removal
+func (ctx *Context) updateYuniKornPod(pod *v1.Pod) {
+ ctx.lock.Lock()
+ defer ctx.lock.Unlock()
+
+	// treat terminated pods like a removal
if utils.IsPodTerminated(pod) {
- log.Log(log.ShimContext).Debug("Request to add terminated pod,
removing from cache", zap.String("podName", pod.Name))
+ log.Log(log.ShimContext).Debug("Request to update terminated
pod, removing from cache", zap.String("podName", pod.Name))
ctx.schedulerCache.RemovePod(pod)
return
}
+ ctx.schedulerCache.UpdatePod(pod)
+}
- log.Log(log.ShimContext).Debug("adding pod to cache",
zap.String("podName", pod.Name))
- ctx.schedulerCache.AddPod(pod)
+func (ctx *Context) updateForeignPod(pod *v1.Pod) {
+ ctx.lock.Lock()
+ defer ctx.lock.Unlock()
+
+ podStatusBefore := ""
+ oldPod, ok := ctx.schedulerCache.GetPod(string(pod.UID))
+ if ok {
+ podStatusBefore = string(oldPod.Status.Phase)
+ }
+
+	// the following comment lines describe the guard below
+	// conditions for allocate:
+	// 1. pod was not previously tracked in the scheduler cache
+	// 2. pod is now assigned to a node
+	// 3. pod is not in a terminated state
+ if oldPod == nil && utils.IsAssignedPod(pod) &&
!utils.IsPodTerminated(pod) {
+ log.Log(log.ShimContext).Debug("pod is assigned to a node,
trigger occupied resource update",
+ zap.String("namespace", pod.Namespace),
+ zap.String("podName", pod.Name),
+ zap.String("podStatusBefore", podStatusBefore),
+ zap.String("podStatusCurrent",
string(pod.Status.Phase)))
+ podResource := common.GetPodResource(pod)
+ ctx.nodes.updateNodeOccupiedResources(pod.Spec.NodeName,
podResource, AddOccupiedResource)
+ ctx.schedulerCache.AddPod(pod)
+ return
+ }
+
+ // conditions for release:
+	// 1. pod was previously tracked in the scheduler cache
+ // 2. pod is now in a terminated state
+ if oldPod != nil && utils.IsPodTerminated(pod) {
+ log.Log(log.ShimContext).Debug("pod terminated, trigger
occupied resource update",
+ zap.String("namespace", pod.Namespace),
+ zap.String("podName", pod.Name),
+ zap.String("podStatusBefore", podStatusBefore),
+ zap.String("podStatusCurrent",
string(pod.Status.Phase)))
+ // this means pod is terminated
+	// we need to subtract the occupied resource and re-sync with the
scheduler-core
+ podResource := common.GetPodResource(pod)
+ ctx.nodes.updateNodeOccupiedResources(pod.Spec.NodeName,
podResource, SubOccupiedResource)
+ ctx.schedulerCache.RemovePod(pod)
+ return
+ }
}
-func (ctx *Context) removePodFromCache(obj interface{}) {
+func (ctx *Context) deletePod(obj interface{}) {
var pod *v1.Pod
switch t := obj.(type) {
case *v1.Pod:
@@ -252,39 +317,47 @@ func (ctx *Context) removePodFromCache(obj interface{}) {
return
}
+ if utils.GetApplicationIDFromPod(pod) == "" {
+ ctx.deleteForeignPod(pod)
+ } else {
+ ctx.deleteYuniKornPod(pod)
+ }
+}
+
+func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) {
+ ctx.lock.Lock()
+ defer ctx.lock.Unlock()
+
log.Log(log.ShimContext).Debug("removing pod from cache",
zap.String("podName", pod.Name))
ctx.schedulerCache.RemovePod(pod)
}
-func (ctx *Context) updatePodInCache(oldObj, newObj interface{}) {
- _, err := utils.Convert2Pod(oldObj)
- if err != nil {
- log.Log(log.ShimContext).Error("failed to update pod in cache",
zap.Error(err))
- return
- }
- newPod, err := utils.Convert2Pod(newObj)
- if err != nil {
- log.Log(log.ShimContext).Error("failed to update pod in cache",
zap.Error(err))
- return
- }
+func (ctx *Context) deleteForeignPod(pod *v1.Pod) {
+ ctx.lock.Lock()
+ defer ctx.lock.Unlock()
- // treat terminated pods like a remove
- if utils.IsPodTerminated(newPod) {
- log.Log(log.ShimContext).Debug("Request to update terminated
pod, removing from cache", zap.String("podName", newPod.Name))
- ctx.schedulerCache.RemovePod(newPod)
+ oldPod, ok := ctx.schedulerCache.GetPod(string(pod.UID))
+ if !ok {
+ // if pod is not in scheduler cache, no node updates are needed
+ log.Log(log.ShimContext).Debug("unknown foreign pod deleted, no
resource updated needed",
+ zap.String("namespace", pod.Namespace),
+ zap.String("podName", pod.Name))
return
}
- ctx.schedulerCache.UpdatePod(newPod)
-}
-
-// filter pods by scheduler name and state
-func (ctx *Context) filterPods(obj interface{}) bool {
- switch obj := obj.(type) {
- case *v1.Pod:
- return utils.GetApplicationIDFromPod(obj) != ""
- default:
- return false
+ // conditions for release:
+	// 1. pod was previously tracked in the scheduler cache (it had been assigned to a node)
+ if oldPod != nil {
+ log.Log(log.ShimContext).Debug("foreign pod deleted, triggering
occupied resource update",
+ zap.String("namespace", pod.Namespace),
+ zap.String("podName", pod.Name),
+ zap.String("podStatusBefore",
string(oldPod.Status.Phase)),
+ zap.String("podStatusCurrent",
string(pod.Status.Phase)))
+ // this means pod is terminated
+	// we need to subtract the occupied resource and re-sync with the
scheduler-core
+ podResource := common.GetPodResource(pod)
+ ctx.nodes.updateNodeOccupiedResources(pod.Spec.NodeName,
podResource, SubOccupiedResource)
+ ctx.schedulerCache.RemovePod(pod)
}
}
@@ -367,6 +440,9 @@ func (ctx *Context) filterPriorityClasses(obj interface{})
bool {
}
func (ctx *Context) addPriorityClass(obj interface{}) {
+ ctx.lock.Lock()
+ defer ctx.lock.Unlock()
+
log.Log(log.ShimContext).Debug("priority class added")
priorityClass := utils.Convert2PriorityClass(obj)
if priorityClass != nil {
@@ -375,6 +451,9 @@ func (ctx *Context) addPriorityClass(obj interface{}) {
}
func (ctx *Context) updatePriorityClass(_, newObj interface{}) {
+ ctx.lock.Lock()
+ defer ctx.lock.Unlock()
+
log.Log(log.ShimContext).Debug("priority class updated")
priorityClass := utils.Convert2PriorityClass(newObj)
if priorityClass != nil {
@@ -383,6 +462,9 @@ func (ctx *Context) updatePriorityClass(_, newObj
interface{}) {
}
func (ctx *Context) deletePriorityClass(obj interface{}) {
+ ctx.lock.Lock()
+ defer ctx.lock.Unlock()
+
log.Log(log.ShimContext).Debug("priorityClass deleted")
var priorityClass *schedulingv1.PriorityClass
switch t := obj.(type) {
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index 599be918..a97aced1 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -48,6 +48,10 @@ import (
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
+const (
+ Host1 = "HOST1"
+)
+
var (
testGroups = []string{"dev", "yunikorn"}
)
@@ -334,49 +338,7 @@ func TestRemoveApplicationInternal(t *testing.T) {
assert.Equal(t, ok, false)
}
-func TestFilterPods(t *testing.T) {
- context := initContextForTest()
- pod1 := &v1.Pod{
- TypeMeta: apis.TypeMeta{
- Kind: "Pod",
- APIVersion: "v1",
- },
- ObjectMeta: apis.ObjectMeta{
- Name: "yunikorn-test-00001",
- UID: "UID-00001",
- },
- Spec: v1.PodSpec{SchedulerName: "yunikorn"},
- }
- pod2 := &v1.Pod{
- TypeMeta: apis.TypeMeta{
- Kind: "Pod",
- APIVersion: "v1",
- },
- ObjectMeta: apis.ObjectMeta{
- Name: "yunikorn-test-00002",
- UID: "UID-00002",
- },
- Spec: v1.PodSpec{SchedulerName: "default-scheduler"},
- }
- pod3 := &v1.Pod{
- TypeMeta: apis.TypeMeta{
- Kind: "Pod",
- APIVersion: "v1",
- },
- ObjectMeta: apis.ObjectMeta{
- Name: "yunikorn-test-00003",
- UID: "UID-00003",
- Labels: map[string]string{"applicationId":
"test-00003"},
- },
- Spec: v1.PodSpec{SchedulerName: "yunikorn"},
- }
- assert.Check(t, !context.filterPods(nil), "nil object was allowed")
- assert.Check(t, !context.filterPods(pod1), "yunikorn-managed pod with
no app id was allowed")
- assert.Check(t, !context.filterPods(pod2), "non-yunikorn-managed pod
was allowed")
- assert.Check(t, context.filterPods(pod3), "yunikorn-managed pod was
filtered")
-}
-
-func TestAddPodToCache(t *testing.T) {
+func TestAddPod(t *testing.T) {
context := initContextForTest()
pod1 := &v1.Pod{
@@ -387,6 +349,9 @@ func TestAddPodToCache(t *testing.T) {
ObjectMeta: apis.ObjectMeta{
Name: "yunikorn-test-00001",
UID: "UID-00001",
+ Annotations: map[string]string{
+ constants.AnnotationApplicationID:
"yunikorn-test-00001",
+ },
},
Spec: v1.PodSpec{SchedulerName: "yunikorn"},
}
@@ -398,6 +363,9 @@ func TestAddPodToCache(t *testing.T) {
ObjectMeta: apis.ObjectMeta{
Name: "yunikorn-test-00002",
UID: "UID-00002",
+ Annotations: map[string]string{
+ constants.AnnotationApplicationID:
"yunikorn-test-00002",
+ },
},
Spec: v1.PodSpec{SchedulerName: "yunikorn"},
Status: v1.PodStatus{
@@ -405,9 +373,9 @@ func TestAddPodToCache(t *testing.T) {
},
}
- context.addPodToCache(nil) // no-op, but should not crash
- context.addPodToCache(pod1) // should be added
- context.addPodToCache(pod2) // should skip as pod is terminated
+ context.addPod(nil) // no-op, but should not crash
+ context.addPod(pod1) // should be added
+ context.addPod(pod2) // should skip as pod is terminated
_, ok := context.schedulerCache.GetPod("UID-00001")
assert.Check(t, ok, "active pod was not added")
@@ -415,7 +383,7 @@ func TestAddPodToCache(t *testing.T) {
assert.Check(t, !ok, "terminated pod was added")
}
-func TestUpdatePodInCache(t *testing.T) {
+func TestUpdatePod(t *testing.T) {
context := initContextForTest()
pod1 := &v1.Pod{
@@ -424,9 +392,12 @@ func TestUpdatePodInCache(t *testing.T) {
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
- Name: "yunikorn-test-00001",
- UID: "UID-00001",
- Annotations: map[string]string{"test.state": "new"},
+ Name: "yunikorn-test-00001",
+ UID: "UID-00001",
+ Annotations: map[string]string{
+ constants.AnnotationApplicationID:
"yunikorn-test-00001",
+ "test.state": "new",
+ },
},
Spec: v1.PodSpec{SchedulerName: "yunikorn"},
}
@@ -436,9 +407,12 @@ func TestUpdatePodInCache(t *testing.T) {
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
- Name: "yunikorn-test-00001",
- UID: "UID-00001",
- Annotations: map[string]string{"test.state": "updated"},
+ Name: "yunikorn-test-00001",
+ UID: "UID-00001",
+ Annotations: map[string]string{
+ constants.AnnotationApplicationID:
"yunikorn-test-00001",
+ "test.state": "updated",
+ },
},
Spec: v1.PodSpec{SchedulerName: "yunikorn"},
}
@@ -450,6 +424,9 @@ func TestUpdatePodInCache(t *testing.T) {
ObjectMeta: apis.ObjectMeta{
Name: "yunikorn-test-00001",
UID: "UID-00001",
+ Annotations: map[string]string{
+ constants.AnnotationApplicationID:
"yunikorn-test-00001",
+ },
},
Spec: v1.PodSpec{SchedulerName: "yunikorn"},
Status: v1.PodStatus{
@@ -457,29 +434,29 @@ func TestUpdatePodInCache(t *testing.T) {
},
}
- context.addPodToCache(pod1)
+ context.addPod(pod1)
_, ok := context.schedulerCache.GetPod("UID-00001")
assert.Assert(t, ok, "pod1 is not present after adding")
// these should not fail, but are no-ops
- context.updatePodInCache(nil, nil)
- context.updatePodInCache(nil, pod1)
- context.updatePodInCache(pod1, nil)
+ context.updatePod(nil, nil)
+ context.updatePod(nil, pod1)
+ context.updatePod(pod1, nil)
// ensure a terminated pod is removed
- context.updatePodInCache(pod1, pod3)
+ context.updatePod(pod1, pod3)
_, ok = context.schedulerCache.GetPod("UID-00001")
assert.Check(t, !ok, "pod still found after termination")
// ensure a non-terminated pod is updated
- context.updatePodInCache(pod1, pod2)
+ context.updatePod(pod1, pod2)
found, ok := context.schedulerCache.GetPod("UID-00001")
if assert.Check(t, ok, "pod not found after update") {
assert.Check(t, found.GetAnnotations()["test.state"] ==
"updated", "pod state not updated")
}
}
-func TestRemovePodFromCache(t *testing.T) {
+func TestDeletePod(t *testing.T) {
context := initContextForTest()
pod1 := &v1.Pod{
@@ -490,6 +467,9 @@ func TestRemovePodFromCache(t *testing.T) {
ObjectMeta: apis.ObjectMeta{
Name: "yunikorn-test-00001",
UID: "UID-00001",
+ Annotations: map[string]string{
+ constants.AnnotationApplicationID:
"yunikorn-test-00001",
+ },
},
Spec: v1.PodSpec{SchedulerName: "yunikorn"},
}
@@ -501,30 +481,222 @@ func TestRemovePodFromCache(t *testing.T) {
ObjectMeta: apis.ObjectMeta{
Name: "yunikorn-test-00002",
UID: "UID-00002",
+ Annotations: map[string]string{
+ constants.AnnotationApplicationID:
"yunikorn-test-00002",
+ },
},
Spec: v1.PodSpec{SchedulerName: "yunikorn"},
}
- context.addPodToCache(pod1)
- context.addPodToCache(pod2)
+ context.addPod(pod1)
+ context.addPod(pod2)
_, ok := context.schedulerCache.GetPod("UID-00001")
assert.Assert(t, ok, "pod1 is not present after adding")
_, ok = context.schedulerCache.GetPod("UID-00002")
assert.Assert(t, ok, "pod2 is not present after adding")
// these should not fail, but here for completeness
- context.removePodFromCache(nil)
- context.removePodFromCache(cache.DeletedFinalStateUnknown{Key:
"UID-00000", Obj: nil})
+ context.deletePod(nil)
+ context.deletePod(cache.DeletedFinalStateUnknown{Key: "UID-00000", Obj:
nil})
- context.removePodFromCache(pod1)
+ context.deletePod(pod1)
_, ok = context.schedulerCache.GetPod("UID-00001")
assert.Check(t, !ok, "pod1 is still present")
- context.removePodFromCache(cache.DeletedFinalStateUnknown{Key:
"UID-00002", Obj: pod2})
+ context.deletePod(cache.DeletedFinalStateUnknown{Key: "UID-00002", Obj:
pod2})
_, ok = context.schedulerCache.GetPod("UID-00002")
assert.Check(t, !ok, "pod2 is still present")
}
+//nolint:funlen
+func TestAddUpdatePodForeign(t *testing.T) {
+ mockedSchedulerApi := newMockSchedulerAPI()
+ context := initContextForTest()
+ context.nodes = newSchedulerNodes(mockedSchedulerApi,
NewTestSchedulerCache())
+ host1 := nodeForTest(Host1, "10G", "10")
+ context.nodes.addNode(host1)
+
+ executed := false
+ expectAdd := false
+ expectRemove := false
+ tc := ""
+
+ mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
+ executed = true
+ assert.Equal(t, len(request.Nodes), 1, "%s: wrong node count",
tc)
+ updatedNode := request.Nodes[0]
+ assert.Equal(t, updatedNode.NodeID, Host1, "%s: wrong nodeID",
tc)
+ assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE, "%s:
wrong action", tc)
+ assert.Equal(t,
updatedNode.SchedulableResource.Resources[siCommon.Memory].Value,
int64(10000*1000*1000), "%s: wrong schedulable memory", tc)
+ assert.Equal(t,
updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000),
"%s: wrong schedulable cpu", tc)
+ if expectAdd {
+ assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value,
int64(1000*1000*1000), "%s: wrong occupied memory (add)", tc)
+ assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500), "%s:
wrong occupied cpu (add)", tc)
+ }
+ if expectRemove {
+ assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0), "%s:
wrong occupied memory (remove)", tc)
+ assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0), "%s:
wrong occupied cpu (remove)", tc)
+ }
+ return nil
+ }
+
+ // pod is not assigned to any node
+ pod1 := foreignPod("pod1", "1G", "500m")
+ pod1.Status.Phase = v1.PodPending
+ pod1.Spec.NodeName = ""
+
+ // validate add
+ tc = "add-pod1"
+ executed = false
+ expectAdd = false
+ expectRemove = false
+ context.addPod(pod1)
+ assert.Assert(t, !executed, "unexpected update")
+ _, ok := context.schedulerCache.GetPod(string(pod1.UID))
+ assert.Assert(t, !ok, "unassigned pod found in cache")
+
+ // validate update
+ tc = "update-pod1"
+ executed = false
+ expectAdd = false
+ expectRemove = false
+ context.updatePod(nil, pod1)
+ assert.Assert(t, !executed, "unexpected update")
+ _, ok = context.schedulerCache.GetPod(string(pod1.UID))
+ assert.Assert(t, !ok, "unassigned pod found in cache")
+
+ // pod is assigned to a node but still in pending state, should update
+ pod2 := foreignPod("pod2", "1G", "500m")
+ pod2.Status.Phase = v1.PodPending
+ pod2.Spec.NodeName = Host1
+
+ // validate add
+ tc = "add-pod2"
+ executed = false
+ expectAdd = true
+ expectRemove = false
+ context.addPod(pod2)
+ assert.Assert(t, executed, "update not executed")
+ _, ok = context.schedulerCache.GetPod(string(pod2.UID))
+ assert.Assert(t, ok, "pod not found in cache")
+
+ // validate update
+ tc = "update-pod2"
+ executed = false
+ expectAdd = false
+ expectRemove = false
+ context.updatePod(nil, pod2)
+ assert.Assert(t, !executed, "unexpected update")
+ _, ok = context.schedulerCache.GetPod(string(pod2.UID))
+ assert.Assert(t, ok, "pod not found in cache")
+
+ // validate update when not already in cache
+ tc = "update-pod2-nocache-pre"
+ executed = false
+ expectAdd = false
+ expectRemove = true
+ context.deletePod(pod2)
+ tc = "update-pod2-nocache"
+ executed = false
+ expectAdd = true
+ expectRemove = false
+ context.updatePod(nil, pod2)
+ assert.Assert(t, executed, "update not executed")
+ _, ok = context.schedulerCache.GetPod(string(pod2.UID))
+ assert.Assert(t, ok, "pod not found in cache")
+
+ // pod is failed, should trigger update if already in cache
+ pod3 := pod2.DeepCopy()
+ pod3.Status.Phase = v1.PodFailed
+
+ // validate add
+ tc = "add-pod3"
+ executed = false
+ expectAdd = false
+ expectRemove = true
+ context.addPod(pod3)
+ assert.Assert(t, executed, "update not executed")
+ _, ok = context.schedulerCache.GetPod(string(pod3.UID))
+ assert.Assert(t, !ok, "failed pod found in cache")
+
+ // validate update when not already in cache
+ tc = "update-pod3-pre"
+ executed = false
+ expectAdd = true
+ expectRemove = false
+ context.addPod(pod2)
+ tc = "update-pod3"
+ executed = false
+ expectAdd = false
+ expectRemove = true
+ context.updatePod(nil, pod3)
+ assert.Assert(t, executed, "update not executed")
+ _, ok = context.schedulerCache.GetPod(string(pod3.UID))
+ assert.Assert(t, !ok, "failed pod found in cache")
+}
+
+func TestDeletePodForeign(t *testing.T) {
+ mockedSchedulerApi := newMockSchedulerAPI()
+ context := initContextForTest()
+ context.nodes = newSchedulerNodes(mockedSchedulerApi,
NewTestSchedulerCache())
+ host1 := nodeForTest(Host1, "10G", "10")
+ context.nodes.addNode(host1)
+
+ executed := false
+ expectAdd := false
+ expectRemove := false
+ tc := ""
+
+ mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
+ executed = true
+ assert.Equal(t, len(request.Nodes), 1, "%s: wrong node count",
tc)
+ updatedNode := request.Nodes[0]
+ assert.Equal(t, updatedNode.NodeID, Host1, "%s: wrong nodeID",
tc)
+ assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE, "%s:
wrong action", tc)
+ assert.Equal(t,
updatedNode.SchedulableResource.Resources[siCommon.Memory].Value,
int64(10000*1000*1000), "%s: wrong schedulable memory", tc)
+ assert.Equal(t,
updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000),
"%s: wrong schedulable cpu", tc)
+ if expectAdd {
+ assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value,
int64(1000*1000*1000), "%s: wrong occupied memory (add)", tc)
+ assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500), "%s:
wrong occupied cpu (add)", tc)
+ }
+ if expectRemove {
+ assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0), "%s:
wrong occupied memory (remove)", tc)
+ assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0), "%s:
wrong occupied cpu (remove)", tc)
+ }
+ return nil
+ }
+
+ // add existing pod
+ pod1 := foreignPod("pod1", "1G", "500m")
+ pod1.Status.Phase = v1.PodRunning
+ pod1.Spec.NodeName = Host1
+
+ // validate deletion of existing assigned pod
+ tc = "delete-pod1-pre"
+ executed = false
+ expectAdd = true
+ expectRemove = false
+ context.addPod(pod1)
+ tc = "delete-pod1"
+ executed = false
+ expectAdd = false
+ expectRemove = true
+ context.deletePod(pod1)
+ assert.Assert(t, executed, "update not executed")
+ _, ok := context.schedulerCache.GetPod(string(pod1.UID))
+ assert.Assert(t, !ok, "deleted pod found in cache")
+
+ // validate delete when not already found
+ tc = "delete-pod1-again"
+ executed = false
+ expectAdd = false
+ expectRemove = false
+ context.deletePod(pod1)
+ assert.Assert(t, !executed, "unexpected update")
+ _, ok = context.schedulerCache.GetPod(string(pod1.UID))
+ assert.Assert(t, !ok, "deleted pod found in cache")
+}
+
func TestAddTask(t *testing.T) {
context := initContextForTest()
@@ -1174,9 +1346,9 @@ func TestAddApplicationsWithTags(t *testing.T) {
ObjectMeta: apis.ObjectMeta{
Name: "test2",
Annotations: map[string]string{
- constants.NamespaceQuota: "{\"cpu\":
\"1\", \"memory\": \"256M\", \"nvidia.com/gpu\": \"1\"}",
+ constants.NamespaceQuota:
"{\"cpu\": \"1\", \"memory\": \"256M\", \"nvidia.com/gpu\": \"1\"}",
constants.DomainYuniKorn + "parentqueue":
"root.test",
- constants.NamespaceGuaranteed: "{\"cpu\":
\"1\", \"memory\": \"256M\", \"nvidia.com/gpu\": \"1\"}",
+ constants.NamespaceGuaranteed:
"{\"cpu\": \"1\", \"memory\": \"256M\", \"nvidia.com/gpu\": \"1\"}",
},
},
}
@@ -1412,10 +1584,13 @@ func TestGetStateDump(t *testing.T) {
Namespace: "default",
Name: "yunikorn-test-00001",
UID: "UID-00001",
+ Annotations: map[string]string{
+ constants.AnnotationApplicationID:
"yunikorn-test-00001",
+ },
},
Spec: v1.PodSpec{SchedulerName: "yunikorn"},
}
- context.addPodToCache(pod1)
+ context.addPod(pod1)
stateDumpStr, err := context.GetStateDump()
assert.NilError(t, err, "error during state dump")
@@ -1597,3 +1772,51 @@ func waitForNodeAcceptedEvent(recorder
*k8sEvents.FakeRecorder) error {
}, 10*time.Millisecond, time.Second)
return err
}
+
+func nodeForTest(nodeID, memory, cpu string) *v1.Node {
+ resourceList := make(map[v1.ResourceName]resource.Quantity)
+ resourceList[v1.ResourceName("memory")] = resource.MustParse(memory)
+ resourceList[v1.ResourceName("cpu")] = resource.MustParse(cpu)
+ return &v1.Node{
+ TypeMeta: apis.TypeMeta{
+ Kind: "Node",
+ APIVersion: "v1",
+ },
+ ObjectMeta: apis.ObjectMeta{
+ Name: nodeID,
+ Namespace: "default",
+ UID: "uid_0001",
+ },
+ Spec: v1.NodeSpec{},
+ Status: v1.NodeStatus{
+ Allocatable: resourceList,
+ },
+ }
+}
+
+func foreignPod(podName, memory, cpu string) *v1.Pod {
+ containers := make([]v1.Container, 0)
+ c1Resources := make(map[v1.ResourceName]resource.Quantity)
+ c1Resources[v1.ResourceMemory] = resource.MustParse(memory)
+ c1Resources[v1.ResourceCPU] = resource.MustParse(cpu)
+ containers = append(containers, v1.Container{
+ Name: "container-01",
+ Resources: v1.ResourceRequirements{
+ Requests: c1Resources,
+ },
+ })
+
+ return &v1.Pod{
+ TypeMeta: apis.TypeMeta{
+ Kind: "Pod",
+ APIVersion: "v1",
+ },
+ ObjectMeta: apis.ObjectMeta{
+ Name: podName,
+ UID: types.UID(podName),
+ },
+ Spec: v1.PodSpec{
+ Containers: containers,
+ },
+ }
+}
diff --git a/pkg/cache/node_coordinator.go b/pkg/cache/node_coordinator.go
deleted file mode 100644
index 7dbd20fc..00000000
--- a/pkg/cache/node_coordinator.go
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-*/
-
-package cache
-
-import (
- "go.uber.org/zap"
- v1 "k8s.io/api/core/v1"
- k8sCache "k8s.io/client-go/tools/cache"
-
- "github.com/apache/yunikorn-k8shim/pkg/common"
- "github.com/apache/yunikorn-k8shim/pkg/common/utils"
- "github.com/apache/yunikorn-k8shim/pkg/log"
-)
-
-// nodeResourceCoordinator looks at the resources that are not allocated by
yunikorn,
-// and refresh scheduler cache to keep nodes' capacity in-sync.
-// this coordinator only looks after the pods that are not scheduled by
yunikorn,
-// and it registers update/delete handler to the pod informer. It ensures that
the
-// following operations are done
-// 1. when a pod is becoming Running, add occupied node resource
-// 2. when a pod is terminated, sub the occupied node resource
-// 3. when a pod is deleted, sub the occupied node resource
-//
-// each of these updates will trigger a node UPDATE action to update the
occupied
-// resource in the scheduler-core.
-type nodeResourceCoordinator struct {
- nodes *schedulerNodes
-}
-
-func newNodeResourceCoordinator(nodes *schedulerNodes)
*nodeResourceCoordinator {
- return &nodeResourceCoordinator{nodes}
-}
-
-// filter pods that not scheduled by us
-func (c *nodeResourceCoordinator) filterPods(obj interface{}) bool {
- switch obj := obj.(type) {
- case *v1.Pod:
- return utils.GetApplicationIDFromPod(obj) == ""
- default:
- return false
- }
-}
-
-func (c *nodeResourceCoordinator) updatePod(old, new interface{}) {
- oldPod, err := utils.Convert2Pod(old)
- if err != nil {
- log.Log(log.ShimCacheNode).Error("expecting a pod object",
zap.Error(err))
- return
- }
-
- newPod, err := utils.Convert2Pod(new)
- if err != nil {
- log.Log(log.ShimCacheNode).Error("expecting a pod object",
zap.Error(err))
- return
- }
-
- // this handles the allocate and release of a pod that not scheduled by
yunikorn
- // the check is triggered when a pod status changes
- // conditions for allocate:
- // 1. pod got assigned to a node
- // 2. pod is not in terminated state
- if !utils.IsAssignedPod(oldPod) && utils.IsAssignedPod(newPod) &&
!utils.IsPodTerminated(newPod) {
- log.Log(log.ShimCacheNode).Debug("pod is assigned to a node,
trigger occupied resource update",
- zap.String("namespace", newPod.Namespace),
- zap.String("podName", newPod.Name),
- zap.String("podStatusBefore",
string(oldPod.Status.Phase)),
- zap.String("podStatusCurrent",
string(newPod.Status.Phase)))
- // if pod is running but not scheduled by us,
- // we need to notify scheduler-core to re-sync the node resource
- podResource := common.GetPodResource(newPod)
- c.nodes.updateNodeOccupiedResources(newPod.Spec.NodeName,
podResource, AddOccupiedResource)
- c.nodes.cache.AddPod(newPod)
- return
- }
-
- // conditions for release:
- // 1. pod is already assigned to a node
- // 2. pod status changes from non-terminated to terminated state
- if utils.IsAssignedPod(newPod) && oldPod.Status.Phase !=
newPod.Status.Phase && utils.IsPodTerminated(newPod) {
- log.Log(log.ShimCacheNode).Debug("pod terminated, trigger
occupied resource update",
- zap.String("namespace", newPod.Namespace),
- zap.String("podName", newPod.Name),
- zap.String("podStatusBefore",
string(oldPod.Status.Phase)),
- zap.String("podStatusCurrent",
string(newPod.Status.Phase)))
- // this means pod is terminated
- // we need sub the occupied resource and re-sync with the
scheduler-core
- podResource := common.GetPodResource(newPod)
- c.nodes.updateNodeOccupiedResources(newPod.Spec.NodeName,
podResource, SubOccupiedResource)
- c.nodes.cache.RemovePod(newPod)
- return
- }
-}
-
-func (c *nodeResourceCoordinator) deletePod(obj interface{}) {
- var pod *v1.Pod
- switch t := obj.(type) {
- case *v1.Pod:
- pod = t
- case k8sCache.DeletedFinalStateUnknown:
- var err error
- pod, err = utils.Convert2Pod(t.Obj)
- if err != nil {
- log.Log(log.ShimCacheNode).Error(err.Error())
- return
- }
- default:
- log.Log(log.ShimCacheNode).Error("cannot convert to pod")
- return
- }
-
- // if pod is already terminated, that means the updates have already
done
- if utils.IsPodTerminated(pod) {
- log.Log(log.ShimCacheNode).Debug("pod is already terminated,
occupied resource updated should have already been done")
- return
- }
-
- log.Log(log.ShimCacheNode).Info("deleting pod that scheduled by other
schedulers",
- zap.String("namespace", pod.Namespace),
- zap.String("podName", pod.Name))
-
- podResource := common.GetPodResource(pod)
- c.nodes.updateNodeOccupiedResources(pod.Spec.NodeName, podResource,
SubOccupiedResource)
- c.nodes.cache.RemovePod(pod)
-}
diff --git a/pkg/cache/node_coordinator_test.go
b/pkg/cache/node_coordinator_test.go
deleted file mode 100644
index 68a03f7a..00000000
--- a/pkg/cache/node_coordinator_test.go
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-*/
-
-package cache
-
-import (
- "testing"
-
- "gotest.tools/v3/assert"
- v1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/resource"
- apis "k8s.io/apimachinery/pkg/apis/meta/v1"
-
- siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
- "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
-)
-
-const (
- Host1 = "HOST1"
- Host2 = "HOST2"
- HostEmpty = ""
-)
-
-func PodForTest(podName, memory, cpu string) *v1.Pod {
- containers := make([]v1.Container, 0)
- c1Resources := make(map[v1.ResourceName]resource.Quantity)
- c1Resources[v1.ResourceMemory] = resource.MustParse(memory)
- c1Resources[v1.ResourceCPU] = resource.MustParse(cpu)
- containers = append(containers, v1.Container{
- Name: "container-01",
- Resources: v1.ResourceRequirements{
- Requests: c1Resources,
- },
- })
-
- return &v1.Pod{
- TypeMeta: apis.TypeMeta{
- Kind: "Pod",
- APIVersion: "v1",
- },
- ObjectMeta: apis.ObjectMeta{
- Name: podName,
- },
- Spec: v1.PodSpec{
- Containers: containers,
- },
- }
-}
-
-func NodeForTest(nodeID, memory, cpu string) *v1.Node {
- resourceList := make(map[v1.ResourceName]resource.Quantity)
- resourceList[v1.ResourceName("memory")] = resource.MustParse(memory)
- resourceList[v1.ResourceName("cpu")] = resource.MustParse(cpu)
- return &v1.Node{
- TypeMeta: apis.TypeMeta{
- Kind: "Node",
- APIVersion: "v1",
- },
- ObjectMeta: apis.ObjectMeta{
- Name: nodeID,
- Namespace: "default",
- UID: "uid_0001",
- },
- Spec: v1.NodeSpec{},
- Status: v1.NodeStatus{
- Allocatable: resourceList,
- },
- }
-}
-
-func TestUpdatePod(t *testing.T) {
- mockedSchedulerApi := newMockSchedulerAPI()
- nodes := newSchedulerNodes(mockedSchedulerApi, NewTestSchedulerCache())
- host1 := NodeForTest(Host1, "10G", "10")
- host2 := NodeForTest(Host2, "10G", "10")
- nodes.addNode(host1)
- nodes.addNode(host2)
- coordinator := newNodeResourceCoordinator(nodes)
-
- // pod is not assigned to any node
- // this won't trigger an update
- pod1 := PodForTest("pod1", "1G", "500m")
- pod2 := PodForTest("pod1", "1G", "500m")
- pod1.Status.Phase = v1.PodPending
- pod1.Status.Phase = v1.PodPending
- pod1.Spec.NodeName = ""
- pod2.Spec.NodeName = ""
- mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
- t.Fatalf("update should not run because state is not changed")
- return nil
- }
- coordinator.updatePod(pod1, pod2)
-
- // pod is already assigned to a node and state is running
- // this won't trigger an update
- pod1.Status.Phase = v1.PodRunning
- pod1.Status.Phase = v1.PodRunning
- pod1.Spec.NodeName = Host1
- pod2.Spec.NodeName = Host1
- mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
- t.Fatalf("update should not run because state is not changed")
- return nil
- }
- coordinator.updatePod(pod1, pod2)
-
- // pod state remains in Pending, pod is assigned to a node
- // this happens when the pod just gets allocated and started, but not
in running state yet
- // trigger an update
- pod1.Status.Phase = v1.PodPending
- pod2.Status.Phase = v1.PodPending
- pod1.Spec.NodeName = HostEmpty
- pod2.Spec.NodeName = Host1
- executed := false
- mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
- executed = true
- assert.Equal(t, len(request.Nodes), 1)
- updatedNode := request.Nodes[0]
- assert.Equal(t, updatedNode.NodeID, Host1)
- assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE)
- assert.Equal(t,
updatedNode.SchedulableResource.Resources[siCommon.Memory].Value,
int64(10000*1000*1000))
- assert.Equal(t,
updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000))
- assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value,
int64(1000*1000*1000))
- assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500))
- return nil
- }
- coordinator.updatePod(pod1, pod2)
- assert.Assert(t, executed)
-
- // pod state changed from running to failed, pod terminated
- // trigger another update
- pod1.Status.Phase = v1.PodRunning
- pod2.Status.Phase = v1.PodFailed
- pod1.Spec.NodeName = Host1
- pod2.Spec.NodeName = Host1
- executed = false
- mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
- executed = true
- assert.Equal(t, len(request.Nodes), 1)
- updatedNode := request.Nodes[0]
- assert.Equal(t, updatedNode.NodeID, Host1)
- assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE)
- assert.Equal(t,
updatedNode.SchedulableResource.Resources[siCommon.Memory].Value,
int64(10000*1000*1000))
- assert.Equal(t,
updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000))
- assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0))
- assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0))
- return nil
- }
- coordinator.updatePod(pod1, pod2)
- assert.Assert(t, executed)
-
- // pod state changed from pending to running
- // this is not triggering a new update because the pod was already
allocated to a node
- pod1.Status.Phase = v1.PodPending
- pod2.Status.Phase = v1.PodRunning
- pod1.Spec.NodeName = Host2
- pod2.Spec.NodeName = Host2
- mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
- t.Fatalf("update should not run because pod is already
allocated")
- return nil
- }
- coordinator.updatePod(pod1, pod2)
-
- // pod state Running, pod is assigned to a node
- // trigger an update
- pod1.Status.Phase = v1.PodRunning
- pod2.Status.Phase = v1.PodRunning
- pod1.Spec.NodeName = ""
- pod2.Spec.NodeName = Host2
- executed = false
- mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
- executed = true
- assert.Equal(t, len(request.Nodes), 1)
- updatedNode := request.Nodes[0]
- assert.Equal(t, updatedNode.NodeID, Host2)
- assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE)
- assert.Equal(t,
updatedNode.SchedulableResource.Resources[siCommon.Memory].Value,
int64(10000*1000*1000))
- assert.Equal(t,
updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000))
- assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value,
int64(1000*1000*1000))
- assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500))
- return nil
- }
- coordinator.updatePod(pod1, pod2)
- assert.Assert(t, executed)
-
- // pod state Running to Succeed, pod terminated
- // this should trigger an update
- pod1.Status.Phase = v1.PodRunning
- pod2.Status.Phase = v1.PodSucceeded
- pod1.Spec.NodeName = Host2
- pod2.Spec.NodeName = Host2
- executed = false
- mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
- executed = true
- assert.Equal(t, len(request.Nodes), 1)
- updatedNode := request.Nodes[0]
- assert.Equal(t, updatedNode.NodeID, Host2)
- assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE)
- assert.Equal(t,
updatedNode.SchedulableResource.Resources[siCommon.Memory].Value,
int64(10000*1000*1000))
- assert.Equal(t,
updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000))
- assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0))
- assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0))
- return nil
- }
- coordinator.updatePod(pod1, pod2)
- assert.Assert(t, executed)
-
- // pod gets assigned to a node, but pod status is already failed
- // this should not trigger an update
- pod1.Status.Phase = v1.PodFailed
- pod2.Status.Phase = v1.PodFailed
- pod1.Spec.NodeName = HostEmpty
- pod2.Spec.NodeName = Host2
- mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
- t.Fatalf("update should not run because pod is already
allocated")
- return nil
- }
- coordinator.updatePod(pod1, pod2)
-}
-
-func TestDeletePod(t *testing.T) {
- mockedSchedulerApi := newMockSchedulerAPI()
- nodes := newSchedulerNodes(mockedSchedulerApi, NewTestSchedulerCache())
- host1 := NodeForTest(Host1, "10G", "10")
- nodes.addNode(host1)
- coordinator := newNodeResourceCoordinator(nodes)
-
- // pod from pending to running
- // occupied resources should be added to the node
- pod1 := PodForTest("pod1", "1G", "500m")
- pod2 := PodForTest("pod1", "1G", "500m")
- pod1.Status.Phase = v1.PodPending
- pod2.Status.Phase = v1.PodRunning
- pod1.Spec.NodeName = HostEmpty
- pod2.Spec.NodeName = Host1
- executed := false
- mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
- executed = true
- assert.Equal(t, len(request.Nodes), 1)
- updatedNode := request.Nodes[0]
- assert.Equal(t, updatedNode.NodeID, Host1)
- assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE)
- assert.Equal(t,
updatedNode.SchedulableResource.Resources[siCommon.Memory].Value,
int64(10000*1000*1000))
- assert.Equal(t,
updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000))
- assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value,
int64(1000*1000*1000))
- assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500))
- return nil
- }
- coordinator.updatePod(pod1, pod2)
- assert.Assert(t, executed)
-
- // delete pod from the running state
- executed = false
- mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
- executed = true
- assert.Equal(t, len(request.Nodes), 1)
- updatedNode := request.Nodes[0]
- assert.Equal(t, updatedNode.NodeID, Host1)
- assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE)
- assert.Equal(t,
updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, int64(10000))
- assert.Equal(t,
updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000))
- assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0))
- assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0))
- return nil
- }
- coordinator.deletePod(pod1)
-}
-
-func TestDeleteTerminatedPod(t *testing.T) {
- mockedSchedulerApi := newMockSchedulerAPI()
- nodes := newSchedulerNodes(mockedSchedulerApi, NewTestSchedulerCache())
- host1 := NodeForTest(Host1, "10G", "10")
- nodes.addNode(host1)
- coordinator := newNodeResourceCoordinator(nodes)
-
- // pod from pending to running
- // occupied resources should be added to the node
- pod1 := PodForTest("pod1", "1G", "500m")
- pod2 := PodForTest("pod1", "1G", "500m")
- pod1.Status.Phase = v1.PodPending
- pod2.Status.Phase = v1.PodRunning
- pod1.Spec.NodeName = HostEmpty
- pod2.Spec.NodeName = Host1
- executed := false
- mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
- executed = true
- assert.Equal(t, len(request.Nodes), 1)
- updatedNode := request.Nodes[0]
- assert.Equal(t, updatedNode.NodeID, Host1)
- assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE)
- assert.Equal(t,
updatedNode.SchedulableResource.Resources[siCommon.Memory].Value,
int64(10000*1000*1000))
- assert.Equal(t,
updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000))
- assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value,
int64(1000*1000*1000))
- assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500))
- return nil
- }
- coordinator.updatePod(pod1, pod2)
- assert.Assert(t, executed)
-
- // pod from running to succeed
- // occupied resources should be added to the node
- pod1.Status.Phase = v1.PodRunning
- pod2.Status.Phase = v1.PodSucceeded
- pod1.Spec.NodeName = Host1
- pod2.Spec.NodeName = Host1
- executed = false
- mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
- executed = true
- assert.Equal(t, len(request.Nodes), 1)
- updatedNode := request.Nodes[0]
- assert.Equal(t, updatedNode.NodeID, Host1)
- assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE)
- assert.Equal(t,
updatedNode.SchedulableResource.Resources[siCommon.Memory].Value,
int64(10000*1000*1000))
- assert.Equal(t,
updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000))
- assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0))
- assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0))
- return nil
- }
- coordinator.updatePod(pod1, pod2)
- assert.Assert(t, executed)
-
- // delete pod from the succeed state
- executed = false
- mockedSchedulerApi.UpdateNodeFn = func(request *si.NodeRequest) error {
- executed = true
- t.Fatalf("update should not be triggered as it should have
already done")
- return nil
- }
- coordinator.deletePod(pod2)
- assert.Equal(t, executed, false)
-}
-
-func TestNodeCoordinatorFilterPods(t *testing.T) {
- mockedSchedulerAPI := newMockSchedulerAPI()
- nodes := newSchedulerNodes(mockedSchedulerAPI, NewTestSchedulerCache())
- host1 := NodeForTest(Host1, "10G", "10")
- nodes.addNode(host1)
- coordinator := newNodeResourceCoordinator(nodes)
-
- pod1 := &v1.Pod{
- TypeMeta: apis.TypeMeta{
- Kind: "Pod",
- APIVersion: "v1",
- },
- ObjectMeta: apis.ObjectMeta{
- Name: "yunikorn-test-00001",
- UID: "UID-00001",
- },
- Spec: v1.PodSpec{SchedulerName: "yunikorn"},
- }
- pod2 := &v1.Pod{
- TypeMeta: apis.TypeMeta{
- Kind: "Pod",
- APIVersion: "v1",
- },
- ObjectMeta: apis.ObjectMeta{
- Name: "yunikorn-test-00002",
- UID: "UID-00002",
- },
- Spec: v1.PodSpec{SchedulerName: "default-scheduler"},
- }
- pod3 := &v1.Pod{
- TypeMeta: apis.TypeMeta{
- Kind: "Pod",
- APIVersion: "v1",
- },
- ObjectMeta: apis.ObjectMeta{
- Name: "yunikorn-test-00003",
- UID: "UID-00003",
- Labels: map[string]string{"applicationId":
"test-00003"},
- },
- Spec: v1.PodSpec{SchedulerName: "yunikorn"},
- }
- assert.Check(t, !coordinator.filterPods(nil), "nil object was allowed")
- assert.Check(t, coordinator.filterPods(pod1), "yunikorn-managed pod
with no app id was filtered")
- assert.Check(t, coordinator.filterPods(pod2), "non-yunikorn-managed pod
was filtered")
- assert.Check(t, !coordinator.filterPods(pod3), "yunikorn-managed pod
was allowed")
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]