This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new 286abb64 [YUNIKORN-2724] Improve the signature of methods notifyTaskComplete() and ensureAppAndTaskCreated() (#873)
286abb64 is described below

commit 286abb6403d65ad8098e9299c9f66027c5a10def
Author: ryankert <[email protected]>
AuthorDate: Mon Aug 12 04:44:45 2024 +0800

    [YUNIKORN-2724] Improve the signature of methods notifyTaskComplete() and ensureAppAndTaskCreated() (#873)
    
    Closes: #873
    
    Signed-off-by: Chia-Ping Tsai <[email protected]>
---
 pkg/cache/context.go      | 56 ++++++++++++++++++++++-------------------------
 pkg/cache/context_test.go |  4 ++--
 2 files changed, 28 insertions(+), 32 deletions(-)

diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index 211bf0e9..6b791923 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -295,9 +295,9 @@ func (ctx *Context) UpdatePod(_, newObj interface{}) {
 }
 
 func (ctx *Context) updateYuniKornPod(appID string, pod *v1.Pod) {
-       var app *Application
        taskID := string(pod.UID)
-       if app = ctx.getApplication(appID); app != nil {
+       app := ctx.getApplication(appID)
+       if app != nil {
                if task := app.GetTask(taskID); task != nil {
                        task.setTaskPod(pod)
                }
@@ -305,7 +305,7 @@ func (ctx *Context) updateYuniKornPod(appID string, pod *v1.Pod) {
 
        // treat terminated pods like a remove
        if utils.IsPodTerminated(pod) {
-               ctx.notifyTaskComplete(appID, taskID)
+               ctx.notifyTaskComplete(app, taskID)
                log.Log(log.ShimContext).Debug("Request to update terminated pod, removing from cache", zap.String("podName", pod.Name))
                ctx.schedulerCache.RemovePod(pod)
                return
@@ -313,23 +313,21 @@ func (ctx *Context) updateYuniKornPod(appID string, pod *v1.Pod) {
 
        if ctx.schedulerCache.UpdatePod(pod) {
                // pod was accepted; ensure the application and task objects have been created
-               ctx.ensureAppAndTaskCreated(pod)
+               ctx.ensureAppAndTaskCreated(pod, app)
        }
 }
 
-func (ctx *Context) ensureAppAndTaskCreated(pod *v1.Pod) {
-       // get app metadata
-       appMeta, ok := getAppMetadata(pod)
-       if !ok {
-               log.Log(log.ShimContext).Warn("BUG: Unable to retrieve application metadata from YuniKorn-managed Pod",
-                       zap.String("namespace", pod.Namespace),
-                       zap.String("name", pod.Name))
-               return
-       }
-
+func (ctx *Context) ensureAppAndTaskCreated(pod *v1.Pod, app *Application) {
        // add app if it doesn't already exist
-       app := ctx.GetApplication(appMeta.ApplicationID)
        if app == nil {
+               // get app metadata
+               appMeta, ok := getAppMetadata(pod)
+               if !ok {
+                       log.Log(log.ShimContext).Warn("BUG: Unable to retrieve application metadata from YuniKorn-managed Pod",
+                               zap.String("namespace", pod.Namespace),
+                               zap.String("name", pod.Name))
+                       return
+               }
                app = ctx.AddApplication(&AddApplicationRequest{
                        Metadata: appMeta,
                })
@@ -435,9 +433,7 @@ func (ctx *Context) DeletePod(obj interface{}) {
 
 func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) {
        if taskMeta, ok := getTaskMetadata(pod); ok {
-               if app := ctx.GetApplication(taskMeta.ApplicationID); app != nil {
-                       ctx.notifyTaskComplete(taskMeta.ApplicationID, taskMeta.TaskID)
-               }
+               ctx.notifyTaskComplete(ctx.GetApplication(taskMeta.ApplicationID), taskMeta.TaskID)
        }
 
        log.Log(log.ShimContext).Debug("removing pod from cache", zap.String("podName", pod.Name))
@@ -873,19 +869,19 @@ func (ctx *Context) StartPodAllocation(podKey string, nodeID string) bool {
        return ctx.schedulerCache.StartPodAllocation(podKey, nodeID)
 }
 
-func (ctx *Context) notifyTaskComplete(appID, taskID string) {
-       log.Log(log.ShimContext).Debug("NotifyTaskComplete",
-               zap.String("appID", appID),
-               zap.String("taskID", taskID))
-       if app := ctx.GetApplication(appID); app != nil {
-               log.Log(log.ShimContext).Debug("release allocation",
-                       zap.String("appID", appID),
+func (ctx *Context) notifyTaskComplete(app *Application, taskID string) {
+       if app == nil {
+               log.Log(log.ShimContext).Debug("In notifyTaskComplete but app is nil",
                        zap.String("taskID", taskID))
-               ev := NewSimpleTaskEvent(appID, taskID, CompleteTask)
-               dispatcher.Dispatch(ev)
-               if app.GetApplicationState() == ApplicationStates().Resuming {
-                       dispatcher.Dispatch(NewSimpleApplicationEvent(appID, AppTaskCompleted))
-               }
+               return
+       }
+       log.Log(log.ShimContext).Debug("release allocation in notifyTaskComplete",
+               zap.String("appID", app.applicationID),
+               zap.String("taskID", taskID))
+       ev := NewSimpleTaskEvent(app.applicationID, taskID, CompleteTask)
+       dispatcher.Dispatch(ev)
+       if app.GetApplicationState() == ApplicationStates().Resuming {
+               dispatcher.Dispatch(NewSimpleApplicationEvent(app.applicationID, AppTaskCompleted))
        }
 }
 
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index ba661c39..6d7bfafa 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -1034,7 +1034,7 @@ func TestTaskReleaseAfterRecovery(t *testing.T) {
        assert.Equal(t, len(app.GetBoundTasks()), 2)
 
        // release one of the tasks
-       context.notifyTaskComplete(appID, pod2UID)
+       context.notifyTaskComplete(app, pod2UID)
 
        // wait for release
        err = utils.WaitForCondition(func() bool {
@@ -2075,7 +2075,7 @@ func TestTaskRemoveOnCompletion(t *testing.T) {
        assert.NilError(t, err)
 
        // mark completion
-       context.notifyTaskComplete(appID, taskUID1)
+       context.notifyTaskComplete(app, taskUID1)
        err = utils.WaitForCondition(func() bool {
                return task.GetTaskState() == TaskStates().Completed
        }, 100*time.Millisecond, time.Second)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to