This is an automated email from the ASF dual-hosted git repository.
wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new 4b19450f [YUNIKORN-2323] Gang scheduling user experience new events (#876)
4b19450f is described below
commit 4b19450fff7642c428619f48b8dc9568dbfe39f6
Author: Wilfred Spiegelenburg <[email protected]>
AuthorDate: Fri Jul 12 18:19:24 2024 +1000
[YUNIKORN-2323] Gang scheduling user experience new events (#876)
Additional events for gang scheduling covering:
* placeholder timeout (resuming state)
* placeholder creation
* placeholder create failure(s)
Closes: #876
Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
pkg/cache/application.go | 25 +++++++++++++++++++++++--
pkg/cache/application_state.go | 4 ++++
2 files changed, 27 insertions(+), 2 deletions(-)
diff --git a/pkg/cache/application.go b/pkg/cache/application.go
index ff4db058..9833a6f2 100644
--- a/pkg/cache/application.go
+++ b/pkg/cache/application.go
@@ -495,12 +495,28 @@ func (app *Application) postAppAccepted() {
dispatcher.Dispatch(ev)
}
+// onResuming triggered when entering the resuming state which is triggered by the time out of the gang placeholders
+// if SOFT gang scheduling is configured.
+func (app *Application) onResuming() {
+ if app.originatingTask != nil {
+ events.GetRecorder().Eventf(app.originatingTask.GetTaskPod().DeepCopy(), nil, v1.EventTypeWarning, "GangScheduling",
+ "GangSchedulingFailed", "Application %s resuming as non-gang application (SOFT)", app.applicationID)
+ }
+}
+
+// onReserving triggered when entering the reserving state.
+// During normal operation this creates all the placeholders. During recovery this call could cause the application
+// in the shim and core to progress to the next state.
func (app *Application) onReserving() {
- // happens after recovery - if placeholders already exist, we need to send
+ // if any placeholder already exist during recovery we might need to send
// an event to trigger Application state change in the core
if len(app.getPlaceHolderTasks()) > 0 {
ev := NewUpdateApplicationReservationEvent(app.applicationID)
dispatcher.Dispatch(ev)
+ } else if app.originatingTask != nil {
+ // not recovery or no placeholders created yet add an event to the pod
+ events.GetRecorder().Eventf(app.originatingTask.GetTaskPod().DeepCopy(), nil, v1.EventTypeNormal, "GangScheduling",
+ "CreatingPlaceholders", "Application %s creating placeholders", app.applicationID)
}
go func() {
@@ -511,6 +527,11 @@ func (app *Application) onReserving() {
getPlaceholderManager().cleanUp(app)
ev := NewRunApplicationEvent(app.applicationID)
dispatcher.Dispatch(ev)
+ // failed at least one placeholder creation progress as a normal application
+ if app.originatingTask != nil {
+ events.GetRecorder().Eventf(app.originatingTask.GetTaskPod().DeepCopy(), nil, v1.EventTypeWarning, "GangScheduling",
+ "PlaceholderCreateFailed", "Application %s fall back to normal scheduling", app.applicationID)
+ }
}
}()
}
@@ -520,7 +541,7 @@ func (app *Application) onReserving() {
func (app *Application) onReservationStateChange() {
if app.originatingTask != nil {
events.GetRecorder().Eventf(app.originatingTask.GetTaskPod().DeepCopy(), nil, v1.EventTypeNormal, "GangScheduling",
- "Placeholder Allocated", "Application %s placeholder has been allocated.", app.applicationID)
+ "PlaceholderAllocated", "Application %s placeholder has been allocated.", app.applicationID)
}
desireCounts := make(map[string]int32, len(app.taskGroups))
for _, tg := range app.taskGroups {
diff --git a/pkg/cache/application_state.go b/pkg/cache/application_state.go
index ef251e2f..aee1c3e1 100644
--- a/pkg/cache/application_state.go
+++ b/pkg/cache/application_state.go
@@ -506,6 +506,10 @@ func newAppState() *fsm.FSM { //nolint:funlen
app := event.Args[0].(*Application) //nolint:errcheck
app.onReserving()
},
+ states.Resuming: func(_ context.Context, event *fsm.Event) {
+ app := event.Args[0].(*Application) //nolint:errcheck
+ app.onResuming()
+ },
SubmitApplication.String(): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
event.Err = app.handleSubmitApplicationEvent()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]