This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new 753e4f8a [YUNIKORN-2834] [shim] Add non-YuniKorn allocation tracking logic (#918)
753e4f8a is described below

commit 753e4f8a3ad28e390cb947ada3acabebb32bc473
Author: Peter Bacsko <[email protected]>
AuthorDate: Wed Oct 16 09:36:05 2024 +0200

    [YUNIKORN-2834] [shim] Add non-YuniKorn allocation tracking logic (#918)
    
    Closes: #918
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 go.mod                            |   2 +-
 go.sum                            |   4 +-
 pkg/cache/context.go              |  66 +++----
 pkg/cache/context_test.go         | 383 +++++++++++++++++++++-----------------
 pkg/common/constants/constants.go |   1 +
 pkg/common/si_helper.go           |  46 +++++
 pkg/common/si_helper_test.go      |  78 ++++++++
 pkg/shim/scheduler_mock_test.go   |   1 -
 8 files changed, 370 insertions(+), 211 deletions(-)

diff --git a/go.mod b/go.mod
index fc0ffd5d..b5dbbe8c 100644
--- a/go.mod
+++ b/go.mod
@@ -23,7 +23,7 @@ go 1.22.0
 toolchain go1.22.5
 
 require (
-       github.com/apache/yunikorn-core v0.0.0-20241002095736-a2d3d43a145d
+       github.com/apache/yunikorn-core v0.0.0-20241003152125-4ea225160acf
        github.com/apache/yunikorn-scheduler-interface 
v0.0.0-20240924203603-aaf51c93d3a0
        github.com/google/go-cmp v0.6.0
        github.com/google/uuid v1.6.0
diff --git a/go.sum b/go.sum
index c9f1c1bd..566b07b2 100644
--- a/go.sum
+++ b/go.sum
@@ -8,8 +8,8 @@ github.com/NYTimes/gziphandler v1.1.1 
h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
 github.com/NYTimes/gziphandler v1.1.1/go.mod 
h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
 github.com/antlr4-go/antlr/v4 v4.13.0 
h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
 github.com/antlr4-go/antlr/v4 v4.13.0/go.mod 
h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
-github.com/apache/yunikorn-core v0.0.0-20241002095736-a2d3d43a145d 
h1:awo2goBrw25P1aFNZgYJ0q7V+5ycMqMhvI60B75OzQg=
-github.com/apache/yunikorn-core v0.0.0-20241002095736-a2d3d43a145d/go.mod 
h1:q6OXYpCTGvMJxsEorpIF6icKM/IioMmU6KcsclV1kI0=
+github.com/apache/yunikorn-core v0.0.0-20241003152125-4ea225160acf 
h1:wKySiY4IA9Us287QRnIxFnuTHXaMSeQ3BhAwSrSW/sQ=
+github.com/apache/yunikorn-core v0.0.0-20241003152125-4ea225160acf/go.mod 
h1:q6OXYpCTGvMJxsEorpIF6icKM/IioMmU6KcsclV1kI0=
 github.com/apache/yunikorn-scheduler-interface 
v0.0.0-20240924203603-aaf51c93d3a0 
h1:/9j0YXuifvoOl4YVEbO0r+DPkkYLzaQ+/ac+xCc7SY8=
 github.com/apache/yunikorn-scheduler-interface 
v0.0.0-20240924203603-aaf51c93d3a0/go.mod 
h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU=
 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 
h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index b4dca41e..613bbc52 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -365,12 +365,11 @@ func (ctx *Context) updateForeignPod(pod *v1.Pod) {
                podStatusBefore = string(oldPod.Status.Phase)
        }
 
-       // conditions for allocate:
-       //   1. pod was previously assigned
-       //   2. pod is now assigned
-       //   3. pod is not in terminated state
-       //   4. pod references a known node
-       if oldPod == nil && utils.IsAssignedPod(pod) && 
!utils.IsPodTerminated(pod) {
+       // conditions for allocate/update:
+       //   1. pod is now assigned
+       //   2. pod is not in terminated state
+       //   3. pod references a known node
+       if utils.IsAssignedPod(pod) && !utils.IsPodTerminated(pod) {
                if ctx.schedulerCache.UpdatePod(pod) {
                        // pod was accepted by a real node
                        log.Log(log.ShimContext).Debug("pod is assigned to a 
node, trigger occupied resource update",
@@ -378,10 +377,14 @@ func (ctx *Context) updateForeignPod(pod *v1.Pod) {
                                zap.String("podName", pod.Name),
                                zap.String("podStatusBefore", podStatusBefore),
                                zap.String("podStatusCurrent", 
string(pod.Status.Phase)))
-                       ctx.updateNodeOccupiedResources(pod.Spec.NodeName, 
pod.Namespace, pod.Name, common.GetPodResource(pod), 
schedulercache.AddOccupiedResource)
+                       allocReq := common.CreateAllocationForForeignPod(pod)
+                       if err := 
ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(allocReq); err != nil {
+                               log.Log(log.ShimContext).Error("failed to add 
foreign allocation to the core",
+                                       zap.Error(err))
+                       }
                } else {
                        // pod is orphaned (references an unknown node)
-                       log.Log(log.ShimContext).Info("skipping occupied 
resource update for assigned orphaned pod",
+                       log.Log(log.ShimContext).Info("skipping updating 
allocation for assigned orphaned pod",
                                zap.String("namespace", pod.Namespace),
                                zap.String("podName", pod.Name),
                                zap.String("nodeName", pod.Spec.NodeName))
@@ -401,9 +404,13 @@ func (ctx *Context) updateForeignPod(pod *v1.Pod) {
                                zap.String("podStatusBefore", podStatusBefore),
                                zap.String("podStatusCurrent", 
string(pod.Status.Phase)))
                        // this means pod is terminated
-                       // we need sub the occupied resource and re-sync with 
the scheduler-core
-                       ctx.updateNodeOccupiedResources(pod.Spec.NodeName, 
pod.Namespace, pod.Name, common.GetPodResource(pod), 
schedulercache.SubOccupiedResource)
+                       // remove from the scheduler cache and create release 
request to remove foreign allocation from the core
                        ctx.schedulerCache.RemovePod(pod)
+                       releaseReq := 
common.CreateReleaseRequestForForeignPod(string(pod.UID), 
constants.DefaultPartition)
+                       if err := 
ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(releaseReq); err != nil 
{
+                               log.Log(log.ShimContext).Error("failed to 
remove foreign allocation from the core",
+                                       zap.Error(err))
+                       }
                } else {
                        // pod is orphaned (references an unknown node)
                        log.Log(log.ShimContext).Info("skipping occupied 
resource update for terminated orphaned pod",
@@ -453,40 +460,17 @@ func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) {
 func (ctx *Context) deleteForeignPod(pod *v1.Pod) {
        ctx.lock.Lock()
        defer ctx.lock.Unlock()
-       oldPod := ctx.schedulerCache.GetPod(string(pod.UID))
-       if oldPod == nil {
-               // if pod is not in scheduler cache, no node updates are needed
-               log.Log(log.ShimContext).Debug("unknown foreign pod deleted, no 
resource updated needed",
-                       zap.String("namespace", pod.Namespace),
-                       zap.String("podName", pod.Name))
-               return
+       releaseReq := common.CreateReleaseRequestForForeignPod(string(pod.UID), 
constants.DefaultPartition)
+       if err := 
ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(releaseReq); err != nil 
{
+               log.Log(log.ShimContext).Error("failed to remove foreign 
allocation from the core",
+                       zap.Error(err))
        }
 
-       // conditions for release:
-       //   1. pod is already assigned to a node
-       //   2. pod was not in a terminal state before
-       //   3. pod references a known node
-       if !utils.IsPodTerminated(oldPod) {
-               if !ctx.schedulerCache.IsPodOrphaned(string(oldPod.UID)) {
-                       log.Log(log.ShimContext).Debug("foreign pod deleted, 
triggering occupied resource update",
-                               zap.String("namespace", pod.Namespace),
-                               zap.String("podName", pod.Name),
-                               zap.String("podStatusBefore", 
string(oldPod.Status.Phase)),
-                               zap.String("podStatusCurrent", 
string(pod.Status.Phase)))
-                       // this means pod is terminated
-                       // we need sub the occupied resource and re-sync with 
the scheduler-core
-                       ctx.updateNodeOccupiedResources(pod.Spec.NodeName, 
pod.Namespace, pod.Name, common.GetPodResource(pod), 
schedulercache.SubOccupiedResource)
-               } else {
-                       // pod is orphaned (references an unknown node)
-                       log.Log(log.ShimContext).Info("skipping occupied 
resource update for removed orphaned pod",
-                               zap.String("namespace", pod.Namespace),
-                               zap.String("podName", pod.Name),
-                               zap.String("nodeName", pod.Spec.NodeName))
-               }
-               ctx.schedulerCache.RemovePod(pod)
-       }
+       log.Log(log.ShimContext).Debug("removing pod from cache", 
zap.String("podName", pod.Name))
+       ctx.schedulerCache.RemovePod(pod)
 }
 
+//nolint:unused
 func (ctx *Context) updateNodeOccupiedResources(nodeName string, namespace 
string, podName string, resource *si.Resource, opt schedulercache.UpdateType) {
        if common.IsZero(resource) {
                return
@@ -1613,7 +1597,7 @@ func (ctx *Context) decommissionNode(node *v1.Node) error 
{
 }
 
 func (ctx *Context) updateNodeResources(node *v1.Node, capacity *si.Resource, 
occupied *si.Resource) error {
-       request := common.CreateUpdateRequestForUpdatedNode(node.Name, 
capacity, occupied)
+       request := common.CreateUpdateRequestForUpdatedNode(node.Name, 
capacity, nil)
        return ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
 }
 
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index b5098872..a8214bcc 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -22,6 +22,7 @@ import (
        "encoding/json"
        "fmt"
        "strings"
+       "sync/atomic"
        "testing"
        "time"
 
@@ -74,11 +75,13 @@ const (
        taskUID4    = "task00004"
        taskUnknown = "non_existing_taskID"
 
-       podName1     = "pod1"
-       podName2     = "pod2"
-       podName3     = "pod3"
-       podName4     = "pod4"
-       podNamespace = "yk"
+       podName1       = "pod1"
+       podName2       = "pod2"
+       podName3       = "pod3"
+       podName4       = "pod4"
+       podForeignName = "foreign-1"
+       podForeignUID  = "UUID-foreign-1"
+       podNamespace   = "yk"
 
        nodeName1    = "node1"
        nodeName2    = "node2"
@@ -527,39 +530,11 @@ func TestAddUpdatePodForeign(t *testing.T) {
        defer dispatcher.UnregisterAllEventHandlers()
        defer dispatcher.Stop()
 
-       executed := false
-       expectAdd := false
-       expectRemove := false
-       tc := ""
-
-       validatorFunc := func(request *si.NodeRequest) error {
-               assert.Equal(t, len(request.Nodes), 1, "%s: wrong node count", 
tc)
-               updatedNode := request.Nodes[0]
-               assert.Equal(t, updatedNode.NodeID, Host1, "%s: wrong nodeID", 
tc)
-               switch updatedNode.Action {
-               case si.NodeInfo_CREATE_DRAIN:
-                       return nil
-               case si.NodeInfo_DRAIN_TO_SCHEDULABLE:
-                       return nil
-               case si.NodeInfo_UPDATE:
-                       executed = true
-               default:
-                       assert.Equal(t, false, "Unexpected action: %d", 
updatedNode.Action)
-                       return nil
-               }
-               assert.Equal(t, 
updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, 
int64(10000*1000*1000), "%s: wrong schedulable memory", tc)
-               assert.Equal(t, 
updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000), 
"%s: wrong schedulable cpu", tc)
-               if expectAdd {
-                       assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, 
int64(1000*1000*1000), "%s: wrong occupied memory (add)", tc)
-                       assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500), "%s: 
wrong occupied cpu (add)", tc)
-               }
-               if expectRemove {
-                       assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0), "%s: 
wrong occupied memory (remove)", tc)
-                       assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0), "%s: 
wrong occupied cpu (remove)", tc)
-               }
+       var allocRequest *si.AllocationRequest
+       apiProvider.MockSchedulerAPIUpdateAllocationFn(func(request 
*si.AllocationRequest) error {
+               allocRequest = request
                return nil
-       }
-
+       })
        apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest) 
error {
                for _, node := range request.Nodes {
                        if node.Action == si.NodeInfo_CREATE_DRAIN {
@@ -569,10 +544,10 @@ func TestAddUpdatePodForeign(t *testing.T) {
                                })
                        }
                }
-               return validatorFunc(request)
+               return nil
        })
 
-       host1 := nodeForTest(Host1, "10G", "10")
+       host1 := nodeForTest(Host1, "10G", "10") // add existing foreign pod
        context.updateNode(nil, host1)
 
        // pod is not assigned to any node
@@ -580,22 +555,18 @@ func TestAddUpdatePodForeign(t *testing.T) {
        pod1.Status.Phase = v1.PodPending
        pod1.Spec.NodeName = ""
 
-       // validate add
-       tc = "add-pod1"
-       executed = false
-       expectAdd = false
-       expectRemove = false
+       // validate add (pending, no node assigned)
+       allocRequest = nil
        context.AddPod(pod1)
-       assert.Assert(t, !executed, "unexpected update")
+       assert.Assert(t, allocRequest == nil, "unexpected update")
        pod := context.schedulerCache.GetPod(string(pod1.UID))
        assert.Assert(t, pod == nil, "unassigned pod found in cache")
 
-       // validate update
-       tc = "update-pod1"
-       executed = false
-       expectRemove = false
+       // validate update (no change)
+       allocRequest = nil
        context.UpdatePod(nil, pod1)
-       assert.Assert(t, !executed, "unexpected update")
+       assert.Assert(t, allocRequest == nil, "unexpected update")
+       pod = context.schedulerCache.GetPod(string(pod1.UID))
        assert.Assert(t, pod == nil, "unassigned pod found in cache")
 
        // pod is assigned to a node but still in pending state, should update
@@ -604,155 +575,91 @@ func TestAddUpdatePodForeign(t *testing.T) {
        pod2.Spec.NodeName = Host1
 
        // validate add
-       tc = "add-pod2"
-       executed = false
-       expectAdd = true
-       expectRemove = false
        context.AddPod(pod2)
-       assert.Assert(t, executed, "updated expected")
+       assert.Assert(t, allocRequest != nil, "update expected")
+       assertAddForeignPod(t, podName2, Host1, allocRequest)
        pod = context.schedulerCache.GetPod(string(pod2.UID))
        assert.Assert(t, pod != nil, "pod not found in cache")
 
-       // validate update
-       tc = "update-pod2"
-       executed = false
-       expectAdd = false
-       expectRemove = false
+       // validate update (no change)
+       allocRequest = nil
        context.UpdatePod(nil, pod2)
-       assert.Assert(t, !executed, "unexpected update")
+       assert.Assert(t, allocRequest != nil, "update expected")
        pod = context.schedulerCache.GetPod(string(pod2.UID))
        assert.Assert(t, pod != nil, "pod not found in cache")
 
        // validate update when not already in cache
-       tc = "update-pod2-nocache-pre"
-       executed = false
-       expectAdd = false
-       expectRemove = true
+       allocRequest = nil
        context.DeletePod(pod2)
-       assert.Assert(t, executed, "expected update")
-       tc = "update-pod2-nocache"
-       executed = false
-       expectAdd = true
-       expectRemove = false
+       assertReleaseForeignPod(t, podName2, allocRequest)
+
+       allocRequest = nil
        context.UpdatePod(nil, pod2)
-       assert.Assert(t, executed, "expected update")
+       assert.Assert(t, allocRequest != nil, "expected update")
        pod = context.schedulerCache.GetPod(string(pod2.UID))
        assert.Assert(t, pod != nil, "pod not found in cache")
+       assertAddForeignPod(t, podName2, Host1, allocRequest)
 
        // pod is failed, should trigger update if already in cache
        pod3 := pod2.DeepCopy()
        pod3.Status.Phase = v1.PodFailed
 
        // validate add
-       tc = "add-pod3"
-       executed = false
-       expectAdd = false
-       expectRemove = true
+       allocRequest = nil
        context.AddPod(pod3)
-       assert.Assert(t, executed, "expected update")
+       assert.Assert(t, allocRequest != nil, "expected update")
        pod = context.schedulerCache.GetPod(string(pod3.UID))
        assert.Assert(t, pod == nil, "failed pod found in cache")
+       assert.Assert(t, allocRequest.Releases != nil) // expecting a release 
due to pod status
+       assertReleaseForeignPod(t, podName2, allocRequest)
+}
 
-       // validate update when not already in cache
-       tc = "update-pod3-pre"
-       executed = false
-       expectAdd = true
-       expectRemove = false
-       context.AddPod(pod2)
-       tc = "update-pod3"
-       executed = false
-       expectAdd = false
-       expectRemove = true
-       context.UpdatePod(nil, pod3)
-       assert.Assert(t, executed, "expected update")
-       pod = context.schedulerCache.GetPod(string(pod3.UID))
-       assert.Assert(t, pod == nil, "failed pod found in cache")
+func assertAddForeignPod(t *testing.T, podName, host string, allocRequest 
*si.AllocationRequest) {
+       t.Helper()
+       assert.Equal(t, 1, len(allocRequest.Allocations))
+       tags := allocRequest.Allocations[0].AllocationTags
+       assert.Equal(t, 2, len(tags))
+       assert.Equal(t, siCommon.AllocTypeDefault, tags[siCommon.Foreign])
+       assert.Equal(t, podName, allocRequest.Allocations[0].AllocationKey)
+       assert.Equal(t, host, allocRequest.Allocations[0].NodeID)
+}
+
+func assertReleaseForeignPod(t *testing.T, podName string, allocRequest 
*si.AllocationRequest) {
+       t.Helper()
+       assert.Assert(t, allocRequest.Releases != nil)
+       assert.Equal(t, 1, len(allocRequest.Releases.AllocationsToRelease))
+       assert.Equal(t, podName, 
allocRequest.Releases.AllocationsToRelease[0].AllocationKey)
+       assert.Equal(t, constants.DefaultPartition, 
allocRequest.Releases.AllocationsToRelease[0].PartitionName)
+       assert.Equal(t, "", 
allocRequest.Releases.AllocationsToRelease[0].ApplicationID)
+       assert.Equal(t, si.TerminationType_STOPPED_BY_RM, 
allocRequest.Releases.AllocationsToRelease[0].TerminationType)
 }
 
 func TestDeletePodForeign(t *testing.T) {
        context, apiProvider := initContextAndAPIProviderForTest()
-       dispatcher.Start()
-       defer dispatcher.UnregisterAllEventHandlers()
-       defer dispatcher.Stop()
 
-       executed := false
-       expectAdd := false
-       expectRemove := false
-       tc := ""
-
-       validatorFunc := func(request *si.NodeRequest) error {
-               executed = true
-               assert.Equal(t, len(request.Nodes), 1, "%s: wrong node count", 
tc)
-               updatedNode := request.Nodes[0]
-               switch updatedNode.Action {
-               case si.NodeInfo_CREATE_DRAIN:
-                       return nil
-               case si.NodeInfo_DRAIN_TO_SCHEDULABLE:
-                       return nil
-               case si.NodeInfo_UPDATE:
-                       executed = true
-               default:
-                       assert.Equal(t, false, "Unexpected action: %d", 
updatedNode.Action)
-                       return nil
-               }
-               assert.Equal(t, updatedNode.NodeID, Host1, "%s: wrong nodeID", 
tc)
-               assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE, "%s: 
wrong action", tc)
-               assert.Equal(t, 
updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, 
int64(10000*1000*1000), "%s: wrong schedulable memory", tc)
-               assert.Equal(t, 
updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000), 
"%s: wrong schedulable cpu", tc)
-               if expectAdd {
-                       assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, 
int64(1000*1000*1000), "%s: wrong occupied memory (add)", tc)
-                       assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500), "%s: 
wrong occupied cpu (add)", tc)
-               }
-               if expectRemove {
-                       assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0), "%s: 
wrong occupied memory (remove)", tc)
-                       assert.Equal(t, 
updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0), "%s: 
wrong occupied cpu (remove)", tc)
-               }
+       var allocRequest *si.AllocationRequest
+       apiProvider.MockSchedulerAPIUpdateAllocationFn(func(request 
*si.AllocationRequest) error {
+               allocRequest = request
                return nil
-       }
-
-       apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest) 
error {
-               for _, node := range request.Nodes {
-                       if node.Action == si.NodeInfo_CREATE_DRAIN {
-                               dispatcher.Dispatch(CachedSchedulerNodeEvent{
-                                       NodeID: node.NodeID,
-                                       Event:  NodeAccepted,
-                               })
-                       }
-               }
-               return validatorFunc(request)
        })
 
-       host1 := nodeForTest(Host1, "10G", "10")
-       context.updateNode(nil, host1)
-
-       // add existing pod
+       // add existing foreign pod
        pod1 := foreignPod(podName1, "1G", "500m")
        pod1.Status.Phase = v1.PodRunning
        pod1.Spec.NodeName = Host1
-
-       // validate deletion of existing assigned pod
-       tc = "delete-pod1-pre"
-       executed = false
-       expectAdd = true
-       expectRemove = false
        context.AddPod(pod1)
-       tc = "delete-pod1"
-       executed = false
-       expectAdd = false
-       expectRemove = true
+       allocRequest = nil
        context.DeletePod(pod1)
-       assert.Assert(t, executed, "update not executed")
-       pod := context.schedulerCache.GetPod(string(pod1.UID))
-       assert.Assert(t, pod == nil, "deleted pod found in cache")
 
-       // validate delete when not already found
-       tc = "delete-pod1-again"
-       executed = false
-       expectAdd = false
-       expectRemove = false
-       context.DeletePod(pod1)
-       assert.Assert(t, !executed, "unexpected update")
-       pod = context.schedulerCache.GetPod(string(pod1.UID))
+       assert.Assert(t, allocRequest != nil, "update not executed")
+       assert.Equal(t, 0, len(allocRequest.Allocations))
+       assert.Assert(t, allocRequest.Releases != nil)
+       assert.Equal(t, 1, len(allocRequest.Releases.AllocationsToRelease))
+       assert.Equal(t, podName1, 
allocRequest.Releases.AllocationsToRelease[0].AllocationKey)
+       assert.Equal(t, constants.DefaultPartition, 
allocRequest.Releases.AllocationsToRelease[0].PartitionName)
+       assert.Equal(t, "", 
allocRequest.Releases.AllocationsToRelease[0].ApplicationID)
+       assert.Equal(t, si.TerminationType_STOPPED_BY_RM, 
allocRequest.Releases.AllocationsToRelease[0].TerminationType)
+       pod := context.schedulerCache.GetPod(string(pod1.UID))
        assert.Assert(t, pod == nil, "deleted pod found in cache")
 }
 
@@ -1993,6 +1900,10 @@ func TestInitializeState(t *testing.T) {
                },
        }}
        podLister.AddPod(orphaned)
+       // add an orphan foreign pod
+       orphanForeign := newPodHelper(podForeignName, "default", podForeignUID, 
nodeName2, "", v1.PodRunning)
+       orphanForeign.Spec.SchedulerName = ""
+       podLister.AddPod(orphanForeign)
 
        err := context.InitializeState()
        assert.NilError(t, err, "InitializeState failed")
@@ -2005,19 +1916,20 @@ func TestInitializeState(t *testing.T) {
        assert.Equal(t, pc.Annotations[constants.AnnotationAllowPreemption], 
constants.True, "wrong allow-preemption value")
 
        // verify occupied / capacity on node
-       capacity, occupied, ok := 
context.schedulerCache.SnapshotResources(nodeName1)
+       capacity, _, ok := context.schedulerCache.SnapshotResources(nodeName1)
        assert.Assert(t, ok, "Unable to retrieve node resources")
        expectedCapacity := common.ParseResource("4", "10G")
        assert.Equal(t, expectedCapacity.Resources["vcore"].Value, 
capacity.Resources["vcore"].Value, "wrong capacity vcore")
        assert.Equal(t, expectedCapacity.Resources["memory"].Value, 
capacity.Resources["memory"].Value, "wrong capacity memory")
-       expectedOccupied := common.ParseResource("1500m", "2G")
-       assert.Equal(t, expectedOccupied.Resources["vcore"].Value, 
occupied.Resources["vcore"].Value, "wrong occupied vcore")
-       assert.Equal(t, expectedOccupied.Resources["memory"].Value, 
occupied.Resources["memory"].Value, "wrong occupied memory")
 
        // check that pod orphan status is correct
        assert.Check(t, !context.schedulerCache.IsPodOrphaned(podName1), "pod1 
should not be orphaned")
        assert.Check(t, !context.schedulerCache.IsPodOrphaned(podName2), "pod2 
should not be orphaned")
        assert.Check(t, context.schedulerCache.IsPodOrphaned(podName3), "pod3 
should be orphaned")
+       assert.Check(t, context.schedulerCache.IsPodOrphaned(podForeignUID), 
"foreign pod should be orphaned")
+       assert.Check(t, context.schedulerCache.GetPod("foreignRunning") != nil, 
"foreign running pod is not in the cache")
+       assert.Check(t, context.schedulerCache.GetPod("foreignPending") == nil, 
"foreign pending pod should not be in the cache")
+       assert.Check(t, 
!context.schedulerCache.IsPodOrphaned("foreignRunning"), "foreign running pod 
should not be orphaned")
 
        // pod1 is pending
        task1 := context.getTask(appID1, podName1)
@@ -2034,6 +1946,145 @@ func TestInitializeState(t *testing.T) {
        assert.Assert(t, task3 == nil, "pod3 was found")
 }
 
+func TestPodAdoption(t *testing.T) {
+       ctx, apiProvider := initContextAndAPIProviderForTest()
+       dispatcher.Start()
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+       apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest) 
error {
+               for _, node := range request.Nodes {
+                       dispatcher.Dispatch(CachedSchedulerNodeEvent{
+                               NodeID: node.NodeID,
+                               Event:  NodeAccepted,
+                       })
+               }
+               return nil
+       })
+
+       // add pods w/o node & check orphan status
+       pod1 := newPodHelper(podName1, namespace, pod1UID, Host1, appID, 
v1.PodRunning)
+       pod2 := newPodHelper(podForeignName, namespace, podForeignUID, Host1, 
"", v1.PodRunning)
+       pod2.Spec.SchedulerName = ""
+       ctx.AddPod(pod1)
+       ctx.AddPod(pod2)
+       assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(pod1UID), "Yunikorn 
pod is not orphan")
+       assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(podForeignUID), 
"foreign pod is not orphan")
+
+       // add node
+       node := v1.Node{
+               ObjectMeta: apis.ObjectMeta{
+                       Name:      Host1,
+                       Namespace: "default",
+                       UID:       uid1,
+               },
+       }
+       ctx.addNode(&node)
+
+       // check that node has adopted the pods
+       assert.Assert(t, !ctx.schedulerCache.IsPodOrphaned(pod1UID), "Yunikorn 
pod has not been adopted")
+       assert.Assert(t, !ctx.schedulerCache.IsPodOrphaned(podForeignUID), 
"foreign pod has not been adopted")
+}
+
+func TestOrphanPodUpdate(t *testing.T) {
+       ctx, apiProvider := initContextAndAPIProviderForTest()
+       dispatcher.Start()
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+       var update atomic.Bool
+       apiProvider.MockSchedulerAPIUpdateAllocationFn(func(request 
*si.AllocationRequest) error {
+               update.Store(true)
+               return nil
+       })
+
+       // add pods w/o node & check orphan status
+       pod1 := newPodHelper(podName1, namespace, pod1UID, Host1, appID, 
v1.PodPending)
+       pod2 := newPodHelper(podForeignName, namespace, podForeignUID, Host1, 
"", v1.PodPending)
+       pod2.Spec.SchedulerName = ""
+       ctx.AddPod(pod1)
+       ctx.AddPod(pod2)
+       assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(pod1UID), "Yunikorn 
pod is not orphan")
+       assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(podForeignUID), 
"foreign pod is not orphan")
+       assert.Assert(t, ctx.getApplication(appID) == nil)
+
+       // update orphan pods
+       pod1Upd := pod1.DeepCopy()
+       pod1Upd.Status.Phase = v1.PodRunning
+       pod2Upd := pod2.DeepCopy()
+       pod2Upd.Status.Phase = v1.PodRunning
+
+       ctx.UpdatePod(pod1, pod1Upd)
+       assert.Assert(t, ctx.getApplication(appID) == nil)
+       assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(pod1UID), "Yunikorn 
pod is not orphan after update")
+       assert.Equal(t, v1.PodRunning, 
ctx.schedulerCache.GetPod(pod1UID).Status.Phase, "pod has not been updated in 
the cache")
+       assert.Assert(t, !update.Load(), "allocation update has been triggered 
for Yunikorn orphan pod")
+
+       ctx.UpdatePod(pod2, pod2Upd)
+       assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(podForeignUID), 
"foreign pod is not orphan after update")
+       assert.Equal(t, v1.PodRunning, 
ctx.schedulerCache.GetPod(podForeignUID).Status.Phase, "foreign pod has not 
been updated in the cache")
+       assert.Assert(t, !update.Load(), "allocation update has been triggered 
for foreign orphan pod")
+}
+
+func TestOrphanPodDelete(t *testing.T) {
+       ctx, apiProvider := initContextAndAPIProviderForTest()
+       dispatcher.Start()
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+       var taskEventSent atomic.Bool
+       dispatcher.RegisterEventHandler("TestTaskHandler", 
dispatcher.EventTypeTask, func(obj interface{}) {
+               taskEventSent.Store(true)
+       })
+       var request *si.AllocationRequest
+       apiProvider.MockSchedulerAPIUpdateAllocationFn(func(r 
*si.AllocationRequest) error {
+               request = r
+               return nil
+       })
+       apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest) 
error {
+               for _, node := range request.Nodes {
+                       dispatcher.Dispatch(CachedSchedulerNodeEvent{
+                               NodeID: node.NodeID,
+                               Event:  NodeAccepted,
+                       })
+               }
+               return nil
+       })
+
+       // add pods w/o node & check orphan status
+       pod1 := newPodHelper(podName1, namespace, pod1UID, Host1, appID, 
v1.PodPending)
+       pod2 := newPodHelper(podForeignName, namespace, podForeignUID, Host1, 
"", v1.PodPending)
+       pod2.Spec.SchedulerName = ""
+       ctx.AddPod(pod1)
+       ctx.AddPod(pod2)
+       assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(pod1UID), "Yunikorn 
pod is not orphan")
+       assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(podForeignUID), 
"foreign pod is not orphan")
+       assert.Assert(t, ctx.getApplication(appID) == nil)
+
+       // add a node with pod - this creates the application object
+       node := v1.Node{
+               ObjectMeta: apis.ObjectMeta{
+                       Name:      Host2,
+                       Namespace: "default",
+                       UID:       uid1,
+               },
+       }
+       ctx.addNode(&node)
+       pod3 := newPodHelper(podName2, namespace, pod2UID, Host2, appID, 
v1.PodPending)
+       ctx.AddPod(pod3)
+       assert.Assert(t, !ctx.schedulerCache.IsPodOrphaned(pod2UID), "Yunikorn 
pod is orphan")
+       assert.Assert(t, ctx.getApplication(appID) != nil)
+
+       // delete orphan YK pod
+       ctx.DeletePod(pod1)
+       err := utils.WaitForCondition(taskEventSent.Load, 100*time.Millisecond, 
time.Second)
+       assert.NilError(t, err)
+
+       // delete foreign pod
+       ctx.DeletePod(pod2)
+       assert.Assert(t, request != nil)
+       assert.Assert(t, request.Releases != nil)
+       assert.Equal(t, 1, len(request.Releases.AllocationsToRelease))
+       assert.Equal(t, podForeignUID, 
request.Releases.AllocationsToRelease[0].AllocationKey)
+}
+
 func TestTaskRemoveOnCompletion(t *testing.T) {
        context := initContextForTest()
        dispatcher.Start()
diff --git a/pkg/common/constants/constants.go 
b/pkg/common/constants/constants.go
index 5a848b4a..09d9ea13 100644
--- a/pkg/common/constants/constants.go
+++ b/pkg/common/constants/constants.go
@@ -66,6 +66,7 @@ const SchedulerName = "yunikorn"
 
 // OwnerReferences
 const DaemonSetType = "DaemonSet"
+// NodeKind is the OwnerReference kind that marks a pod as owned by a Node
+// object; foreign pods with such an owner are reported as static allocations.
+const NodeKind = "Node"
 
 // Gang scheduling
 const PlaceholderContainerImage = "registry.k8s.io/pause:3.7"
diff --git a/pkg/common/si_helper.go b/pkg/common/si_helper.go
index d94a708c..d2ef9a5c 100644
--- a/pkg/common/si_helper.go
+++ b/pkg/common/si_helper.go
@@ -114,6 +114,33 @@ func CreateAllocationForTask(appID, taskID, nodeID string, 
resource *si.Resource
        }
 }
 
+// CreateAllocationForForeignPod builds an AllocationRequest that reports a pod
+// which was not scheduled by YuniKorn (a "foreign" pod) to the core.
+// The allocation is keyed by the pod UID and placed on pod.Spec.NodeName. It is
+// tagged as a static allocation when any owner reference has kind Node, and
+// with the default allocation type otherwise. The pod creation time (Unix
+// seconds) is recorded as a tag as well.
+func CreateAllocationForForeignPod(pod *v1.Pod) *si.AllocationRequest {
+       podType := common.AllocTypeDefault
+       for _, owner := range pod.OwnerReferences {
+               if owner.Kind != constants.NodeKind {
+                       continue
+               }
+               podType = common.AllocTypeStatic
+               break
+       }
+
+       tags := map[string]string{
+               common.Foreign:      podType,
+               common.CreationTime: strconv.FormatInt(pod.CreationTimestamp.Unix(), 10),
+       }
+
+       return &si.AllocationRequest{
+               Allocations: []*si.Allocation{
+                       {
+                               AllocationKey:    string(pod.UID),
+                               AllocationTags:   tags,
+                               ResourcePerAlloc: GetPodResource(pod),
+                               Priority:         CreatePriorityForTask(pod),
+                               NodeID:           pod.Spec.NodeName,
+                       },
+               },
+               RmID: conf.GetSchedulerConf().ClusterID,
+       }
+}
+
 func GetTerminationTypeFromString(terminationTypeStr string) 
si.TerminationType {
        if v, ok := si.TerminationType_value[terminationTypeStr]; ok {
                return si.TerminationType(v)
@@ -141,6 +168,25 @@ func CreateReleaseRequestForTask(appID, taskID, partition 
string, terminationTyp
        }
 }
 
+// CreateReleaseRequestForForeignPod builds an AllocationRequest asking the core
+// to release the allocation of a foreign pod, identified by its UID, in the
+// given partition. The single release is marked STOPPED_BY_RM with the message
+// "pod terminated".
+func CreateReleaseRequestForForeignPod(uid, partition string) *si.AllocationRequest {
+       release := &si.AllocationRelease{
+               AllocationKey:   uid,
+               PartitionName:   partition,
+               TerminationType: si.TerminationType_STOPPED_BY_RM,
+               Message:         "pod terminated",
+       }
+
+       return &si.AllocationRequest{
+               Releases: &si.AllocationReleasesRequest{
+                       AllocationsToRelease: []*si.AllocationRelease{release},
+               },
+               RmID: conf.GetSchedulerConf().ClusterID,
+       }
+}
+
 // CreateUpdateRequestForUpdatedNode builds a NodeRequest for capacity and 
occupied resource updates
 func CreateUpdateRequestForUpdatedNode(nodeID string, capacity *si.Resource, 
occupied *si.Resource) *si.NodeRequest {
        nodeInfo := &si.NodeInfo{
diff --git a/pkg/common/si_helper_test.go b/pkg/common/si_helper_test.go
index 9ccf619a..0439f7b3 100644
--- a/pkg/common/si_helper_test.go
+++ b/pkg/common/si_helper_test.go
@@ -19,9 +19,11 @@ package common
 
 import (
        "testing"
+       "time"
 
        "gotest.tools/v3/assert"
        v1 "k8s.io/api/core/v1"
+       "k8s.io/apimachinery/pkg/api/resource"
        apis "k8s.io/apimachinery/pkg/apis/meta/v1"
 
        "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
@@ -415,3 +417,79 @@ func TestGetTerminationTypeFromString(t *testing.T) {
                })
        }
 }
+
+// TestCreateAllocationForForeignPod verifies the allocation request built for a
+// foreign pod (key, node, priority, resources and tags). It covers both a
+// regular pod and a pod with a Node owner reference (static pod).
+func TestCreateAllocationForForeignPod(t *testing.T) {
+       cResources := make(map[v1.ResourceName]resource.Quantity)
+       cResources[v1.ResourceMemory] = resource.MustParse("500M")
+       cResources[v1.ResourceCPU] = resource.MustParse("1")
+       var containers []v1.Container
+       containers = append(containers, v1.Container{
+               Name: "container-01",
+               Resources: v1.ResourceRequirements{
+                       Requests: cResources,
+               },
+       })
+
+       pod := &v1.Pod{
+               TypeMeta: apis.TypeMeta{
+                       Kind:       "Pod",
+                       APIVersion: "v1",
+               },
+               ObjectMeta: apis.ObjectMeta{
+                       Name: "test",
+                       UID:  "UID-00001",
+                       CreationTimestamp: apis.Time{
+                               Time: time.Unix(1, 0),
+                       },
+               },
+               Spec: v1.PodSpec{
+                       Containers: containers,
+                       NodeName:   nodeID,
+               },
+       }
+
+       allocReq := CreateAllocationForForeignPod(pod)
+       assert.Equal(t, 1, len(allocReq.Allocations))
+       assert.Equal(t, "mycluster", allocReq.RmID)
+       assert.Assert(t, allocReq.Releases == nil)
+       alloc := allocReq.Allocations[0]
+       assert.Equal(t, nodeID, alloc.NodeID)
+       assert.Equal(t, "UID-00001", alloc.AllocationKey)
+       assert.Equal(t, int32(0), alloc.Priority)
+       res := alloc.ResourcePerAlloc
+       assert.Equal(t, 3, len(res.Resources))
+       assert.Equal(t, int64(500000000), res.Resources["memory"].Value)
+       assert.Equal(t, int64(1000), res.Resources["vcore"].Value)
+       assert.Equal(t, int64(1), res.Resources["pods"].Value)
+       assert.Equal(t, 2, len(alloc.AllocationTags))
+       assert.Equal(t, "1", alloc.AllocationTags[common.CreationTime])
+       assert.Equal(t, common.AllocTypeDefault, alloc.AllocationTags[common.Foreign])
+
+       // set priority & change pod type to static
+       prio := int32(12)
+       pod.Spec.Priority = &prio
+       pod.OwnerReferences = []apis.OwnerReference{
+               {
+                       Kind: "Node",
+               },
+       }
+       allocReq = CreateAllocationForForeignPod(pod)
+       assert.Equal(t, 1, len(allocReq.Allocations))
+       // re-read the allocation from the new request before asserting on it;
+       // checking the previous alloc would not exercise the second call
+       alloc = allocReq.Allocations[0]
+       assert.Equal(t, 2, len(alloc.AllocationTags))
+       assert.Equal(t, "1", alloc.AllocationTags[common.CreationTime])
+       assert.Equal(t, common.AllocTypeStatic, alloc.AllocationTags[common.Foreign])
+       assert.Equal(t, int32(12), alloc.Priority)
+}
+
+func TestCreateReleaseRequestForForeignPod(t *testing.T) {
+       allocReq := CreateReleaseRequestForForeignPod("UID-0001", "partition")
+
+       assert.Assert(t, allocReq.Releases != nil)
+       assert.Equal(t, "mycluster", allocReq.RmID)
+       releaseReq := allocReq.Releases
+       assert.Equal(t, 1, len(releaseReq.AllocationsToRelease))
+       release := releaseReq.AllocationsToRelease[0]
+       assert.Equal(t, "UID-0001", release.AllocationKey)
+       assert.Equal(t, "partition", release.PartitionName)
+       assert.Equal(t, si.TerminationType_STOPPED_BY_RM, 
release.TerminationType)
+       assert.Equal(t, "pod terminated", release.Message)
+}
diff --git a/pkg/shim/scheduler_mock_test.go b/pkg/shim/scheduler_mock_test.go
index 1c3f3f36..7b8de855 100644
--- a/pkg/shim/scheduler_mock_test.go
+++ b/pkg/shim/scheduler_mock_test.go
@@ -341,7 +341,6 @@ func createUpdateRequestForNewNode(nodeID string, 
nodeLabels map[string]string,
        nodeInfo := &si.NodeInfo{
                NodeID:              nodeID,
                SchedulableResource: capacity,
-               OccupiedResource:    occupied,
                Attributes: map[string]string{
                        constants.DefaultNodeAttributeHostNameKey: nodeID,
                        constants.DefaultNodeAttributeRackNameKey: 
constants.DefaultRackName,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to