This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new 08a9b93d [YUNIKORN-2618]Update Streamline AsyncRMCallback UpdateAllocation (#846)
08a9b93d is described below
commit 08a9b93dcc308f1e70bbf9a6596664877977e1bd
Author: YUN SUN <[email protected]>
AuthorDate: Wed May 22 23:07:37 2024 +0200
[YUNIKORN-2618]Update Streamline AsyncRMCallback UpdateAllocation (#846)
Closes: #846
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/cache/scheduler_callback.go | 45 +++++++++++++++++------------------------
1 file changed, 19 insertions(+), 26 deletions(-)
diff --git a/pkg/cache/scheduler_callback.go b/pkg/cache/scheduler_callback.go
index dd147ff0..8ed487c4 100644
--- a/pkg/cache/scheduler_callback.go
+++ b/pkg/cache/scheduler_callback.go
@@ -56,27 +56,24 @@ func (callback *AsyncRMCallback) UpdateAllocation(response
*si.AllocationRespons
// update cache
task := callback.context.getTask(alloc.ApplicationID,
alloc.AllocationKey)
- if task != nil {
- task.setAllocationKey(alloc.AllocationKey)
- } else {
+ if task == nil {
log.Log(log.ShimRMCallback).Warn("Unable to get task",
zap.String("taskID", alloc.AllocationKey))
+ continue
}
+
+ task.setAllocationKey(alloc.AllocationKey)
+
if err := callback.context.AssumePod(alloc.AllocationKey,
alloc.NodeID); err != nil {
- if task != nil {
- task.failWithEvent(err.Error(),
"AssumePodError")
- }
+ task.failWithEvent(err.Error(), "AssumePodError")
return err
}
- if app := callback.context.GetApplication(alloc.ApplicationID);
app != nil {
- if task != nil {
- if utils.IsAssignedPod(task.GetTaskPod()) {
- // task is already bound, fixup state and continue
-
task.MarkPreviouslyAllocated(alloc.AllocationKey, alloc.NodeID)
- } else {
- ev :=
NewAllocateTaskEvent(app.GetApplicationID(), task.taskID, alloc.AllocationKey,
alloc.NodeID)
- dispatcher.Dispatch(ev)
- }
- }
+
+ if utils.IsAssignedPod(task.GetTaskPod()) {
+ // task is already bound, fixup state and continue
+ task.MarkPreviouslyAllocated(alloc.AllocationKey,
alloc.NodeID)
+ } else {
+ ev := NewAllocateTaskEvent(alloc.ApplicationID,
task.taskID, alloc.AllocationKey, alloc.NodeID)
+ dispatcher.Dispatch(ev)
}
}
@@ -84,22 +81,18 @@ func (callback *AsyncRMCallback) UpdateAllocation(response
*si.AllocationRespons
// request rejected by the scheduler, put it back and try scheduling again
log.Log(log.ShimRMCallback).Debug("callback: response to
rejected ask",
zap.String("allocationKey", reject.AllocationKey))
- if app :=
callback.context.GetApplication(reject.ApplicationID); app != nil {
-
dispatcher.Dispatch(NewRejectTaskEvent(app.GetApplicationID(),
reject.AllocationKey,
- fmt.Sprintf("task %s ask from application %s is
rejected by scheduler",
- reject.AllocationKey,
reject.ApplicationID)))
- }
+ dispatcher.Dispatch(NewRejectTaskEvent(reject.ApplicationID,
reject.AllocationKey,
+ fmt.Sprintf("task %s ask from application %s is
rejected by scheduler",
+ reject.AllocationKey, reject.ApplicationID)))
}
for _, reject := range response.RejectedAllocations {
// request rejected by the scheduler, reject it
log.Log(log.ShimRMCallback).Debug("callback: response to
rejected allocation",
zap.String("allocationKey", reject.AllocationKey))
- if app :=
callback.context.GetApplication(reject.ApplicationID); app != nil {
-
dispatcher.Dispatch(NewRejectTaskEvent(app.GetApplicationID(),
reject.AllocationKey,
- fmt.Sprintf("task %s allocation from
application %s is rejected by scheduler",
- reject.AllocationKey,
reject.ApplicationID)))
- }
+ dispatcher.Dispatch(NewRejectTaskEvent(reject.ApplicationID,
reject.AllocationKey,
+ fmt.Sprintf("task %s allocation from application %s is
rejected by scheduler",
+ reject.AllocationKey, reject.ApplicationID)))
}
for _, release := range response.Released {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]