This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new 08a9b93d [YUNIKORN-2618]Update Streamline AsyncRMCallback UpdateAllocation (#846)
08a9b93d is described below

commit 08a9b93dcc308f1e70bbf9a6596664877977e1bd
Author: YUN SUN <[email protected]>
AuthorDate: Wed May 22 23:07:37 2024 +0200

    [YUNIKORN-2618]Update Streamline AsyncRMCallback UpdateAllocation (#846)
    
    Closes: #846
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 pkg/cache/scheduler_callback.go | 45 +++++++++++++++++------------------------
 1 file changed, 19 insertions(+), 26 deletions(-)

diff --git a/pkg/cache/scheduler_callback.go b/pkg/cache/scheduler_callback.go
index dd147ff0..8ed487c4 100644
--- a/pkg/cache/scheduler_callback.go
+++ b/pkg/cache/scheduler_callback.go
@@ -56,27 +56,24 @@ func (callback *AsyncRMCallback) UpdateAllocation(response *si.AllocationRespons
 
                // update cache
                task := callback.context.getTask(alloc.ApplicationID, alloc.AllocationKey)
-               if task != nil {
-                       task.setAllocationKey(alloc.AllocationKey)
-               } else {
+               if task == nil {
                        log.Log(log.ShimRMCallback).Warn("Unable to get task", zap.String("taskID", alloc.AllocationKey))
+                       continue
                }
+
+               task.setAllocationKey(alloc.AllocationKey)
+
                if err := callback.context.AssumePod(alloc.AllocationKey, alloc.NodeID); err != nil {
-                       if task != nil {
-                               task.failWithEvent(err.Error(), "AssumePodError")
-                       }
+                       task.failWithEvent(err.Error(), "AssumePodError")
                        return err
                }
-               if app := callback.context.GetApplication(alloc.ApplicationID); app != nil {
-                       if task != nil {
-                               if utils.IsAssignedPod(task.GetTaskPod()) {
-                                       // task is already bound, fixup state and continue
-                                       task.MarkPreviouslyAllocated(alloc.AllocationKey, alloc.NodeID)
-                               } else {
-                                       ev := NewAllocateTaskEvent(app.GetApplicationID(), task.taskID, alloc.AllocationKey, alloc.NodeID)
-                                       dispatcher.Dispatch(ev)
-                               }
-                       }
+
+               if utils.IsAssignedPod(task.GetTaskPod()) {
+                       // task is already bound, fixup state and continue
+                       task.MarkPreviouslyAllocated(alloc.AllocationKey, alloc.NodeID)
+               } else {
+                       ev := NewAllocateTaskEvent(alloc.ApplicationID, task.taskID, alloc.AllocationKey, alloc.NodeID)
+                       dispatcher.Dispatch(ev)
                }
        }
 
@@ -84,22 +81,18 @@ func (callback *AsyncRMCallback) UpdateAllocation(response *si.AllocationRespons
                // request rejected by the scheduler, put it back and try scheduling again
                log.Log(log.ShimRMCallback).Debug("callback: response to rejected ask",
                        zap.String("allocationKey", reject.AllocationKey))
-               if app := callback.context.GetApplication(reject.ApplicationID); app != nil {
-                       dispatcher.Dispatch(NewRejectTaskEvent(app.GetApplicationID(), reject.AllocationKey,
-                               fmt.Sprintf("task %s ask from application %s is rejected by scheduler",
-                                       reject.AllocationKey, reject.ApplicationID)))
-               }
+               dispatcher.Dispatch(NewRejectTaskEvent(reject.ApplicationID, reject.AllocationKey,
+                       fmt.Sprintf("task %s ask from application %s is rejected by scheduler",
+                               reject.AllocationKey, reject.ApplicationID)))
        }
 
        for _, reject := range response.RejectedAllocations {
                // request rejected by the scheduler, reject it
                log.Log(log.ShimRMCallback).Debug("callback: response to rejected allocation",
                        zap.String("allocationKey", reject.AllocationKey))
-               if app := callback.context.GetApplication(reject.ApplicationID); app != nil {
-                       dispatcher.Dispatch(NewRejectTaskEvent(app.GetApplicationID(), reject.AllocationKey,
-                               fmt.Sprintf("task %s allocation from application %s is rejected by scheduler",
-                                       reject.AllocationKey, reject.ApplicationID)))
-               }
+               dispatcher.Dispatch(NewRejectTaskEvent(reject.ApplicationID, reject.AllocationKey,
+                       fmt.Sprintf("task %s allocation from application %s is rejected by scheduler",
+                               reject.AllocationKey, reject.ApplicationID)))
        }
 
        for _, release := range response.Released {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to