This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new 26443bcc [YUNIKORN-2319] cache.Task: reference to old pod object is kept after update (#864)
26443bcc is described below
commit 26443bcc8418cbd09c5402527ef6fd6fb1ef328e
Author: Peter Bacsko <[email protected]>
AuthorDate: Thu Aug 1 12:09:34 2024 +0200
[YUNIKORN-2319] cache.Task: reference to old pod object is kept after update (#864)
Closes: #864
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/cache/context.go | 24 ++++++++++++++----------
pkg/cache/context_test.go | 5 +++++
pkg/cache/task.go | 10 +++++++++-
3 files changed, 28 insertions(+), 11 deletions(-)
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index 0f7764ad..474a1deb 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -202,7 +202,7 @@ func (ctx *Context) updateNodeInternal(node *v1.Node, register bool) {
if applicationID == "" {
ctx.updateForeignPod(pod)
} else {
- ctx.updateYuniKornPod(pod)
+ ctx.updateYuniKornPod(applicationID, pod)
}
}
@@ -296,22 +296,26 @@ func (ctx *Context) UpdatePod(_, newObj interface{}) {
log.Log(log.ShimContext).Error("failed to update pod", zap.Error(err))
return
}
- if utils.GetApplicationIDFromPod(pod) == "" {
+ applicationID := utils.GetApplicationIDFromPod(pod)
+ if applicationID == "" {
ctx.updateForeignPod(pod)
} else {
- ctx.updateYuniKornPod(pod)
+ ctx.updateYuniKornPod(applicationID, pod)
}
}
-func (ctx *Context) updateYuniKornPod(pod *v1.Pod) {
- // treat terminated pods like a remove
- if utils.IsPodTerminated(pod) {
- if taskMeta, ok := getTaskMetadata(pod); ok {
- if app := ctx.getApplication(taskMeta.ApplicationID); app != nil {
- ctx.notifyTaskComplete(taskMeta.ApplicationID, taskMeta.TaskID)
- }
+func (ctx *Context) updateYuniKornPod(appID string, pod *v1.Pod) {
+ var app *Application
+ taskID := string(pod.UID)
+ if app = ctx.getApplication(appID); app != nil {
+ if task, err := app.GetTask(taskID); task != nil && err == nil {
+ task.setTaskPod(pod)
}
+ }
+ // treat terminated pods like a remove
+ if utils.IsPodTerminated(pod) {
+ ctx.notifyTaskComplete(appID, taskID)
log.Log(log.ShimContext).Debug("Request to update terminated pod, removing from cache", zap.String("podName", pod.Name))
ctx.schedulerCache.RemovePod(pod)
return
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index 002b3c81..028bd817 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -526,6 +526,11 @@ func TestUpdatePod(t *testing.T) {
context.UpdatePod(pod1, pod3)
pod = context.schedulerCache.GetPod(uid1)
assert.Check(t, pod == nil, "pod still found after termination")
+ app := context.getApplication("yunikorn-test-00001")
+ // ensure that an updated pod is updated inside the Task
+ task, err := app.GetTask("UID-00001")
+ assert.NilError(t, err)
+ assert.Assert(t, task.GetTaskPod() == pod3, "task pod has not been updated")
// ensure a non-terminated pod is updated
context.UpdatePod(pod1, pod2)
diff --git a/pkg/cache/task.go b/pkg/cache/task.go
index 97f041ea..02b07d16 100644
--- a/pkg/cache/task.go
+++ b/pkg/cache/task.go
@@ -176,7 +176,7 @@ func (task *Task) getNodeName() string {
}
func (task *Task) DeleteTaskPod() error {
- return task.context.apiProvider.GetAPIs().KubeClient.Delete(task.pod)
+ return task.context.apiProvider.GetAPIs().KubeClient.Delete(task.GetTaskPod())
}
func (task *Task) UpdateTaskPodStatus(pod *v1.Pod) (*v1.Pod, error) {
@@ -544,9 +544,11 @@ func (task *Task) releaseAllocation() {
// this reduces the scheduling overhead by blocking such
// request away from the core scheduler.
func (task *Task) sanityCheckBeforeScheduling() error {
+ task.lock.RLock()
// Check PVCs used by the pod
namespace := task.pod.Namespace
manifest := &(task.pod.Spec)
+ task.lock.RUnlock()
for i := range manifest.Volumes {
volume := &manifest.Volumes[i]
if volume.PersistentVolumeClaim == nil {
@@ -599,3 +601,9 @@ func (task *Task) failWithEvent(errorMessage, actionReason string) {
events.GetRecorder().Eventf(task.pod.DeepCopy(),
nil, v1.EventTypeWarning, actionReason, actionReason,
errorMessage)
}
+
+func (task *Task) setTaskPod(pod *v1.Pod) {
+ task.lock.Lock()
+ defer task.lock.Unlock()
+ task.pod = pod
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]