This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new 753e4f8a [YUNIKORN-2834] [shim] Add non-YuniKorn allocation tracking logic (#918)
753e4f8a is described below
commit 753e4f8a3ad28e390cb947ada3acabebb32bc473
Author: Peter Bacsko <[email protected]>
AuthorDate: Wed Oct 16 09:36:05 2024 +0200
[YUNIKORN-2834] [shim] Add non-YuniKorn allocation tracking logic (#918)
Closes: #918
Signed-off-by: Peter Bacsko <[email protected]>
---
go.mod | 2 +-
go.sum | 4 +-
pkg/cache/context.go | 66 +++----
pkg/cache/context_test.go | 383 +++++++++++++++++++++-----------------
pkg/common/constants/constants.go | 1 +
pkg/common/si_helper.go | 46 +++++
pkg/common/si_helper_test.go | 78 ++++++++
pkg/shim/scheduler_mock_test.go | 1 -
8 files changed, 370 insertions(+), 211 deletions(-)
diff --git a/go.mod b/go.mod
index fc0ffd5d..b5dbbe8c 100644
--- a/go.mod
+++ b/go.mod
@@ -23,7 +23,7 @@ go 1.22.0
toolchain go1.22.5
require (
- github.com/apache/yunikorn-core v0.0.0-20241002095736-a2d3d43a145d
+ github.com/apache/yunikorn-core v0.0.0-20241003152125-4ea225160acf
github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240924203603-aaf51c93d3a0
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
diff --git a/go.sum b/go.sum
index c9f1c1bd..566b07b2 100644
--- a/go.sum
+++ b/go.sum
@@ -8,8 +8,8 @@ github.com/NYTimes/gziphandler v1.1.1
h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
github.com/NYTimes/gziphandler v1.1.1/go.mod
h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/antlr4-go/antlr/v4 v4.13.0
h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod
h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
-github.com/apache/yunikorn-core v0.0.0-20241002095736-a2d3d43a145d
h1:awo2goBrw25P1aFNZgYJ0q7V+5ycMqMhvI60B75OzQg=
-github.com/apache/yunikorn-core v0.0.0-20241002095736-a2d3d43a145d/go.mod
h1:q6OXYpCTGvMJxsEorpIF6icKM/IioMmU6KcsclV1kI0=
+github.com/apache/yunikorn-core v0.0.0-20241003152125-4ea225160acf
h1:wKySiY4IA9Us287QRnIxFnuTHXaMSeQ3BhAwSrSW/sQ=
+github.com/apache/yunikorn-core v0.0.0-20241003152125-4ea225160acf/go.mod
h1:q6OXYpCTGvMJxsEorpIF6icKM/IioMmU6KcsclV1kI0=
github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240924203603-aaf51c93d3a0
h1:/9j0YXuifvoOl4YVEbO0r+DPkkYLzaQ+/ac+xCc7SY8=
github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240924203603-aaf51c93d3a0/go.mod
h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index b4dca41e..613bbc52 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -365,12 +365,11 @@ func (ctx *Context) updateForeignPod(pod *v1.Pod) {
podStatusBefore = string(oldPod.Status.Phase)
}
- // conditions for allocate:
- // 1. pod was previously assigned
- // 2. pod is now assigned
- // 3. pod is not in terminated state
- // 4. pod references a known node
- if oldPod == nil && utils.IsAssignedPod(pod) &&
!utils.IsPodTerminated(pod) {
+ // conditions for allocate/update:
+ // 1. pod is now assigned
+ // 2. pod is not in terminated state
+ // 3. pod references a known node
+ if utils.IsAssignedPod(pod) && !utils.IsPodTerminated(pod) {
if ctx.schedulerCache.UpdatePod(pod) {
// pod was accepted by a real node
log.Log(log.ShimContext).Debug("pod is assigned to a
node, trigger occupied resource update",
@@ -378,10 +377,14 @@ func (ctx *Context) updateForeignPod(pod *v1.Pod) {
zap.String("podName", pod.Name),
zap.String("podStatusBefore", podStatusBefore),
zap.String("podStatusCurrent",
string(pod.Status.Phase)))
- ctx.updateNodeOccupiedResources(pod.Spec.NodeName,
pod.Namespace, pod.Name, common.GetPodResource(pod),
schedulercache.AddOccupiedResource)
+ allocReq := common.CreateAllocationForForeignPod(pod)
+ if err :=
ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(allocReq); err != nil {
+ log.Log(log.ShimContext).Error("failed to add
foreign allocation to the core",
+ zap.Error(err))
+ }
} else {
// pod is orphaned (references an unknown node)
- log.Log(log.ShimContext).Info("skipping occupied
resource update for assigned orphaned pod",
+ log.Log(log.ShimContext).Info("skipping updating
allocation for assigned orphaned pod",
zap.String("namespace", pod.Namespace),
zap.String("podName", pod.Name),
zap.String("nodeName", pod.Spec.NodeName))
@@ -401,9 +404,13 @@ func (ctx *Context) updateForeignPod(pod *v1.Pod) {
zap.String("podStatusBefore", podStatusBefore),
zap.String("podStatusCurrent",
string(pod.Status.Phase)))
// this means pod is terminated
- // we need sub the occupied resource and re-sync with
the scheduler-core
- ctx.updateNodeOccupiedResources(pod.Spec.NodeName,
pod.Namespace, pod.Name, common.GetPodResource(pod),
schedulercache.SubOccupiedResource)
+ // remove from the scheduler cache and create release
request to remove foreign allocation from the core
ctx.schedulerCache.RemovePod(pod)
+ releaseReq :=
common.CreateReleaseRequestForForeignPod(string(pod.UID),
constants.DefaultPartition)
+ if err :=
ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(releaseReq); err != nil
{
+ log.Log(log.ShimContext).Error("failed to
remove foreign allocation from the core",
+ zap.Error(err))
+ }
} else {
// pod is orphaned (references an unknown node)
log.Log(log.ShimContext).Info("skipping occupied
resource update for terminated orphaned pod",
@@ -453,40 +460,17 @@ func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) {
func (ctx *Context) deleteForeignPod(pod *v1.Pod) {
ctx.lock.Lock()
defer ctx.lock.Unlock()
- oldPod := ctx.schedulerCache.GetPod(string(pod.UID))
- if oldPod == nil {
- // if pod is not in scheduler cache, no node updates are needed
- log.Log(log.ShimContext).Debug("unknown foreign pod deleted, no
resource updated needed",
- zap.String("namespace", pod.Namespace),
- zap.String("podName", pod.Name))
- return
+ releaseReq := common.CreateReleaseRequestForForeignPod(string(pod.UID),
constants.DefaultPartition)
+ if err :=
ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(releaseReq); err != nil
{
+ log.Log(log.ShimContext).Error("failed to remove foreign
allocation from the core",
+ zap.Error(err))
}
- // conditions for release:
- // 1. pod is already assigned to a node
- // 2. pod was not in a terminal state before
- // 3. pod references a known node
- if !utils.IsPodTerminated(oldPod) {
- if !ctx.schedulerCache.IsPodOrphaned(string(oldPod.UID)) {
- log.Log(log.ShimContext).Debug("foreign pod deleted,
triggering occupied resource update",
- zap.String("namespace", pod.Namespace),
- zap.String("podName", pod.Name),
- zap.String("podStatusBefore",
string(oldPod.Status.Phase)),
- zap.String("podStatusCurrent",
string(pod.Status.Phase)))
- // this means pod is terminated
- // we need sub the occupied resource and re-sync with
the scheduler-core
- ctx.updateNodeOccupiedResources(pod.Spec.NodeName,
pod.Namespace, pod.Name, common.GetPodResource(pod),
schedulercache.SubOccupiedResource)
- } else {
- // pod is orphaned (references an unknown node)
- log.Log(log.ShimContext).Info("skipping occupied
resource update for removed orphaned pod",
- zap.String("namespace", pod.Namespace),
- zap.String("podName", pod.Name),
- zap.String("nodeName", pod.Spec.NodeName))
- }
- ctx.schedulerCache.RemovePod(pod)
- }
+ log.Log(log.ShimContext).Debug("removing pod from cache",
zap.String("podName", pod.Name))
+ ctx.schedulerCache.RemovePod(pod)
}
+//nolint:unused
func (ctx *Context) updateNodeOccupiedResources(nodeName string, namespace
string, podName string, resource *si.Resource, opt schedulercache.UpdateType) {
if common.IsZero(resource) {
return
@@ -1613,7 +1597,7 @@ func (ctx *Context) decommissionNode(node *v1.Node) error
{
}
func (ctx *Context) updateNodeResources(node *v1.Node, capacity *si.Resource,
occupied *si.Resource) error {
- request := common.CreateUpdateRequestForUpdatedNode(node.Name,
capacity, occupied)
+ request := common.CreateUpdateRequestForUpdatedNode(node.Name,
capacity, nil)
return ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
}
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index b5098872..a8214bcc 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -22,6 +22,7 @@ import (
"encoding/json"
"fmt"
"strings"
+ "sync/atomic"
"testing"
"time"
@@ -74,11 +75,13 @@ const (
taskUID4 = "task00004"
taskUnknown = "non_existing_taskID"
- podName1 = "pod1"
- podName2 = "pod2"
- podName3 = "pod3"
- podName4 = "pod4"
- podNamespace = "yk"
+ podName1 = "pod1"
+ podName2 = "pod2"
+ podName3 = "pod3"
+ podName4 = "pod4"
+ podForeignName = "foreign-1"
+ podForeignUID = "UUID-foreign-1"
+ podNamespace = "yk"
nodeName1 = "node1"
nodeName2 = "node2"
@@ -527,39 +530,11 @@ func TestAddUpdatePodForeign(t *testing.T) {
defer dispatcher.UnregisterAllEventHandlers()
defer dispatcher.Stop()
- executed := false
- expectAdd := false
- expectRemove := false
- tc := ""
-
- validatorFunc := func(request *si.NodeRequest) error {
- assert.Equal(t, len(request.Nodes), 1, "%s: wrong node count",
tc)
- updatedNode := request.Nodes[0]
- assert.Equal(t, updatedNode.NodeID, Host1, "%s: wrong nodeID",
tc)
- switch updatedNode.Action {
- case si.NodeInfo_CREATE_DRAIN:
- return nil
- case si.NodeInfo_DRAIN_TO_SCHEDULABLE:
- return nil
- case si.NodeInfo_UPDATE:
- executed = true
- default:
- assert.Equal(t, false, "Unexpected action: %d",
updatedNode.Action)
- return nil
- }
- assert.Equal(t,
updatedNode.SchedulableResource.Resources[siCommon.Memory].Value,
int64(10000*1000*1000), "%s: wrong schedulable memory", tc)
- assert.Equal(t,
updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000),
"%s: wrong schedulable cpu", tc)
- if expectAdd {
- assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value,
int64(1000*1000*1000), "%s: wrong occupied memory (add)", tc)
- assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500), "%s:
wrong occupied cpu (add)", tc)
- }
- if expectRemove {
- assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0), "%s:
wrong occupied memory (remove)", tc)
- assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0), "%s:
wrong occupied cpu (remove)", tc)
- }
+ var allocRequest *si.AllocationRequest
+ apiProvider.MockSchedulerAPIUpdateAllocationFn(func(request
*si.AllocationRequest) error {
+ allocRequest = request
return nil
- }
-
+ })
apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest)
error {
for _, node := range request.Nodes {
if node.Action == si.NodeInfo_CREATE_DRAIN {
@@ -569,10 +544,10 @@ func TestAddUpdatePodForeign(t *testing.T) {
})
}
}
- return validatorFunc(request)
+ return nil
})
- host1 := nodeForTest(Host1, "10G", "10")
+ host1 := nodeForTest(Host1, "10G", "10") // add existing foreign pod
context.updateNode(nil, host1)
// pod is not assigned to any node
@@ -580,22 +555,18 @@ func TestAddUpdatePodForeign(t *testing.T) {
pod1.Status.Phase = v1.PodPending
pod1.Spec.NodeName = ""
- // validate add
- tc = "add-pod1"
- executed = false
- expectAdd = false
- expectRemove = false
+ // validate add (pending, no node assigned)
+ allocRequest = nil
context.AddPod(pod1)
- assert.Assert(t, !executed, "unexpected update")
+ assert.Assert(t, allocRequest == nil, "unexpected update")
pod := context.schedulerCache.GetPod(string(pod1.UID))
assert.Assert(t, pod == nil, "unassigned pod found in cache")
- // validate update
- tc = "update-pod1"
- executed = false
- expectRemove = false
+ // validate update (no change)
+ allocRequest = nil
context.UpdatePod(nil, pod1)
- assert.Assert(t, !executed, "unexpected update")
+ assert.Assert(t, allocRequest == nil, "unexpected update")
+ pod = context.schedulerCache.GetPod(string(pod1.UID))
assert.Assert(t, pod == nil, "unassigned pod found in cache")
// pod is assigned to a node but still in pending state, should update
@@ -604,155 +575,91 @@ func TestAddUpdatePodForeign(t *testing.T) {
pod2.Spec.NodeName = Host1
// validate add
- tc = "add-pod2"
- executed = false
- expectAdd = true
- expectRemove = false
context.AddPod(pod2)
- assert.Assert(t, executed, "updated expected")
+ assert.Assert(t, allocRequest != nil, "update expected")
+ assertAddForeignPod(t, podName2, Host1, allocRequest)
pod = context.schedulerCache.GetPod(string(pod2.UID))
assert.Assert(t, pod != nil, "pod not found in cache")
- // validate update
- tc = "update-pod2"
- executed = false
- expectAdd = false
- expectRemove = false
+ // validate update (no change)
+ allocRequest = nil
context.UpdatePod(nil, pod2)
- assert.Assert(t, !executed, "unexpected update")
+ assert.Assert(t, allocRequest != nil, "update expected")
pod = context.schedulerCache.GetPod(string(pod2.UID))
assert.Assert(t, pod != nil, "pod not found in cache")
// validate update when not already in cache
- tc = "update-pod2-nocache-pre"
- executed = false
- expectAdd = false
- expectRemove = true
+ allocRequest = nil
context.DeletePod(pod2)
- assert.Assert(t, executed, "expected update")
- tc = "update-pod2-nocache"
- executed = false
- expectAdd = true
- expectRemove = false
+ assertReleaseForeignPod(t, podName2, allocRequest)
+
+ allocRequest = nil
context.UpdatePod(nil, pod2)
- assert.Assert(t, executed, "expected update")
+ assert.Assert(t, allocRequest != nil, "expected update")
pod = context.schedulerCache.GetPod(string(pod2.UID))
assert.Assert(t, pod != nil, "pod not found in cache")
+ assertAddForeignPod(t, podName2, Host1, allocRequest)
// pod is failed, should trigger update if already in cache
pod3 := pod2.DeepCopy()
pod3.Status.Phase = v1.PodFailed
// validate add
- tc = "add-pod3"
- executed = false
- expectAdd = false
- expectRemove = true
+ allocRequest = nil
context.AddPod(pod3)
- assert.Assert(t, executed, "expected update")
+ assert.Assert(t, allocRequest != nil, "expected update")
pod = context.schedulerCache.GetPod(string(pod3.UID))
assert.Assert(t, pod == nil, "failed pod found in cache")
+ assert.Assert(t, allocRequest.Releases != nil) // expecting a release
due to pod status
+ assertReleaseForeignPod(t, podName2, allocRequest)
+}
- // validate update when not already in cache
- tc = "update-pod3-pre"
- executed = false
- expectAdd = true
- expectRemove = false
- context.AddPod(pod2)
- tc = "update-pod3"
- executed = false
- expectAdd = false
- expectRemove = true
- context.UpdatePod(nil, pod3)
- assert.Assert(t, executed, "expected update")
- pod = context.schedulerCache.GetPod(string(pod3.UID))
- assert.Assert(t, pod == nil, "failed pod found in cache")
+func assertAddForeignPod(t *testing.T, podName, host string, allocRequest
*si.AllocationRequest) {
+ t.Helper()
+ assert.Equal(t, 1, len(allocRequest.Allocations))
+ tags := allocRequest.Allocations[0].AllocationTags
+ assert.Equal(t, 2, len(tags))
+ assert.Equal(t, siCommon.AllocTypeDefault, tags[siCommon.Foreign])
+ assert.Equal(t, podName, allocRequest.Allocations[0].AllocationKey)
+ assert.Equal(t, host, allocRequest.Allocations[0].NodeID)
+}
+
+func assertReleaseForeignPod(t *testing.T, podName string, allocRequest
*si.AllocationRequest) {
+ t.Helper()
+ assert.Assert(t, allocRequest.Releases != nil)
+ assert.Equal(t, 1, len(allocRequest.Releases.AllocationsToRelease))
+ assert.Equal(t, podName,
allocRequest.Releases.AllocationsToRelease[0].AllocationKey)
+ assert.Equal(t, constants.DefaultPartition,
allocRequest.Releases.AllocationsToRelease[0].PartitionName)
+ assert.Equal(t, "",
allocRequest.Releases.AllocationsToRelease[0].ApplicationID)
+ assert.Equal(t, si.TerminationType_STOPPED_BY_RM,
allocRequest.Releases.AllocationsToRelease[0].TerminationType)
}
func TestDeletePodForeign(t *testing.T) {
context, apiProvider := initContextAndAPIProviderForTest()
- dispatcher.Start()
- defer dispatcher.UnregisterAllEventHandlers()
- defer dispatcher.Stop()
- executed := false
- expectAdd := false
- expectRemove := false
- tc := ""
-
- validatorFunc := func(request *si.NodeRequest) error {
- executed = true
- assert.Equal(t, len(request.Nodes), 1, "%s: wrong node count",
tc)
- updatedNode := request.Nodes[0]
- switch updatedNode.Action {
- case si.NodeInfo_CREATE_DRAIN:
- return nil
- case si.NodeInfo_DRAIN_TO_SCHEDULABLE:
- return nil
- case si.NodeInfo_UPDATE:
- executed = true
- default:
- assert.Equal(t, false, "Unexpected action: %d",
updatedNode.Action)
- return nil
- }
- assert.Equal(t, updatedNode.NodeID, Host1, "%s: wrong nodeID",
tc)
- assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE, "%s:
wrong action", tc)
- assert.Equal(t,
updatedNode.SchedulableResource.Resources[siCommon.Memory].Value,
int64(10000*1000*1000), "%s: wrong schedulable memory", tc)
- assert.Equal(t,
updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000),
"%s: wrong schedulable cpu", tc)
- if expectAdd {
- assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value,
int64(1000*1000*1000), "%s: wrong occupied memory (add)", tc)
- assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500), "%s:
wrong occupied cpu (add)", tc)
- }
- if expectRemove {
- assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0), "%s:
wrong occupied memory (remove)", tc)
- assert.Equal(t,
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0), "%s:
wrong occupied cpu (remove)", tc)
- }
+ var allocRequest *si.AllocationRequest
+ apiProvider.MockSchedulerAPIUpdateAllocationFn(func(request
*si.AllocationRequest) error {
+ allocRequest = request
return nil
- }
-
- apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest)
error {
- for _, node := range request.Nodes {
- if node.Action == si.NodeInfo_CREATE_DRAIN {
- dispatcher.Dispatch(CachedSchedulerNodeEvent{
- NodeID: node.NodeID,
- Event: NodeAccepted,
- })
- }
- }
- return validatorFunc(request)
})
- host1 := nodeForTest(Host1, "10G", "10")
- context.updateNode(nil, host1)
-
- // add existing pod
+ // add existing foreign pod
pod1 := foreignPod(podName1, "1G", "500m")
pod1.Status.Phase = v1.PodRunning
pod1.Spec.NodeName = Host1
-
- // validate deletion of existing assigned pod
- tc = "delete-pod1-pre"
- executed = false
- expectAdd = true
- expectRemove = false
context.AddPod(pod1)
- tc = "delete-pod1"
- executed = false
- expectAdd = false
- expectRemove = true
+ allocRequest = nil
context.DeletePod(pod1)
- assert.Assert(t, executed, "update not executed")
- pod := context.schedulerCache.GetPod(string(pod1.UID))
- assert.Assert(t, pod == nil, "deleted pod found in cache")
- // validate delete when not already found
- tc = "delete-pod1-again"
- executed = false
- expectAdd = false
- expectRemove = false
- context.DeletePod(pod1)
- assert.Assert(t, !executed, "unexpected update")
- pod = context.schedulerCache.GetPod(string(pod1.UID))
+ assert.Assert(t, allocRequest != nil, "update not executed")
+ assert.Equal(t, 0, len(allocRequest.Allocations))
+ assert.Assert(t, allocRequest.Releases != nil)
+ assert.Equal(t, 1, len(allocRequest.Releases.AllocationsToRelease))
+ assert.Equal(t, podName1,
allocRequest.Releases.AllocationsToRelease[0].AllocationKey)
+ assert.Equal(t, constants.DefaultPartition,
allocRequest.Releases.AllocationsToRelease[0].PartitionName)
+ assert.Equal(t, "",
allocRequest.Releases.AllocationsToRelease[0].ApplicationID)
+ assert.Equal(t, si.TerminationType_STOPPED_BY_RM,
allocRequest.Releases.AllocationsToRelease[0].TerminationType)
+ pod := context.schedulerCache.GetPod(string(pod1.UID))
assert.Assert(t, pod == nil, "deleted pod found in cache")
}
@@ -1993,6 +1900,10 @@ func TestInitializeState(t *testing.T) {
},
}}
podLister.AddPod(orphaned)
+ // add an orphan foreign pod
+ orphanForeign := newPodHelper(podForeignName, "default", podForeignUID,
nodeName2, "", v1.PodRunning)
+ orphanForeign.Spec.SchedulerName = ""
+ podLister.AddPod(orphanForeign)
err := context.InitializeState()
assert.NilError(t, err, "InitializeState failed")
@@ -2005,19 +1916,20 @@ func TestInitializeState(t *testing.T) {
assert.Equal(t, pc.Annotations[constants.AnnotationAllowPreemption],
constants.True, "wrong allow-preemption value")
// verify occupied / capacity on node
- capacity, occupied, ok :=
context.schedulerCache.SnapshotResources(nodeName1)
+ capacity, _, ok := context.schedulerCache.SnapshotResources(nodeName1)
assert.Assert(t, ok, "Unable to retrieve node resources")
expectedCapacity := common.ParseResource("4", "10G")
assert.Equal(t, expectedCapacity.Resources["vcore"].Value,
capacity.Resources["vcore"].Value, "wrong capacity vcore")
assert.Equal(t, expectedCapacity.Resources["memory"].Value,
capacity.Resources["memory"].Value, "wrong capacity memory")
- expectedOccupied := common.ParseResource("1500m", "2G")
- assert.Equal(t, expectedOccupied.Resources["vcore"].Value,
occupied.Resources["vcore"].Value, "wrong occupied vcore")
- assert.Equal(t, expectedOccupied.Resources["memory"].Value,
occupied.Resources["memory"].Value, "wrong occupied memory")
// check that pod orphan status is correct
assert.Check(t, !context.schedulerCache.IsPodOrphaned(podName1), "pod1
should not be orphaned")
assert.Check(t, !context.schedulerCache.IsPodOrphaned(podName2), "pod2
should not be orphaned")
assert.Check(t, context.schedulerCache.IsPodOrphaned(podName3), "pod3
should be orphaned")
+ assert.Check(t, context.schedulerCache.IsPodOrphaned(podForeignUID),
"foreign pod should be orphaned")
+ assert.Check(t, context.schedulerCache.GetPod("foreignRunning") != nil,
"foreign running pod is not in the cache")
+ assert.Check(t, context.schedulerCache.GetPod("foreignPending") == nil,
"foreign pending pod should not be in the cache")
+ assert.Check(t,
!context.schedulerCache.IsPodOrphaned("foreignRunning"), "foreign running pod
should not be orphaned")
// pod1 is pending
task1 := context.getTask(appID1, podName1)
@@ -2034,6 +1946,145 @@ func TestInitializeState(t *testing.T) {
assert.Assert(t, task3 == nil, "pod3 was found")
}
+func TestPodAdoption(t *testing.T) {
+ ctx, apiProvider := initContextAndAPIProviderForTest()
+ dispatcher.Start()
+ defer dispatcher.UnregisterAllEventHandlers()
+ defer dispatcher.Stop()
+ apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest)
error {
+ for _, node := range request.Nodes {
+ dispatcher.Dispatch(CachedSchedulerNodeEvent{
+ NodeID: node.NodeID,
+ Event: NodeAccepted,
+ })
+ }
+ return nil
+ })
+
+ // add pods w/o node & check orphan status
+ pod1 := newPodHelper(podName1, namespace, pod1UID, Host1, appID,
v1.PodRunning)
+ pod2 := newPodHelper(podForeignName, namespace, podForeignUID, Host1,
"", v1.PodRunning)
+ pod2.Spec.SchedulerName = ""
+ ctx.AddPod(pod1)
+ ctx.AddPod(pod2)
+ assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(pod1UID), "Yunikorn
pod is not orphan")
+ assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(podForeignUID),
"foreign pod is not orphan")
+
+ // add node
+ node := v1.Node{
+ ObjectMeta: apis.ObjectMeta{
+ Name: Host1,
+ Namespace: "default",
+ UID: uid1,
+ },
+ }
+ ctx.addNode(&node)
+
+ // check that node has adopted the pods
+ assert.Assert(t, !ctx.schedulerCache.IsPodOrphaned(pod1UID), "Yunikorn
pod has not been adopted")
+ assert.Assert(t, !ctx.schedulerCache.IsPodOrphaned(podForeignUID),
"foreign pod has not been adopted")
+}
+
+func TestOrphanPodUpdate(t *testing.T) {
+ ctx, apiProvider := initContextAndAPIProviderForTest()
+ dispatcher.Start()
+ defer dispatcher.UnregisterAllEventHandlers()
+ defer dispatcher.Stop()
+ var update atomic.Bool
+ apiProvider.MockSchedulerAPIUpdateAllocationFn(func(request
*si.AllocationRequest) error {
+ update.Store(true)
+ return nil
+ })
+
+ // add pods w/o node & check orphan status
+ pod1 := newPodHelper(podName1, namespace, pod1UID, Host1, appID,
v1.PodPending)
+ pod2 := newPodHelper(podForeignName, namespace, podForeignUID, Host1,
"", v1.PodPending)
+ pod2.Spec.SchedulerName = ""
+ ctx.AddPod(pod1)
+ ctx.AddPod(pod2)
+ assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(pod1UID), "Yunikorn
pod is not orphan")
+ assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(podForeignUID),
"foreign pod is not orphan")
+ assert.Assert(t, ctx.getApplication(appID) == nil)
+
+ // update orphan pods
+ pod1Upd := pod1.DeepCopy()
+ pod1Upd.Status.Phase = v1.PodRunning
+ pod2Upd := pod2.DeepCopy()
+ pod2Upd.Status.Phase = v1.PodRunning
+
+ ctx.UpdatePod(pod1, pod1Upd)
+ assert.Assert(t, ctx.getApplication(appID) == nil)
+ assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(pod1UID), "Yunikorn
pod is not orphan after update")
+ assert.Equal(t, v1.PodRunning,
ctx.schedulerCache.GetPod(pod1UID).Status.Phase, "pod has not been updated in
the cache")
+ assert.Assert(t, !update.Load(), "allocation update has been triggered
for Yunikorn orphan pod")
+
+ ctx.UpdatePod(pod2, pod2Upd)
+ assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(podForeignUID),
"foreign pod is not orphan after update")
+ assert.Equal(t, v1.PodRunning,
ctx.schedulerCache.GetPod(podForeignUID).Status.Phase, "foreign pod has not
been updated in the cache")
+ assert.Assert(t, !update.Load(), "allocation update has been triggered
for foreign orphan pod")
+}
+
+func TestOrphanPodDelete(t *testing.T) {
+ ctx, apiProvider := initContextAndAPIProviderForTest()
+ dispatcher.Start()
+ defer dispatcher.UnregisterAllEventHandlers()
+ defer dispatcher.Stop()
+ var taskEventSent atomic.Bool
+ dispatcher.RegisterEventHandler("TestTaskHandler",
dispatcher.EventTypeTask, func(obj interface{}) {
+ taskEventSent.Store(true)
+ })
+ var request *si.AllocationRequest
+ apiProvider.MockSchedulerAPIUpdateAllocationFn(func(r
*si.AllocationRequest) error {
+ request = r
+ return nil
+ })
+ apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest)
error {
+ for _, node := range request.Nodes {
+ dispatcher.Dispatch(CachedSchedulerNodeEvent{
+ NodeID: node.NodeID,
+ Event: NodeAccepted,
+ })
+ }
+ return nil
+ })
+
+ // add pods w/o node & check orphan status
+ pod1 := newPodHelper(podName1, namespace, pod1UID, Host1, appID,
v1.PodPending)
+ pod2 := newPodHelper(podForeignName, namespace, podForeignUID, Host1,
"", v1.PodPending)
+ pod2.Spec.SchedulerName = ""
+ ctx.AddPod(pod1)
+ ctx.AddPod(pod2)
+ assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(pod1UID), "Yunikorn
pod is not orphan")
+ assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(podForeignUID),
"foreign pod is not orphan")
+ assert.Assert(t, ctx.getApplication(appID) == nil)
+
+ // add a node with pod - this creates the application object
+ node := v1.Node{
+ ObjectMeta: apis.ObjectMeta{
+ Name: Host2,
+ Namespace: "default",
+ UID: uid1,
+ },
+ }
+ ctx.addNode(&node)
+ pod3 := newPodHelper(podName2, namespace, pod2UID, Host2, appID,
v1.PodPending)
+ ctx.AddPod(pod3)
+ assert.Assert(t, !ctx.schedulerCache.IsPodOrphaned(pod2UID), "Yunikorn
pod is orphan")
+ assert.Assert(t, ctx.getApplication(appID) != nil)
+
+ // delete orphan YK pod
+ ctx.DeletePod(pod1)
+ err := utils.WaitForCondition(taskEventSent.Load, 100*time.Millisecond,
time.Second)
+ assert.NilError(t, err)
+
+ // delete foreign pod
+ ctx.DeletePod(pod2)
+ assert.Assert(t, request != nil)
+ assert.Assert(t, request.Releases != nil)
+ assert.Equal(t, 1, len(request.Releases.AllocationsToRelease))
+ assert.Equal(t, podForeignUID,
request.Releases.AllocationsToRelease[0].AllocationKey)
+}
+
func TestTaskRemoveOnCompletion(t *testing.T) {
context := initContextForTest()
dispatcher.Start()
diff --git a/pkg/common/constants/constants.go
b/pkg/common/constants/constants.go
index 5a848b4a..09d9ea13 100644
--- a/pkg/common/constants/constants.go
+++ b/pkg/common/constants/constants.go
@@ -66,6 +66,7 @@ const SchedulerName = "yunikorn"
// OwnerReferences
const DaemonSetType = "DaemonSet"
+const NodeKind = "Node"
// Gang scheduling
const PlaceholderContainerImage = "registry.k8s.io/pause:3.7"
diff --git a/pkg/common/si_helper.go b/pkg/common/si_helper.go
index d94a708c..d2ef9a5c 100644
--- a/pkg/common/si_helper.go
+++ b/pkg/common/si_helper.go
@@ -114,6 +114,33 @@ func CreateAllocationForTask(appID, taskID, nodeID string,
resource *si.Resource
}
}
+func CreateAllocationForForeignPod(pod *v1.Pod) *si.AllocationRequest {
+ podType := common.AllocTypeDefault
+ for _, ref := range pod.OwnerReferences {
+ if ref.Kind == constants.NodeKind {
+ podType = common.AllocTypeStatic
+ break
+ }
+ }
+
+ allocation := si.Allocation{
+ AllocationTags: map[string]string{
+ common.Foreign: podType,
+ },
+ AllocationKey: string(pod.UID),
+ ResourcePerAlloc: GetPodResource(pod),
+ Priority: CreatePriorityForTask(pod),
+ NodeID: pod.Spec.NodeName,
+ }
+
+ allocation.AllocationTags[common.CreationTime] =
strconv.FormatInt(pod.CreationTimestamp.Unix(), 10)
+
+ return &si.AllocationRequest{
+ Allocations: []*si.Allocation{&allocation},
+ RmID: conf.GetSchedulerConf().ClusterID,
+ }
+}
+
func GetTerminationTypeFromString(terminationTypeStr string)
si.TerminationType {
if v, ok := si.TerminationType_value[terminationTypeStr]; ok {
return si.TerminationType(v)
@@ -141,6 +168,25 @@ func CreateReleaseRequestForTask(appID, taskID, partition
string, terminationTyp
}
}
+func CreateReleaseRequestForForeignPod(uid, partition string)
*si.AllocationRequest {
+ allocToRelease := make([]*si.AllocationRelease, 1)
+ allocToRelease[0] = &si.AllocationRelease{
+ AllocationKey: uid,
+ PartitionName: partition,
+ TerminationType: si.TerminationType_STOPPED_BY_RM,
+ Message: "pod terminated",
+ }
+
+ releaseRequest := si.AllocationReleasesRequest{
+ AllocationsToRelease: allocToRelease,
+ }
+
+ return &si.AllocationRequest{
+ Releases: &releaseRequest,
+ RmID: conf.GetSchedulerConf().ClusterID,
+ }
+}
+
// CreateUpdateRequestForUpdatedNode builds a NodeRequest for capacity and
occupied resource updates
func CreateUpdateRequestForUpdatedNode(nodeID string, capacity *si.Resource,
occupied *si.Resource) *si.NodeRequest {
nodeInfo := &si.NodeInfo{
diff --git a/pkg/common/si_helper_test.go b/pkg/common/si_helper_test.go
index 9ccf619a..0439f7b3 100644
--- a/pkg/common/si_helper_test.go
+++ b/pkg/common/si_helper_test.go
@@ -19,9 +19,11 @@ package common
import (
"testing"
+ "time"
"gotest.tools/v3/assert"
v1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/resource"
apis "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/apache/yunikorn-scheduler-interface/lib/go/common"
@@ -415,3 +417,79 @@ func TestGetTerminationTypeFromString(t *testing.T) {
})
}
}
+
+func TestCreateAllocationForForeignPod(t *testing.T) {
+ cResources := make(map[v1.ResourceName]resource.Quantity)
+ cResources[v1.ResourceMemory] = resource.MustParse("500M")
+ cResources[v1.ResourceCPU] = resource.MustParse("1")
+ var containers []v1.Container
+ containers = append(containers, v1.Container{
+ Name: "container-01",
+ Resources: v1.ResourceRequirements{
+ Requests: cResources,
+ },
+ })
+
+ pod := &v1.Pod{
+ TypeMeta: apis.TypeMeta{
+ Kind: "Pod",
+ APIVersion: "v1",
+ },
+ ObjectMeta: apis.ObjectMeta{
+ Name: "test",
+ UID: "UID-00001",
+ CreationTimestamp: apis.Time{
+ Time: time.Unix(1, 0),
+ },
+ },
+ Spec: v1.PodSpec{
+ Containers: containers,
+ NodeName: nodeID,
+ },
+ }
+
+ allocReq := CreateAllocationForForeignPod(pod)
+ assert.Equal(t, 1, len(allocReq.Allocations))
+ assert.Equal(t, "mycluster", allocReq.RmID)
+ assert.Assert(t, allocReq.Releases == nil)
+ alloc := allocReq.Allocations[0]
+ assert.Equal(t, nodeID, alloc.NodeID)
+ assert.Equal(t, "UID-00001", alloc.AllocationKey)
+ assert.Equal(t, int32(0), alloc.Priority)
+ res := alloc.ResourcePerAlloc
+ assert.Equal(t, 3, len(res.Resources))
+ assert.Equal(t, int64(500000000), res.Resources["memory"].Value)
+ assert.Equal(t, int64(1000), res.Resources["vcore"].Value)
+ assert.Equal(t, int64(1), res.Resources["pods"].Value)
+ assert.Equal(t, 2, len(alloc.AllocationTags))
+ assert.Equal(t, "1", alloc.AllocationTags[common.CreationTime])
+ assert.Equal(t, common.AllocTypeDefault,
alloc.AllocationTags[common.Foreign])
+
+ // set priority & change pod type to static
+ prio := int32(12)
+ pod.Spec.Priority = &prio
+ pod.OwnerReferences = []apis.OwnerReference{
+ {
+ Kind: "Node",
+ },
+ }
+ allocReq = CreateAllocationForForeignPod(pod)
+ assert.Equal(t, 2, len(alloc.AllocationTags))
+ alloc = allocReq.Allocations[0]
+ assert.Equal(t, common.AllocTypeStatic,
alloc.AllocationTags[common.Foreign])
+ assert.Equal(t, int32(12), alloc.Priority)
+}
+
+func TestCreateReleaseRequestForForeignPod(t *testing.T) {
+ allocReq := CreateReleaseRequestForForeignPod("UID-0001", "partition")
+
+ assert.Assert(t, allocReq.Releases != nil)
+ assert.Equal(t, "mycluster", allocReq.RmID)
+ releaseReq := allocReq.Releases
+ assert.Equal(t, 1, len(releaseReq.AllocationsToRelease))
+ release := releaseReq.AllocationsToRelease[0]
+ assert.Equal(t, "UID-0001", release.AllocationKey)
+ assert.Equal(t, "partition", release.PartitionName)
+ assert.Equal(t, si.TerminationType_STOPPED_BY_RM,
release.TerminationType)
+ assert.Equal(t, "pod terminated", release.Message)
+}
diff --git a/pkg/shim/scheduler_mock_test.go b/pkg/shim/scheduler_mock_test.go
index 1c3f3f36..7b8de855 100644
--- a/pkg/shim/scheduler_mock_test.go
+++ b/pkg/shim/scheduler_mock_test.go
@@ -341,7 +341,6 @@ func createUpdateRequestForNewNode(nodeID string,
nodeLabels map[string]string,
nodeInfo := &si.NodeInfo{
NodeID: nodeID,
SchedulableResource: capacity,
- OccupiedResource: occupied,
Attributes: map[string]string{
constants.DefaultNodeAttributeHostNameKey: nodeID,
constants.DefaultNodeAttributeRackNameKey:
constants.DefaultRackName,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]