This is an automated email from the ASF dual-hosted git repository. pbacsko pushed a commit to branch yunikorn-1.3.1 in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
commit 407b025efa7cfe9d007622870e3ba541b978534b Author: Peter Bacsko <[email protected]> AuthorDate: Thu Aug 31 19:46:55 2023 +0200 [YUNIKORN-1945] Add missing application state transition event (#629) Closes: #629 Signed-off-by: Peter Bacsko <[email protected]> (cherry picked from commit 2feb6ecc13849130c99f90a78e053fc6b319e725) --- pkg/scheduler/objects/application.go | 3 --- pkg/scheduler/objects/application_state.go | 27 ++++++++++++++++++------- pkg/scheduler/objects/application_state_test.go | 17 ++++++++++++++-- 3 files changed, 35 insertions(+), 12 deletions(-) diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index 3f58c1a7..56d6a6b8 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -308,9 +308,6 @@ func (sa *Application) HandleApplicationEventWithInfo(event applicationEvent, ev if err != nil && err.Error() == noTransition { return nil } - if event == RejectApplication { - sa.appEvents.sendRejectApplicationEvent(eventInfo) - } return err } diff --git a/pkg/scheduler/objects/application_state.go b/pkg/scheduler/objects/application_state.go index 98961884..35dbc6b3 100644 --- a/pkg/scheduler/objects/application_state.go +++ b/pkg/scheduler/objects/application_state.go @@ -74,6 +74,19 @@ const ( Resuming ) +var stateEvents = map[string]si.EventRecord_ChangeDetail{ + Accepted.String(): si.EventRecord_APP_ACCEPTED, + Starting.String(): si.EventRecord_APP_STARTING, + Running.String(): si.EventRecord_APP_RUNNING, + Rejected.String(): si.EventRecord_APP_REJECT, + Completing.String(): si.EventRecord_APP_COMPLETING, + Completed.String(): si.EventRecord_APP_COMPLETED, + Failing.String(): si.EventRecord_APP_FAILING, + Failed.String(): si.EventRecord_APP_FAILED, + Resuming.String(): si.EventRecord_APP_RESUMING, + Expired.String(): si.EventRecord_APP_EXPIRED, +} + func (as applicationState) String() string { return [...]string{"New", "Accepted", "Starting", "Running", "Rejected", "Completing", "Completed", "Failing", "Failed", "Expired", "Resuming"}[as] } @@ -142,6 +155,13 @@ func NewAppState() *fsm.FSM { } else { app.OnStateChange(event, "") } + eventDetails, ok := stateEvents[event.Dst] + if !ok { + log.Log(log.SchedFSM).Error("event details not found", + zap.String("state", event.Dst)) + return + } + app.appEvents.sendStateChangeEvent(eventDetails) }, "leave_state": func(_ context.Context, event *fsm.Event) { event.Args[0].(*Application).clearStateTimer() //nolint:errcheck @@ -153,7 +173,6 @@ func NewAppState() *fsm.FSM { app.setStateTimer(app.startTimeout, app.stateMachine.Current(), RunApplication) metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsRunning() metrics.GetSchedulerMetrics().IncTotalApplicationsRunning() - app.appEvents.sendStateChangeEvent(si.EventRecord_APP_STARTING) }, fmt.Sprintf("enter_%s", Resuming.String()): func(_ context.Context, event *fsm.Event) { app := event.Args[0].(*Application) //nolint:errcheck @@ -168,13 +187,11 @@ func NewAppState() *fsm.FSM { metrics.GetSchedulerMetrics().DecTotalApplicationsRunning() } app.setStateTimer(completingTimeout, app.stateMachine.Current(), CompleteApplication) - app.appEvents.sendStateChangeEvent(si.EventRecord_APP_COMPLETING) }, fmt.Sprintf("leave_%s", New.String()): func(_ context.Context, event *fsm.Event) { app := event.Args[0].(*Application) //nolint:errcheck metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsAccepted() metrics.GetSchedulerMetrics().IncTotalApplicationsAccepted() - app.appEvents.sendStateChangeEvent(si.EventRecord_APP_ACCEPTED) }, fmt.Sprintf("enter_%s", Rejected.String()): func(_ context.Context, event *fsm.Event) { app := event.Args[0].(*Application) //nolint:errcheck @@ -199,7 +216,6 @@ func NewAppState() *fsm.FSM { // account for going back into running state if event.Dst == Running.String() { app.queue.incRunningApps(app.ApplicationID) - app.appEvents.sendStateChangeEvent(si.EventRecord_APP_RUNNING) } }, fmt.Sprintf("enter_%s", Completed.String()): func(_ context.Context, event *fsm.Event) { @@ -207,7 +223,6 @@ func NewAppState() *fsm.FSM { metrics.GetSchedulerMetrics().IncTotalApplicationsCompleted() metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsCompleted() app.setStateTimer(terminatedTimeout, app.stateMachine.Current(), ExpireApplication) - app.appEvents.sendStateChangeEvent(si.EventRecord_APP_COMPLETED) app.executeTerminatedCallback() app.clearPlaceholderTimer() app.cleanupAsks() @@ -216,12 +231,10 @@ func NewAppState() *fsm.FSM { app := event.Args[0].(*Application) //nolint:errcheck metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsFailed() metrics.GetSchedulerMetrics().IncTotalApplicationsFailed() - app.appEvents.sendStateChangeEvent(si.EventRecord_APP_FAILING) }, fmt.Sprintf("enter_%s", Failed.String()): func(_ context.Context, event *fsm.Event) { app := event.Args[0].(*Application) //nolint:errcheck app.setStateTimer(terminatedTimeout, app.stateMachine.Current(), ExpireApplication) - app.appEvents.sendStateChangeEvent(si.EventRecord_APP_FAILED) app.executeTerminatedCallback() app.cleanupAsks() }, diff --git a/pkg/scheduler/objects/application_state_test.go b/pkg/scheduler/objects/application_state_test.go index cdd4b278..2f1bf564 100644 --- a/pkg/scheduler/objects/application_state_test.go +++ b/pkg/scheduler/objects/application_state_test.go @@ -272,21 +272,34 @@ func TestAppStateTransitionEvents(t *testing.T) { assert.NilError(t, err, "no error expected failing to failed") assert.Assert(t, appInfo.IsFailed(), "App should be in Failed state") + // failed to expired + err = appInfo.HandleApplicationEvent(ExpireApplication) + assert.NilError(t, err, "no error expected failed to expired") + assert.Assert(t, appInfo.IsExpired(), "App should be in Expired state") + + // accepted to resuming + appInfo.stateMachine.SetState(Accepted.String()) + err = appInfo.HandleApplicationEvent(ResumeApplication) + assert.NilError(t, err, "no error expected accepted to resuming") + assert.Assert(t, appInfo.IsResuming(), "App should be in Resuming state") + // Verify application events err = common.WaitFor(10*time.Millisecond, time.Second, func() bool { fmt.Printf("checking event length: %d\n", eventSystem.Store.CountStoredEvents()) - return eventSystem.Store.CountStoredEvents() == 6 + return eventSystem.Store.CountStoredEvents() == 8 }) assert.NilError(t, err, "the event should have been processed") records := eventSystem.Store.CollectEvents() if records == nil { t.Fatal("collecting eventChannel should return something") } - assert.Equal(t, 6, len(records), "expecting 6 events") + assert.Equal(t, 8, len(records), "expecting 8 events") isNewApplicationEvent(t, appInfo, records[0]) isStateChangeEvent(t, appInfo, si.EventRecord_APP_ACCEPTED, records[1]) isStateChangeEvent(t, appInfo, si.EventRecord_APP_COMPLETING, records[2]) isStateChangeEvent(t, appInfo, si.EventRecord_APP_RUNNING, records[3]) isStateChangeEvent(t, appInfo, si.EventRecord_APP_FAILING, records[4]) isStateChangeEvent(t, appInfo, si.EventRecord_APP_FAILED, records[5]) + isStateChangeEvent(t, appInfo, si.EventRecord_APP_EXPIRED, records[6]) + isStateChangeEvent(t, appInfo, si.EventRecord_APP_RESUMING, records[7]) } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
