This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 2feb6ecc [YUNIKORN-1945] Add missing application state transition event (#629)
2feb6ecc is described below

commit 2feb6ecc13849130c99f90a78e053fc6b319e725
Author: Peter Bacsko <[email protected]>
AuthorDate: Thu Aug 31 19:46:55 2023 +0200

    [YUNIKORN-1945] Add missing application state transition event (#629)
    
    Closes: #629
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 pkg/scheduler/objects/application.go            |  3 ---
 pkg/scheduler/objects/application_state.go      | 27 ++++++++++++++++++-------
 pkg/scheduler/objects/application_state_test.go | 17 ++++++++++++++--
 3 files changed, 35 insertions(+), 12 deletions(-)

diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index c4fe10b1..0124ddf1 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -308,9 +308,6 @@ func (sa *Application) HandleApplicationEventWithInfo(event applicationEvent, ev
        if err != nil && err.Error() == noTransition {
                return nil
        }
-       if event == RejectApplication {
-               sa.appEvents.sendRejectApplicationEvent(eventInfo)
-       }
        return err
 }
 
diff --git a/pkg/scheduler/objects/application_state.go b/pkg/scheduler/objects/application_state.go
index 98961884..35dbc6b3 100644
--- a/pkg/scheduler/objects/application_state.go
+++ b/pkg/scheduler/objects/application_state.go
@@ -74,6 +74,19 @@ const (
        Resuming
 )
 
+var stateEvents = map[string]si.EventRecord_ChangeDetail{
+       Accepted.String():   si.EventRecord_APP_ACCEPTED,
+       Starting.String():   si.EventRecord_APP_STARTING,
+       Running.String():    si.EventRecord_APP_RUNNING,
+       Rejected.String():   si.EventRecord_APP_REJECT,
+       Completing.String(): si.EventRecord_APP_COMPLETING,
+       Completed.String():  si.EventRecord_APP_COMPLETED,
+       Failing.String():    si.EventRecord_APP_FAILING,
+       Failed.String():     si.EventRecord_APP_FAILED,
+       Resuming.String():   si.EventRecord_APP_RESUMING,
+       Expired.String():    si.EventRecord_APP_EXPIRED,
+}
+
 func (as applicationState) String() string {
        return [...]string{"New", "Accepted", "Starting", "Running", 
"Rejected", "Completing", "Completed", "Failing", "Failed", "Expired", 
"Resuming"}[as]
 }
@@ -142,6 +155,13 @@ func NewAppState() *fsm.FSM {
                                } else {
                                        app.OnStateChange(event, "")
                                }
+                               eventDetails, ok := stateEvents[event.Dst]
+                               if !ok {
+                                       log.Log(log.SchedFSM).Error("event 
details not found",
+                                               zap.String("state", event.Dst))
+                                       return
+                               }
+                               app.appEvents.sendStateChangeEvent(eventDetails)
                        },
                        "leave_state": func(_ context.Context, event 
*fsm.Event) {
                                event.Args[0].(*Application).clearStateTimer() 
//nolint:errcheck
@@ -153,7 +173,6 @@ func NewAppState() *fsm.FSM {
                                app.setStateTimer(app.startTimeout, 
app.stateMachine.Current(), RunApplication)
                                
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsRunning()
                                
metrics.GetSchedulerMetrics().IncTotalApplicationsRunning()
-                               
app.appEvents.sendStateChangeEvent(si.EventRecord_APP_STARTING)
                        },
                        fmt.Sprintf("enter_%s", Resuming.String()): func(_ 
context.Context, event *fsm.Event) {
                                app := event.Args[0].(*Application) 
//nolint:errcheck
@@ -168,13 +187,11 @@ func NewAppState() *fsm.FSM {
                                        
metrics.GetSchedulerMetrics().DecTotalApplicationsRunning()
                                }
                                app.setStateTimer(completingTimeout, 
app.stateMachine.Current(), CompleteApplication)
-                               
app.appEvents.sendStateChangeEvent(si.EventRecord_APP_COMPLETING)
                        },
                        fmt.Sprintf("leave_%s", New.String()): func(_ 
context.Context, event *fsm.Event) {
                                app := event.Args[0].(*Application) 
//nolint:errcheck
                                
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsAccepted()
                                
metrics.GetSchedulerMetrics().IncTotalApplicationsAccepted()
-                               
app.appEvents.sendStateChangeEvent(si.EventRecord_APP_ACCEPTED)
                        },
                        fmt.Sprintf("enter_%s", Rejected.String()): func(_ 
context.Context, event *fsm.Event) {
                                app := event.Args[0].(*Application) 
//nolint:errcheck
@@ -199,7 +216,6 @@ func NewAppState() *fsm.FSM {
                                // account for going back into running state
                                if event.Dst == Running.String() {
                                        
app.queue.incRunningApps(app.ApplicationID)
-                                       
app.appEvents.sendStateChangeEvent(si.EventRecord_APP_RUNNING)
                                }
                        },
                        fmt.Sprintf("enter_%s", Completed.String()): func(_ 
context.Context, event *fsm.Event) {
@@ -207,7 +223,6 @@ func NewAppState() *fsm.FSM {
                                
metrics.GetSchedulerMetrics().IncTotalApplicationsCompleted()
                                
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsCompleted()
                                app.setStateTimer(terminatedTimeout, 
app.stateMachine.Current(), ExpireApplication)
-                               
app.appEvents.sendStateChangeEvent(si.EventRecord_APP_COMPLETED)
                                app.executeTerminatedCallback()
                                app.clearPlaceholderTimer()
                                app.cleanupAsks()
@@ -216,12 +231,10 @@ func NewAppState() *fsm.FSM {
                                app := event.Args[0].(*Application) 
//nolint:errcheck
                                
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsFailed()
                                
metrics.GetSchedulerMetrics().IncTotalApplicationsFailed()
-                               
app.appEvents.sendStateChangeEvent(si.EventRecord_APP_FAILING)
                        },
                        fmt.Sprintf("enter_%s", Failed.String()): func(_ 
context.Context, event *fsm.Event) {
                                app := event.Args[0].(*Application) 
//nolint:errcheck
                                app.setStateTimer(terminatedTimeout, 
app.stateMachine.Current(), ExpireApplication)
-                               
app.appEvents.sendStateChangeEvent(si.EventRecord_APP_FAILED)
                                app.executeTerminatedCallback()
                                app.cleanupAsks()
                        },
diff --git a/pkg/scheduler/objects/application_state_test.go b/pkg/scheduler/objects/application_state_test.go
index cdd4b278..2f1bf564 100644
--- a/pkg/scheduler/objects/application_state_test.go
+++ b/pkg/scheduler/objects/application_state_test.go
@@ -272,21 +272,34 @@ func TestAppStateTransitionEvents(t *testing.T) {
        assert.NilError(t, err, "no error expected failing to failed")
        assert.Assert(t, appInfo.IsFailed(), "App should be in Failed state")
 
+       // failed to expired
+       err = appInfo.HandleApplicationEvent(ExpireApplication)
+       assert.NilError(t, err, "no error expected failed to expired")
+       assert.Assert(t, appInfo.IsExpired(), "App should be in Expired state")
+
+       // accepted to resuming
+       appInfo.stateMachine.SetState(Accepted.String())
+       err = appInfo.HandleApplicationEvent(ResumeApplication)
+       assert.NilError(t, err, "no error expected accepted to resuming")
+       assert.Assert(t, appInfo.IsResuming(), "App should be in Resuming 
state")
+
        // Verify application events
        err = common.WaitFor(10*time.Millisecond, time.Second, func() bool {
                fmt.Printf("checking event length: %d\n", 
eventSystem.Store.CountStoredEvents())
-               return eventSystem.Store.CountStoredEvents() == 6
+               return eventSystem.Store.CountStoredEvents() == 8
        })
        assert.NilError(t, err, "the event should have been processed")
        records := eventSystem.Store.CollectEvents()
        if records == nil {
                t.Fatal("collecting eventChannel should return something")
        }
-       assert.Equal(t, 6, len(records), "expecting 6 events")
+       assert.Equal(t, 8, len(records), "expecting 8 events")
        isNewApplicationEvent(t, appInfo, records[0])
        isStateChangeEvent(t, appInfo, si.EventRecord_APP_ACCEPTED, records[1])
        isStateChangeEvent(t, appInfo, si.EventRecord_APP_COMPLETING, 
records[2])
        isStateChangeEvent(t, appInfo, si.EventRecord_APP_RUNNING, records[3])
        isStateChangeEvent(t, appInfo, si.EventRecord_APP_FAILING, records[4])
        isStateChangeEvent(t, appInfo, si.EventRecord_APP_FAILED, records[5])
+       isStateChangeEvent(t, appInfo, si.EventRecord_APP_EXPIRED, records[6])
+       isStateChangeEvent(t, appInfo, si.EventRecord_APP_RESUMING, records[7])
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to