This is an automated email from the ASF dual-hosted git repository.

wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b19450f [YUNIKORN-2323] Gang scheduling user experience new events 
(#876)
4b19450f is described below

commit 4b19450fff7642c428619f48b8dc9568dbfe39f6
Author: Wilfred Spiegelenburg <[email protected]>
AuthorDate: Fri Jul 12 18:19:24 2024 +1000

    [YUNIKORN-2323] Gang scheduling user experience new events (#876)
    
    Additional events for gang scheduling covering:
    * placeholder timeout (resuming state)
    * placeholder creation
    * placeholder creation failure(s)
    
    Closes: #876
    
    Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
 pkg/cache/application.go       | 25 +++++++++++++++++++++++--
 pkg/cache/application_state.go |  4 ++++
 2 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/pkg/cache/application.go b/pkg/cache/application.go
index ff4db058..9833a6f2 100644
--- a/pkg/cache/application.go
+++ b/pkg/cache/application.go
@@ -495,12 +495,28 @@ func (app *Application) postAppAccepted() {
        dispatcher.Dispatch(ev)
 }
 
+// onResuming triggered when entering the resuming state which is triggered by 
the time out of the gang placeholders
+// if SOFT gang scheduling is configured.
+func (app *Application) onResuming() {
+       if app.originatingTask != nil {
+               
events.GetRecorder().Eventf(app.originatingTask.GetTaskPod().DeepCopy(), nil, 
v1.EventTypeWarning, "GangScheduling",
+                       "GangSchedulingFailed", "Application %s resuming as 
non-gang application (SOFT)", app.applicationID)
+       }
+}
+
+// onReserving triggered when entering the reserving state.
+// During normal operation this creates all the placeholders. During recovery 
this call could cause the application
+// in the shim and core to progress to the next state.
 func (app *Application) onReserving() {
-       // happens after recovery - if placeholders already exist, we need to 
send
+       // if any placeholder already exist during recovery we might need to 
send
        // an event to trigger Application state change in the core
        if len(app.getPlaceHolderTasks()) > 0 {
                ev := NewUpdateApplicationReservationEvent(app.applicationID)
                dispatcher.Dispatch(ev)
+       } else if app.originatingTask != nil {
+               // not recovery or no placeholders created yet add an event to 
the pod
+               
events.GetRecorder().Eventf(app.originatingTask.GetTaskPod().DeepCopy(), nil, 
v1.EventTypeNormal, "GangScheduling",
+                       "CreatingPlaceholders", "Application %s creating 
placeholders", app.applicationID)
        }
 
        go func() {
@@ -511,6 +527,11 @@ func (app *Application) onReserving() {
                        getPlaceholderManager().cleanUp(app)
                        ev := NewRunApplicationEvent(app.applicationID)
                        dispatcher.Dispatch(ev)
+                       // failed at least one placeholder creation progress as 
a normal application
+                       if app.originatingTask != nil {
+                               
events.GetRecorder().Eventf(app.originatingTask.GetTaskPod().DeepCopy(), nil, 
v1.EventTypeWarning, "GangScheduling",
+                                       "PlaceholderCreateFailed", "Application 
%s fall back to normal scheduling", app.applicationID)
+                       }
                }
        }()
 }
@@ -520,7 +541,7 @@ func (app *Application) onReserving() {
 func (app *Application) onReservationStateChange() {
        if app.originatingTask != nil {
                
events.GetRecorder().Eventf(app.originatingTask.GetTaskPod().DeepCopy(), nil, 
v1.EventTypeNormal, "GangScheduling",
-                       "Placeholder Allocated", "Application %s placeholder 
has been allocated.", app.applicationID)
+                       "PlaceholderAllocated", "Application %s placeholder has 
been allocated.", app.applicationID)
        }
        desireCounts := make(map[string]int32, len(app.taskGroups))
        for _, tg := range app.taskGroups {
diff --git a/pkg/cache/application_state.go b/pkg/cache/application_state.go
index ef251e2f..aee1c3e1 100644
--- a/pkg/cache/application_state.go
+++ b/pkg/cache/application_state.go
@@ -506,6 +506,10 @@ func newAppState() *fsm.FSM { //nolint:funlen
                                app := event.Args[0].(*Application) 
//nolint:errcheck
                                app.onReserving()
                        },
+                       states.Resuming: func(_ context.Context, event 
*fsm.Event) {
+                               app := event.Args[0].(*Application) 
//nolint:errcheck
+                               app.onResuming()
+                       },
                        SubmitApplication.String(): func(_ context.Context, 
event *fsm.Event) {
                                app := event.Args[0].(*Application) 
//nolint:errcheck
                                event.Err = app.handleSubmitApplicationEvent()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to