This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new 8add17d6 [YUNIKORN-2368] Shim: Send updated resource requests to core (#912)
8add17d6 is described below
commit 8add17d60653caadccde605c9e9a394335e70cee
Author: Craig Condit <[email protected]>
AuthorDate: Mon Sep 9 14:16:22 2024 -0500
[YUNIKORN-2368] Shim: Send updated resource requests to core (#912)
Closes: #912
---
go.mod | 2 +-
go.sum | 4 +-
pkg/cache/application.go | 12 +++--
pkg/cache/context.go | 6 +--
pkg/cache/placeholder_manager.go | 2 +-
pkg/cache/scheduler_callback.go | 2 +-
pkg/cache/task.go | 110 +++++++++++++++++++++++++--------------
pkg/cache/task_test.go | 4 +-
8 files changed, 88 insertions(+), 54 deletions(-)
diff --git a/go.mod b/go.mod
index 3b7c2d11..13ad673e 100644
--- a/go.mod
+++ b/go.mod
@@ -23,7 +23,7 @@ go 1.22.0
toolchain go1.22.5
require (
- github.com/apache/yunikorn-core v0.0.0-20240827175300-6939b13d1d0e
+ github.com/apache/yunikorn-core v0.0.0-20240908061623-6f06490bcfa3
github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240827015655-68e8c6cca28a
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
diff --git a/go.sum b/go.sum
index 2f048f15..f1203810 100644
--- a/go.sum
+++ b/go.sum
@@ -8,8 +8,8 @@ github.com/NYTimes/gziphandler v1.1.1
h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
github.com/NYTimes/gziphandler v1.1.1/go.mod
h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/antlr4-go/antlr/v4 v4.13.0
h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod
h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
-github.com/apache/yunikorn-core v0.0.0-20240827175300-6939b13d1d0e
h1:VaihjHjtmsDK7HEOjlX8KCz7QDxmZSf71CSCuOgjqcc=
-github.com/apache/yunikorn-core v0.0.0-20240827175300-6939b13d1d0e/go.mod
h1:HYeyzHhZt43oG54pasKHrwHM+Jeji8nFoAE2bcLWLYg=
+github.com/apache/yunikorn-core v0.0.0-20240908061623-6f06490bcfa3
h1:ySu0cpFSYFGNtf+PZw4ulzO+cWOyJMYJs+AjmwGWM80=
+github.com/apache/yunikorn-core v0.0.0-20240908061623-6f06490bcfa3/go.mod
h1:HYeyzHhZt43oG54pasKHrwHM+Jeji8nFoAE2bcLWLYg=
github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240827015655-68e8c6cca28a
h1:3WRXGTvhunGBZj8AVZDxx7Bs/AXiH9mvf2jYcuDyklA=
github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240827015655-68e8c6cca28a/go.mod
h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
diff --git a/pkg/cache/application.go b/pkg/cache/application.go
index 085465ce..8438957b 100644
--- a/pkg/cache/application.go
+++ b/pkg/cache/application.go
@@ -546,13 +546,14 @@ func (app *Application) onReservationStateChange() {
for _, t := range app.getTasks(TaskStates().Bound) {
if t.placeholder {
- if _, ok := desireCounts[t.taskGroupName]; ok {
- desireCounts[t.taskGroupName]--
+ taskGroupName := t.GetTaskGroupName()
+ if _, ok := desireCounts[taskGroupName]; ok {
+ desireCounts[taskGroupName]--
} else {
log.Log(log.ShimCacheApplication).Debug("placeholder taskGroupName set on pod
is unknown for application",
zap.String("application",
app.applicationID),
zap.String("podName",
t.GetTaskPod().Name),
- zap.String("taskGroupName",
t.taskGroupName))
+ zap.String("taskGroupName",
taskGroupName))
}
}
}
@@ -659,12 +660,13 @@ func (app *Application) handleAppTaskCompletedEvent() {
}
func (app *Application) publishPlaceholderTimeoutEvents(task *Task) {
- if app.originatingTask != nil && task.IsPlaceholder() &&
task.terminationType ==
si.TerminationType_name[int32(si.TerminationType_TIMEOUT)] {
+ taskTerminationType := task.GetTaskTerminationType()
+ if app.originatingTask != nil && task.IsPlaceholder() &&
taskTerminationType ==
si.TerminationType_name[int32(si.TerminationType_TIMEOUT)] {
log.Log(log.ShimCacheApplication).Debug("trying to send
placeholder timeout events to the original pod from application",
zap.String("appID", app.applicationID),
zap.Stringer("app request originating pod",
app.originatingTask.GetTaskPod()),
zap.String("taskID", task.taskID),
- zap.String("terminationType", task.terminationType))
+ zap.String("terminationType", taskTerminationType))
events.GetRecorder().Eventf(app.originatingTask.GetTaskPod().DeepCopy(), nil,
v1.EventTypeWarning, "GangScheduling",
"PlaceholderTimeOut", "Application %s placeholder has
been timed out", app.applicationID)
}
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index 61fe1ea0..aaea690a 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -299,7 +299,7 @@ func (ctx *Context) updateYuniKornPod(appID string, pod
*v1.Pod) {
app := ctx.getApplication(appID)
if app != nil {
if task := app.GetTask(taskID); task != nil {
- task.setTaskPod(pod)
+ task.SetTaskPod(pod)
}
}
@@ -1194,7 +1194,7 @@ func (ctx *Context) HandleContainerStateUpdate(request
*si.UpdateContainerSchedu
Reason: "SchedulingSkipped",
Message: request.Reason,
}) {
-
events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
+
events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil,
v1.EventTypeNormal, "PodUnschedulable",
"PodUnschedulable",
"Task %s is skipped from scheduling
because the queue quota has been exceed", task.alias)
}
@@ -1209,7 +1209,7 @@ func (ctx *Context) HandleContainerStateUpdate(request
*si.UpdateContainerSchedu
Reason: v1.PodReasonUnschedulable,
Message: request.Reason,
}) {
-
events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
+
events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil,
v1.EventTypeNormal, "PodUnschedulable",
"PodUnschedulable",
"Task %s is pending for the requested
resources become available", task.alias)
}
diff --git a/pkg/cache/placeholder_manager.go b/pkg/cache/placeholder_manager.go
index 230c77c6..7a44ea06 100644
--- a/pkg/cache/placeholder_manager.go
+++ b/pkg/cache/placeholder_manager.go
@@ -79,7 +79,7 @@ func (mgr *PlaceholderManager) createAppPlaceholders(app
*Application) error {
// map task group to count of already created placeholders
tgCounts := make(map[string]int32)
for _, ph := range app.getPlaceHolderTasks() {
- tgCounts[ph.getTaskGroupName()]++
+ tgCounts[ph.GetTaskGroupName()]++
}
// iterate all task groups, create placeholders for all the min members
diff --git a/pkg/cache/scheduler_callback.go b/pkg/cache/scheduler_callback.go
index d058be39..643b4c71 100644
--- a/pkg/cache/scheduler_callback.go
+++ b/pkg/cache/scheduler_callback.go
@@ -64,7 +64,7 @@ func (callback *AsyncRMCallback) UpdateAllocation(response
*si.AllocationRespons
task.setAllocationKey(alloc.AllocationKey)
if err := callback.context.AssumePod(alloc.AllocationKey,
alloc.NodeID); err != nil {
- task.failWithEvent(err.Error(), "AssumePodError")
+ task.FailWithEvent(err.Error(), "AssumePodError")
return err
}
diff --git a/pkg/cache/task.go b/pkg/cache/task.go
index b3d6915c..5c0be9de 100644
--- a/pkg/cache/task.go
+++ b/pkg/cache/task.go
@@ -40,24 +40,27 @@ import (
)
type Task struct {
- taskID string
- alias string
- applicationID string
- application *Application
+ taskID string
+ alias string
+ applicationID string
+ application *Application
+ podStatus v1.PodStatus // pod status, maintained separately for
efficiency reasons
+ context *Context
+ createTime time.Time
+ placeholder bool
+ originator bool
+ sm *fsm.FSM
+
+ // mutable resources, require locking
allocationKey string
- resource *si.Resource
- pod *v1.Pod
- podStatus v1.PodStatus // pod status, maintained separately for
efficiency reasons
- context *Context
nodeName string
- createTime time.Time
taskGroupName string
- placeholder bool
terminationType string
- originator bool
schedulingState TaskSchedulingState
- sm *fsm.FSM
- lock *locking.RWMutex
+ resource *si.Resource
+ pod *v1.Pod
+
+ lock *locking.RWMutex
}
func NewTask(tid string, app *Application, ctx *Context, pod *v1.Pod) *Task {
@@ -135,14 +138,10 @@ func (task *Task) GetTaskPod() *v1.Pod {
}
func (task *Task) GetTaskID() string {
- task.lock.RLock()
- defer task.lock.RUnlock()
return task.taskID
}
func (task *Task) IsPlaceholder() bool {
- task.lock.RLock()
- defer task.lock.RUnlock()
return task.placeholder
}
@@ -157,19 +156,25 @@ func (task *Task) setTaskGroupName(groupName string) {
task.taskGroupName = groupName
}
-func (task *Task) setTaskTerminationType(terminationTyp string) {
+func (task *Task) setTaskTerminationType(terminationType string) {
task.lock.Lock()
defer task.lock.Unlock()
- task.terminationType = terminationTyp
+ task.terminationType = terminationType
}
-func (task *Task) getTaskGroupName() string {
+func (task *Task) GetTaskTerminationType() string {
+ task.lock.RLock()
+ defer task.lock.RUnlock()
+ return task.terminationType
+}
+
+func (task *Task) GetTaskGroupName() string {
task.lock.RLock()
defer task.lock.RUnlock()
return task.taskGroupName
}
-func (task *Task) getNodeName() string {
+func (task *Task) GetNodeName() string {
task.lock.RLock()
defer task.lock.RUnlock()
return task.nodeName
@@ -222,8 +227,6 @@ func (task *Task) initialize() {
}
func (task *Task) IsOriginator() bool {
- task.lock.RLock()
- defer task.lock.RUnlock()
return task.originator
}
@@ -286,13 +289,33 @@ func (task *Task) handleSubmitTaskEvent() {
log.Log(log.ShimCacheTask).Debug("scheduling pod",
zap.String("podName", task.pod.Name))
+ // send update allocation event to core
+ task.updateAllocation()
+
+ if !utils.PodAlreadyBound(task.pod) {
+ // if this is a new request, add events to pod
+ events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
v1.EventTypeNormal, "Scheduling", "Scheduling",
+ "%s is queued and waiting for allocation", task.alias)
+ // if this task belongs to a task group, that means the app has
gang scheduling enabled
+ // in this case, post an event to indicate the task is being
gang scheduled
+ if !task.placeholder && task.taskGroupName != "" {
+ events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
+ v1.EventTypeNormal, "GangScheduling",
"TaskGroupMatch",
+ "Pod belongs to the taskGroup %s, it will be
scheduled as a gang member", task.taskGroupName)
+ }
+ }
+}
+
+// updateAllocation updates the core scheduler when task information changes.
+// This function must be called with the task lock held.
+func (task *Task) updateAllocation() {
// build preemption policy
preemptionPolicy := &si.PreemptionPolicy{
AllowPreemptSelf: task.isPreemptSelfAllowed(),
AllowPreemptOther: task.isPreemptOtherAllowed(),
}
- // submit allocation ask
+ // submit allocation
rr := common.CreateAllocationForTask(
task.applicationID,
task.taskID,
@@ -305,22 +328,9 @@ func (task *Task) handleSubmitTaskEvent() {
preemptionPolicy)
log.Log(log.ShimCacheTask).Debug("send update request",
zap.Stringer("request", rr))
if err :=
task.context.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(rr); err !=
nil {
- log.Log(log.ShimCacheTask).Debug("failed to send scheduling
request to scheduler", zap.Error(err))
+ log.Log(log.ShimCacheTask).Debug("failed to send allocation to
scheduler", zap.Error(err))
return
}
-
- if !utils.PodAlreadyBound(task.pod) {
- // if this is a new request, add events to pod
- events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
v1.EventTypeNormal, "Scheduling", "Scheduling",
- "%s is queued and waiting for allocation", task.alias)
- // if this task belongs to a task group, that means the app has
gang scheduling enabled
- // in this case, post an event to indicate the task is being
gang scheduled
- if !task.placeholder && task.taskGroupName != "" {
- events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
- v1.EventTypeNormal, "GangScheduling",
"TaskGroupMatch",
- "Pod belongs to the taskGroup %s, it will be
scheduled as a gang member", task.taskGroupName)
- }
- }
}
// this is called after task reaches PENDING state,
@@ -604,20 +614,42 @@ func (task *Task) UpdatePodCondition(podCondition
*v1.PodCondition) (bool, *v1.P
return false, pod
}
+func (task *Task) GetAllocationKey() string {
+ task.lock.RLock()
+ defer task.lock.RUnlock()
+ return task.allocationKey
+}
+
func (task *Task) setAllocationKey(allocationKey string) {
task.lock.Lock()
defer task.lock.Unlock()
task.allocationKey = allocationKey
}
+func (task *Task) FailWithEvent(errorMessage, actionReason string) {
+ task.lock.RLock()
+ defer task.lock.RUnlock()
+ task.failWithEvent(errorMessage, actionReason)
+}
+
func (task *Task) failWithEvent(errorMessage, actionReason string) {
dispatcher.Dispatch(NewFailTaskEvent(task.applicationID, task.taskID,
errorMessage))
events.GetRecorder().Eventf(task.pod.DeepCopy(),
nil, v1.EventTypeWarning, actionReason, actionReason,
errorMessage)
}
-func (task *Task) setTaskPod(pod *v1.Pod) {
+func (task *Task) SetTaskPod(pod *v1.Pod) {
task.lock.Lock()
defer task.lock.Unlock()
+
task.pod = pod
+ oldResource := task.resource
+ newResource := common.GetPodResource(pod)
+ if !common.Equals(oldResource, newResource) {
+ // pod resources have changed
+ task.resource = newResource
+
+ // update allocation in core
+ task.updateAllocation()
+ }
}
diff --git a/pkg/cache/task_test.go b/pkg/cache/task_test.go
index 1ec5f7d8..d1fcc336 100644
--- a/pkg/cache/task_test.go
+++ b/pkg/cache/task_test.go
@@ -203,7 +203,7 @@ func TestReleaseTaskAllocation(t *testing.T) {
// bind a task is a async process, wait for it to happen
err = utils.WaitForCondition(
func() bool {
- return task.getNodeName() == "node-1"
+ return task.GetNodeName() == "node-1"
},
100*time.Millisecond,
3*time.Second,
@@ -481,7 +481,7 @@ func TestSetTaskGroup(t *testing.T) {
}
task := NewTask("task01", app, mockedContext, pod)
task.setTaskGroupName("test-group")
- assert.Equal(t, task.getTaskGroupName(), "test-group")
+ assert.Equal(t, task.GetTaskGroupName(), "test-group")
}
//nolint:funlen
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]