This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 2feb6ecc [YUNIKORN-1945] Add missing application state transition event (#629)
2feb6ecc is described below
commit 2feb6ecc13849130c99f90a78e053fc6b319e725
Author: Peter Bacsko <[email protected]>
AuthorDate: Thu Aug 31 19:46:55 2023 +0200
[YUNIKORN-1945] Add missing application state transition event (#629)
Closes: #629
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/scheduler/objects/application.go | 3 ---
pkg/scheduler/objects/application_state.go | 27 ++++++++++++++++++-------
pkg/scheduler/objects/application_state_test.go | 17 ++++++++++++++--
3 files changed, 35 insertions(+), 12 deletions(-)
diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index c4fe10b1..0124ddf1 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -308,9 +308,6 @@ func (sa *Application) HandleApplicationEventWithInfo(event applicationEvent, ev
if err != nil && err.Error() == noTransition {
return nil
}
- if event == RejectApplication {
- sa.appEvents.sendRejectApplicationEvent(eventInfo)
- }
return err
}
diff --git a/pkg/scheduler/objects/application_state.go b/pkg/scheduler/objects/application_state.go
index 98961884..35dbc6b3 100644
--- a/pkg/scheduler/objects/application_state.go
+++ b/pkg/scheduler/objects/application_state.go
@@ -74,6 +74,19 @@ const (
Resuming
)
+var stateEvents = map[string]si.EventRecord_ChangeDetail{
+ Accepted.String(): si.EventRecord_APP_ACCEPTED,
+ Starting.String(): si.EventRecord_APP_STARTING,
+ Running.String(): si.EventRecord_APP_RUNNING,
+ Rejected.String(): si.EventRecord_APP_REJECT,
+ Completing.String(): si.EventRecord_APP_COMPLETING,
+ Completed.String(): si.EventRecord_APP_COMPLETED,
+ Failing.String(): si.EventRecord_APP_FAILING,
+ Failed.String(): si.EventRecord_APP_FAILED,
+ Resuming.String(): si.EventRecord_APP_RESUMING,
+ Expired.String(): si.EventRecord_APP_EXPIRED,
+}
+
func (as applicationState) String() string {
return [...]string{"New", "Accepted", "Starting", "Running", "Rejected", "Completing", "Completed", "Failing", "Failed", "Expired", "Resuming"}[as]
}
@@ -142,6 +155,13 @@ func NewAppState() *fsm.FSM {
} else {
app.OnStateChange(event, "")
}
+ eventDetails, ok := stateEvents[event.Dst]
+ if !ok {
+ log.Log(log.SchedFSM).Error("event details not found",
+ zap.String("state", event.Dst))
+ return
+ }
+ app.appEvents.sendStateChangeEvent(eventDetails)
},
"leave_state": func(_ context.Context, event *fsm.Event) {
event.Args[0].(*Application).clearStateTimer()
//nolint:errcheck
@@ -153,7 +173,6 @@ func NewAppState() *fsm.FSM {
app.setStateTimer(app.startTimeout, app.stateMachine.Current(), RunApplication)
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsRunning()
metrics.GetSchedulerMetrics().IncTotalApplicationsRunning()
- app.appEvents.sendStateChangeEvent(si.EventRecord_APP_STARTING)
},
fmt.Sprintf("enter_%s", Resuming.String()): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application)
//nolint:errcheck
@@ -168,13 +187,11 @@ func NewAppState() *fsm.FSM {
metrics.GetSchedulerMetrics().DecTotalApplicationsRunning()
}
app.setStateTimer(completingTimeout, app.stateMachine.Current(), CompleteApplication)
- app.appEvents.sendStateChangeEvent(si.EventRecord_APP_COMPLETING)
},
fmt.Sprintf("leave_%s", New.String()): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application)
//nolint:errcheck
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsAccepted()
metrics.GetSchedulerMetrics().IncTotalApplicationsAccepted()
- app.appEvents.sendStateChangeEvent(si.EventRecord_APP_ACCEPTED)
},
fmt.Sprintf("enter_%s", Rejected.String()): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application)
//nolint:errcheck
@@ -199,7 +216,6 @@ func NewAppState() *fsm.FSM {
// account for going back into running state
if event.Dst == Running.String() {
app.queue.incRunningApps(app.ApplicationID)
- app.appEvents.sendStateChangeEvent(si.EventRecord_APP_RUNNING)
}
},
fmt.Sprintf("enter_%s", Completed.String()): func(_ context.Context, event *fsm.Event) {
@@ -207,7 +223,6 @@ func NewAppState() *fsm.FSM {
metrics.GetSchedulerMetrics().IncTotalApplicationsCompleted()
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsCompleted()
app.setStateTimer(terminatedTimeout, app.stateMachine.Current(), ExpireApplication)
- app.appEvents.sendStateChangeEvent(si.EventRecord_APP_COMPLETED)
app.executeTerminatedCallback()
app.clearPlaceholderTimer()
app.cleanupAsks()
@@ -216,12 +231,10 @@ func NewAppState() *fsm.FSM {
app := event.Args[0].(*Application)
//nolint:errcheck
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsFailed()
metrics.GetSchedulerMetrics().IncTotalApplicationsFailed()
- app.appEvents.sendStateChangeEvent(si.EventRecord_APP_FAILING)
},
fmt.Sprintf("enter_%s", Failed.String()): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application)
//nolint:errcheck
app.setStateTimer(terminatedTimeout, app.stateMachine.Current(), ExpireApplication)
- app.appEvents.sendStateChangeEvent(si.EventRecord_APP_FAILED)
app.executeTerminatedCallback()
app.cleanupAsks()
},
diff --git a/pkg/scheduler/objects/application_state_test.go b/pkg/scheduler/objects/application_state_test.go
index cdd4b278..2f1bf564 100644
--- a/pkg/scheduler/objects/application_state_test.go
+++ b/pkg/scheduler/objects/application_state_test.go
@@ -272,21 +272,34 @@ func TestAppStateTransitionEvents(t *testing.T) {
assert.NilError(t, err, "no error expected failing to failed")
assert.Assert(t, appInfo.IsFailed(), "App should be in Failed state")
+ // failed to expired
+ err = appInfo.HandleApplicationEvent(ExpireApplication)
+ assert.NilError(t, err, "no error expected failed to expired")
+ assert.Assert(t, appInfo.IsExpired(), "App should be in Expired state")
+
+ // accepted to resuming
+ appInfo.stateMachine.SetState(Accepted.String())
+ err = appInfo.HandleApplicationEvent(ResumeApplication)
+ assert.NilError(t, err, "no error expected accepted to resuming")
+ assert.Assert(t, appInfo.IsResuming(), "App should be in Resuming state")
+
// Verify application events
err = common.WaitFor(10*time.Millisecond, time.Second, func() bool {
fmt.Printf("checking event length: %d\n", eventSystem.Store.CountStoredEvents())
- return eventSystem.Store.CountStoredEvents() == 6
+ return eventSystem.Store.CountStoredEvents() == 8
})
assert.NilError(t, err, "the event should have been processed")
records := eventSystem.Store.CollectEvents()
if records == nil {
t.Fatal("collecting eventChannel should return something")
}
- assert.Equal(t, 6, len(records), "expecting 6 events")
+ assert.Equal(t, 8, len(records), "expecting 8 events")
isNewApplicationEvent(t, appInfo, records[0])
isStateChangeEvent(t, appInfo, si.EventRecord_APP_ACCEPTED, records[1])
isStateChangeEvent(t, appInfo, si.EventRecord_APP_COMPLETING, records[2])
isStateChangeEvent(t, appInfo, si.EventRecord_APP_RUNNING, records[3])
isStateChangeEvent(t, appInfo, si.EventRecord_APP_FAILING, records[4])
isStateChangeEvent(t, appInfo, si.EventRecord_APP_FAILED, records[5])
+ isStateChangeEvent(t, appInfo, si.EventRecord_APP_EXPIRED, records[6])
+ isStateChangeEvent(t, appInfo, si.EventRecord_APP_RESUMING, records[7])
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]