This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new 26443bcc [YUNIKORN-2319] cache.Task: reference to old pod object is kept after update (#864)
26443bcc is described below

commit 26443bcc8418cbd09c5402527ef6fd6fb1ef328e
Author: Peter Bacsko <[email protected]>
AuthorDate: Thu Aug 1 12:09:34 2024 +0200

    [YUNIKORN-2319] cache.Task: reference to old pod object is kept after update (#864)
    
    Closes: #864
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 pkg/cache/context.go      | 24 ++++++++++++++----------
 pkg/cache/context_test.go |  5 +++++
 pkg/cache/task.go         | 10 +++++++++-
 3 files changed, 28 insertions(+), 11 deletions(-)

diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index 0f7764ad..474a1deb 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -202,7 +202,7 @@ func (ctx *Context) updateNodeInternal(node *v1.Node, 
register bool) {
                        if applicationID == "" {
                                ctx.updateForeignPod(pod)
                        } else {
-                               ctx.updateYuniKornPod(pod)
+                               ctx.updateYuniKornPod(applicationID, pod)
                        }
                }
 
@@ -296,22 +296,26 @@ func (ctx *Context) UpdatePod(_, newObj interface{}) {
                log.Log(log.ShimContext).Error("failed to update pod", 
zap.Error(err))
                return
        }
-       if utils.GetApplicationIDFromPod(pod) == "" {
+       applicationID := utils.GetApplicationIDFromPod(pod)
+       if applicationID == "" {
                ctx.updateForeignPod(pod)
        } else {
-               ctx.updateYuniKornPod(pod)
+               ctx.updateYuniKornPod(applicationID, pod)
        }
 }
 
-func (ctx *Context) updateYuniKornPod(pod *v1.Pod) {
-       // treat terminated pods like a remove
-       if utils.IsPodTerminated(pod) {
-               if taskMeta, ok := getTaskMetadata(pod); ok {
-                       if app := ctx.getApplication(taskMeta.ApplicationID); 
app != nil {
-                               ctx.notifyTaskComplete(taskMeta.ApplicationID, 
taskMeta.TaskID)
-                       }
+func (ctx *Context) updateYuniKornPod(appID string, pod *v1.Pod) {
+       var app *Application
+       taskID := string(pod.UID)
+       if app = ctx.getApplication(appID); app != nil {
+               if task, err := app.GetTask(taskID); task != nil && err == nil {
+                       task.setTaskPod(pod)
                }
+       }
 
+       // treat terminated pods like a remove
+       if utils.IsPodTerminated(pod) {
+               ctx.notifyTaskComplete(appID, taskID)
                log.Log(log.ShimContext).Debug("Request to update terminated 
pod, removing from cache", zap.String("podName", pod.Name))
                ctx.schedulerCache.RemovePod(pod)
                return
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index 002b3c81..028bd817 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -526,6 +526,11 @@ func TestUpdatePod(t *testing.T) {
        context.UpdatePod(pod1, pod3)
        pod = context.schedulerCache.GetPod(uid1)
        assert.Check(t, pod == nil, "pod still found after termination")
+       app := context.getApplication("yunikorn-test-00001")
+       // ensure that an updated pod is updated inside the Task
+       task, err := app.GetTask("UID-00001")
+       assert.NilError(t, err)
+       assert.Assert(t, task.GetTaskPod() == pod3, "task pod has not been 
updated")
 
        // ensure a non-terminated pod is updated
        context.UpdatePod(pod1, pod2)
diff --git a/pkg/cache/task.go b/pkg/cache/task.go
index 97f041ea..02b07d16 100644
--- a/pkg/cache/task.go
+++ b/pkg/cache/task.go
@@ -176,7 +176,7 @@ func (task *Task) getNodeName() string {
 }
 
 func (task *Task) DeleteTaskPod() error {
-       return task.context.apiProvider.GetAPIs().KubeClient.Delete(task.pod)
+       return 
task.context.apiProvider.GetAPIs().KubeClient.Delete(task.GetTaskPod())
 }
 
 func (task *Task) UpdateTaskPodStatus(pod *v1.Pod) (*v1.Pod, error) {
@@ -544,9 +544,11 @@ func (task *Task) releaseAllocation() {
 // this reduces the scheduling overhead by blocking such
 // request away from the core scheduler.
 func (task *Task) sanityCheckBeforeScheduling() error {
+       task.lock.RLock()
        // Check PVCs used by the pod
        namespace := task.pod.Namespace
        manifest := &(task.pod.Spec)
+       task.lock.RUnlock()
        for i := range manifest.Volumes {
                volume := &manifest.Volumes[i]
                if volume.PersistentVolumeClaim == nil {
@@ -599,3 +601,9 @@ func (task *Task) failWithEvent(errorMessage, actionReason 
string) {
        events.GetRecorder().Eventf(task.pod.DeepCopy(),
                nil, v1.EventTypeWarning, actionReason, actionReason, 
errorMessage)
 }
+
+func (task *Task) setTaskPod(pod *v1.Pod) {
+       task.lock.Lock()
+       defer task.lock.Unlock()
+       task.pod = pod
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to