This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new 8add17d6 [YUNIKORN-2368] Shim: Send updated resource requests to core 
(#912)
8add17d6 is described below

commit 8add17d60653caadccde605c9e9a394335e70cee
Author: Craig Condit <[email protected]>
AuthorDate: Mon Sep 9 14:16:22 2024 -0500

    [YUNIKORN-2368] Shim: Send updated resource requests to core (#912)
    
    Closes: #912
---
 go.mod                           |   2 +-
 go.sum                           |   4 +-
 pkg/cache/application.go         |  12 +++--
 pkg/cache/context.go             |   6 +--
 pkg/cache/placeholder_manager.go |   2 +-
 pkg/cache/scheduler_callback.go  |   2 +-
 pkg/cache/task.go                | 110 +++++++++++++++++++++++++--------------
 pkg/cache/task_test.go           |   4 +-
 8 files changed, 88 insertions(+), 54 deletions(-)

diff --git a/go.mod b/go.mod
index 3b7c2d11..13ad673e 100644
--- a/go.mod
+++ b/go.mod
@@ -23,7 +23,7 @@ go 1.22.0
 toolchain go1.22.5
 
 require (
-       github.com/apache/yunikorn-core v0.0.0-20240827175300-6939b13d1d0e
+       github.com/apache/yunikorn-core v0.0.0-20240908061623-6f06490bcfa3
        github.com/apache/yunikorn-scheduler-interface 
v0.0.0-20240827015655-68e8c6cca28a
        github.com/google/go-cmp v0.6.0
        github.com/google/uuid v1.6.0
diff --git a/go.sum b/go.sum
index 2f048f15..f1203810 100644
--- a/go.sum
+++ b/go.sum
@@ -8,8 +8,8 @@ github.com/NYTimes/gziphandler v1.1.1 
h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
 github.com/NYTimes/gziphandler v1.1.1/go.mod 
h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
 github.com/antlr4-go/antlr/v4 v4.13.0 
h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
 github.com/antlr4-go/antlr/v4 v4.13.0/go.mod 
h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
-github.com/apache/yunikorn-core v0.0.0-20240827175300-6939b13d1d0e 
h1:VaihjHjtmsDK7HEOjlX8KCz7QDxmZSf71CSCuOgjqcc=
-github.com/apache/yunikorn-core v0.0.0-20240827175300-6939b13d1d0e/go.mod 
h1:HYeyzHhZt43oG54pasKHrwHM+Jeji8nFoAE2bcLWLYg=
+github.com/apache/yunikorn-core v0.0.0-20240908061623-6f06490bcfa3 
h1:ySu0cpFSYFGNtf+PZw4ulzO+cWOyJMYJs+AjmwGWM80=
+github.com/apache/yunikorn-core v0.0.0-20240908061623-6f06490bcfa3/go.mod 
h1:HYeyzHhZt43oG54pasKHrwHM+Jeji8nFoAE2bcLWLYg=
 github.com/apache/yunikorn-scheduler-interface 
v0.0.0-20240827015655-68e8c6cca28a 
h1:3WRXGTvhunGBZj8AVZDxx7Bs/AXiH9mvf2jYcuDyklA=
 github.com/apache/yunikorn-scheduler-interface 
v0.0.0-20240827015655-68e8c6cca28a/go.mod 
h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU=
 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 
h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
diff --git a/pkg/cache/application.go b/pkg/cache/application.go
index 085465ce..8438957b 100644
--- a/pkg/cache/application.go
+++ b/pkg/cache/application.go
@@ -546,13 +546,14 @@ func (app *Application) onReservationStateChange() {
 
        for _, t := range app.getTasks(TaskStates().Bound) {
                if t.placeholder {
-                       if _, ok := desireCounts[t.taskGroupName]; ok {
-                               desireCounts[t.taskGroupName]--
+                       taskGroupName := t.GetTaskGroupName()
+                       if _, ok := desireCounts[taskGroupName]; ok {
+                               desireCounts[taskGroupName]--
                        } else {
                                
log.Log(log.ShimCacheApplication).Debug("placeholder taskGroupName set on pod 
is unknown for application",
                                        zap.String("application", 
app.applicationID),
                                        zap.String("podName", 
t.GetTaskPod().Name),
-                                       zap.String("taskGroupName", 
t.taskGroupName))
+                                       zap.String("taskGroupName", 
taskGroupName))
                        }
                }
        }
@@ -659,12 +660,13 @@ func (app *Application) handleAppTaskCompletedEvent() {
 }
 
 func (app *Application) publishPlaceholderTimeoutEvents(task *Task) {
-       if app.originatingTask != nil && task.IsPlaceholder() && 
task.terminationType == 
si.TerminationType_name[int32(si.TerminationType_TIMEOUT)] {
+       taskTerminationType := task.GetTaskTerminationType()
+       if app.originatingTask != nil && task.IsPlaceholder() && 
taskTerminationType == 
si.TerminationType_name[int32(si.TerminationType_TIMEOUT)] {
                log.Log(log.ShimCacheApplication).Debug("trying to send 
placeholder timeout events to the original pod from application",
                        zap.String("appID", app.applicationID),
                        zap.Stringer("app request originating pod", 
app.originatingTask.GetTaskPod()),
                        zap.String("taskID", task.taskID),
-                       zap.String("terminationType", task.terminationType))
+                       zap.String("terminationType", taskTerminationType))
                
events.GetRecorder().Eventf(app.originatingTask.GetTaskPod().DeepCopy(), nil, 
v1.EventTypeWarning, "GangScheduling",
                        "PlaceholderTimeOut", "Application %s placeholder has 
been timed out", app.applicationID)
        }
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index 61fe1ea0..aaea690a 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -299,7 +299,7 @@ func (ctx *Context) updateYuniKornPod(appID string, pod 
*v1.Pod) {
        app := ctx.getApplication(appID)
        if app != nil {
                if task := app.GetTask(taskID); task != nil {
-                       task.setTaskPod(pod)
+                       task.SetTaskPod(pod)
                }
        }
 
@@ -1194,7 +1194,7 @@ func (ctx *Context) HandleContainerStateUpdate(request 
*si.UpdateContainerSchedu
                                        Reason:  "SchedulingSkipped",
                                        Message: request.Reason,
                                }) {
-                               
events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
+                               
events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil,
                                        v1.EventTypeNormal, "PodUnschedulable", 
"PodUnschedulable",
                                        "Task %s is skipped from scheduling 
because the queue quota has been exceed", task.alias)
                        }
@@ -1209,7 +1209,7 @@ func (ctx *Context) HandleContainerStateUpdate(request 
*si.UpdateContainerSchedu
                                        Reason:  v1.PodReasonUnschedulable,
                                        Message: request.Reason,
                                }) {
-                               
events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
+                               
events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil,
                                        v1.EventTypeNormal, "PodUnschedulable", 
"PodUnschedulable",
                                        "Task %s is pending for the requested 
resources become available", task.alias)
                        }
diff --git a/pkg/cache/placeholder_manager.go b/pkg/cache/placeholder_manager.go
index 230c77c6..7a44ea06 100644
--- a/pkg/cache/placeholder_manager.go
+++ b/pkg/cache/placeholder_manager.go
@@ -79,7 +79,7 @@ func (mgr *PlaceholderManager) createAppPlaceholders(app 
*Application) error {
        // map task group to count of already created placeholders
        tgCounts := make(map[string]int32)
        for _, ph := range app.getPlaceHolderTasks() {
-               tgCounts[ph.getTaskGroupName()]++
+               tgCounts[ph.GetTaskGroupName()]++
        }
 
        // iterate all task groups, create placeholders for all the min members
diff --git a/pkg/cache/scheduler_callback.go b/pkg/cache/scheduler_callback.go
index d058be39..643b4c71 100644
--- a/pkg/cache/scheduler_callback.go
+++ b/pkg/cache/scheduler_callback.go
@@ -64,7 +64,7 @@ func (callback *AsyncRMCallback) UpdateAllocation(response 
*si.AllocationRespons
                task.setAllocationKey(alloc.AllocationKey)
 
                if err := callback.context.AssumePod(alloc.AllocationKey, 
alloc.NodeID); err != nil {
-                       task.failWithEvent(err.Error(), "AssumePodError")
+                       task.FailWithEvent(err.Error(), "AssumePodError")
                        return err
                }
 
diff --git a/pkg/cache/task.go b/pkg/cache/task.go
index b3d6915c..5c0be9de 100644
--- a/pkg/cache/task.go
+++ b/pkg/cache/task.go
@@ -40,24 +40,27 @@ import (
 )
 
 type Task struct {
-       taskID          string
-       alias           string
-       applicationID   string
-       application     *Application
+       taskID        string
+       alias         string
+       applicationID string
+       application   *Application
+       podStatus     v1.PodStatus // pod status, maintained separately for 
efficiency reasons
+       context       *Context
+       createTime    time.Time
+       placeholder   bool
+       originator    bool
+       sm            *fsm.FSM
+
+       // mutable resources, require locking
        allocationKey   string
-       resource        *si.Resource
-       pod             *v1.Pod
-       podStatus       v1.PodStatus // pod status, maintained separately for 
efficiency reasons
-       context         *Context
        nodeName        string
-       createTime      time.Time
        taskGroupName   string
-       placeholder     bool
        terminationType string
-       originator      bool
        schedulingState TaskSchedulingState
-       sm              *fsm.FSM
-       lock            *locking.RWMutex
+       resource        *si.Resource
+       pod             *v1.Pod
+
+       lock *locking.RWMutex
 }
 
 func NewTask(tid string, app *Application, ctx *Context, pod *v1.Pod) *Task {
@@ -135,14 +138,10 @@ func (task *Task) GetTaskPod() *v1.Pod {
 }
 
 func (task *Task) GetTaskID() string {
-       task.lock.RLock()
-       defer task.lock.RUnlock()
        return task.taskID
 }
 
 func (task *Task) IsPlaceholder() bool {
-       task.lock.RLock()
-       defer task.lock.RUnlock()
        return task.placeholder
 }
 
@@ -157,19 +156,25 @@ func (task *Task) setTaskGroupName(groupName string) {
        task.taskGroupName = groupName
 }
 
-func (task *Task) setTaskTerminationType(terminationTyp string) {
+func (task *Task) setTaskTerminationType(terminationType string) {
        task.lock.Lock()
        defer task.lock.Unlock()
-       task.terminationType = terminationTyp
+       task.terminationType = terminationType
 }
 
-func (task *Task) getTaskGroupName() string {
+func (task *Task) GetTaskTerminationType() string {
+       task.lock.RLock()
+       defer task.lock.RUnlock()
+       return task.terminationType
+}
+
+func (task *Task) GetTaskGroupName() string {
        task.lock.RLock()
        defer task.lock.RUnlock()
        return task.taskGroupName
 }
 
-func (task *Task) getNodeName() string {
+func (task *Task) GetNodeName() string {
        task.lock.RLock()
        defer task.lock.RUnlock()
        return task.nodeName
@@ -222,8 +227,6 @@ func (task *Task) initialize() {
 }
 
 func (task *Task) IsOriginator() bool {
-       task.lock.RLock()
-       defer task.lock.RUnlock()
        return task.originator
 }
 
@@ -286,13 +289,33 @@ func (task *Task) handleSubmitTaskEvent() {
        log.Log(log.ShimCacheTask).Debug("scheduling pod",
                zap.String("podName", task.pod.Name))
 
+       // send update allocation event to core
+       task.updateAllocation()
+
+       if !utils.PodAlreadyBound(task.pod) {
+               // if this is a new request, add events to pod
+               events.GetRecorder().Eventf(task.pod.DeepCopy(), nil, 
v1.EventTypeNormal, "Scheduling", "Scheduling",
+                       "%s is queued and waiting for allocation", task.alias)
+               // if this task belongs to a task group, that means the app has 
gang scheduling enabled
+               // in this case, post an event to indicate the task is being 
gang scheduled
+               if !task.placeholder && task.taskGroupName != "" {
+                       events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
+                               v1.EventTypeNormal, "GangScheduling", 
"TaskGroupMatch",
+                               "Pod belongs to the taskGroup %s, it will be 
scheduled as a gang member", task.taskGroupName)
+               }
+       }
+}
+
+// updateAllocation updates the core scheduler when task information changes.
+// This function must be called with the task lock held.
+func (task *Task) updateAllocation() {
        // build preemption policy
        preemptionPolicy := &si.PreemptionPolicy{
                AllowPreemptSelf:  task.isPreemptSelfAllowed(),
                AllowPreemptOther: task.isPreemptOtherAllowed(),
        }
 
-       // submit allocation ask
+       // submit allocation
        rr := common.CreateAllocationForTask(
                task.applicationID,
                task.taskID,
@@ -305,22 +328,9 @@ func (task *Task) handleSubmitTaskEvent() {
                preemptionPolicy)
        log.Log(log.ShimCacheTask).Debug("send update request", 
zap.Stringer("request", rr))
        if err := 
task.context.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(rr); err != 
nil {
-               log.Log(log.ShimCacheTask).Debug("failed to send scheduling 
request to scheduler", zap.Error(err))
+               log.Log(log.ShimCacheTask).Debug("failed to send allocation to 
scheduler", zap.Error(err))
                return
        }
-
-       if !utils.PodAlreadyBound(task.pod) {
-               // if this is a new request, add events to pod
-               events.GetRecorder().Eventf(task.pod.DeepCopy(), nil, 
v1.EventTypeNormal, "Scheduling", "Scheduling",
-                       "%s is queued and waiting for allocation", task.alias)
-               // if this task belongs to a task group, that means the app has 
gang scheduling enabled
-               // in this case, post an event to indicate the task is being 
gang scheduled
-               if !task.placeholder && task.taskGroupName != "" {
-                       events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
-                               v1.EventTypeNormal, "GangScheduling", 
"TaskGroupMatch",
-                               "Pod belongs to the taskGroup %s, it will be 
scheduled as a gang member", task.taskGroupName)
-               }
-       }
 }
 
 // this is called after task reaches PENDING state,
@@ -604,20 +614,42 @@ func (task *Task) UpdatePodCondition(podCondition 
*v1.PodCondition) (bool, *v1.P
        return false, pod
 }
 
+func (task *Task) GetAllocationKey() string {
+       task.lock.RLock()
+       defer task.lock.RUnlock()
+       return task.allocationKey
+}
+
 func (task *Task) setAllocationKey(allocationKey string) {
        task.lock.Lock()
        defer task.lock.Unlock()
        task.allocationKey = allocationKey
 }
 
+func (task *Task) FailWithEvent(errorMessage, actionReason string) {
+       task.lock.RLock()
+       defer task.lock.RUnlock()
+       task.failWithEvent(errorMessage, actionReason)
+}
+
 func (task *Task) failWithEvent(errorMessage, actionReason string) {
        dispatcher.Dispatch(NewFailTaskEvent(task.applicationID, task.taskID, 
errorMessage))
        events.GetRecorder().Eventf(task.pod.DeepCopy(),
                nil, v1.EventTypeWarning, actionReason, actionReason, 
errorMessage)
 }
 
-func (task *Task) setTaskPod(pod *v1.Pod) {
+func (task *Task) SetTaskPod(pod *v1.Pod) {
        task.lock.Lock()
        defer task.lock.Unlock()
+
        task.pod = pod
+       oldResource := task.resource
+       newResource := common.GetPodResource(pod)
+       if !common.Equals(oldResource, newResource) {
+               // pod resources have changed
+               task.resource = newResource
+
+               // update allocation in core
+               task.updateAllocation()
+       }
 }
diff --git a/pkg/cache/task_test.go b/pkg/cache/task_test.go
index 1ec5f7d8..d1fcc336 100644
--- a/pkg/cache/task_test.go
+++ b/pkg/cache/task_test.go
@@ -203,7 +203,7 @@ func TestReleaseTaskAllocation(t *testing.T) {
        // bind a task is a async process, wait for it to happen
        err = utils.WaitForCondition(
                func() bool {
-                       return task.getNodeName() == "node-1"
+                       return task.GetNodeName() == "node-1"
                },
                100*time.Millisecond,
                3*time.Second,
@@ -481,7 +481,7 @@ func TestSetTaskGroup(t *testing.T) {
        }
        task := NewTask("task01", app, mockedContext, pod)
        task.setTaskGroupName("test-group")
-       assert.Equal(t, task.getTaskGroupName(), "test-group")
+       assert.Equal(t, task.GetTaskGroupName(), "test-group")
 }
 
 //nolint:funlen


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to