This is an automated email from the ASF dual-hosted git repository.
wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 039f28f7 [YUNIKORN-2010] Application metric incorrect (#667)
039f28f7 is described below
commit 039f28f7268e7e78054f64bd876a30b317adc294
Author: Yu-Lin Chen <[email protected]>
AuthorDate: Fri Nov 3 11:58:09 2023 +1100
[YUNIKORN-2010] Application metric incorrect (#667)
The web UI shows a negative number of running applications in the
application history. The root cause is incorrect tracking of running
applications in the metrics.
* remove 'enter_Resuming' metrics changes
* remove '*_Completing' metrics changes
* use 'enter_Running' for metrics changes
* enhance metrics trace and add more test cases
* correctly update accepted application metric
Closes: #667
Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
pkg/metrics/init.go | 9 +-
pkg/metrics/metrics_collector.go | 2 +-
pkg/metrics/queue.go | 46 +++++
pkg/metrics/scheduler.go | 20 +-
pkg/scheduler/objects/application_state.go | 40 ++--
pkg/scheduler/objects/application_state_test.go | 242 ++++++++++++++++++++++++
6 files changed, 337 insertions(+), 22 deletions(-)
diff --git a/pkg/metrics/init.go b/pkg/metrics/init.go
index e7cdeb56..7a4f5b4e 100644
--- a/pkg/metrics/init.go
+++ b/pkg/metrics/init.go
@@ -47,11 +47,16 @@ type Metrics struct {
type CoreQueueMetrics interface {
IncQueueApplicationsAccepted()
+ GetQueueApplicationsAccepted() (int, error)
IncQueueApplicationsRejected()
+ GetQueueApplicationsRejected() (int, error)
IncQueueApplicationsRunning()
DecQueueApplicationsRunning()
+ GetQueueApplicationsRunning() (int, error)
IncQueueApplicationsFailed()
+ GetQueueApplicationsFailed() (int, error)
IncQueueApplicationsCompleted()
+ GetQueueApplicationsCompleted() (int, error)
IncAllocatedContainer()
IncReleasedContainer()
AddReleasedContainers(value int)
@@ -102,6 +107,7 @@ type CoreSchedulerMetrics interface {
// Metrics Ops related to TotalApplicationsRejected
IncTotalApplicationsRejected()
AddTotalApplicationsRejected(value int)
+ GetTotalApplicationsRejected() (int, error)
// Metrics Ops related to TotalApplicationsRunning
IncTotalApplicationsRunning()
@@ -109,7 +115,7 @@ type CoreSchedulerMetrics interface {
DecTotalApplicationsRunning()
SubTotalApplicationsRunning(value int)
SetTotalApplicationsRunning(value int)
- getTotalApplicationsRunning() (int, error)
+ GetTotalApplicationsRunning() (int, error)
// Metrics Ops related to TotalApplicationsFailed
IncTotalApplicationsFailed()
@@ -120,6 +126,7 @@ type CoreSchedulerMetrics interface {
DecTotalApplicationsCompleted()
SubTotalApplicationsCompleted(value int)
SetTotalApplicationsCompleted(value int)
+ GetTotalApplicationsCompleted() (int, error)
// Metrics Ops related to ActiveNodes
IncActiveNodes()
diff --git a/pkg/metrics/metrics_collector.go b/pkg/metrics/metrics_collector.go
index ee9651ed..12bcb13e 100644
--- a/pkg/metrics/metrics_collector.go
+++ b/pkg/metrics/metrics_collector.go
@@ -67,7 +67,7 @@ func (u *internalMetricsCollector) StartService() {
func (u *internalMetricsCollector) store() {
log.Log(log.Metrics).Debug("Adding current status to historical
partition data")
- totalAppsRunning, err := m.scheduler.getTotalApplicationsRunning()
+ totalAppsRunning, err := m.scheduler.GetTotalApplicationsRunning()
if err != nil {
log.Log(log.Metrics).Warn("Could not encode totalApplications
metric.", zap.Error(err))
totalAppsRunning = -1
diff --git a/pkg/metrics/queue.go b/pkg/metrics/queue.go
index 8e0dc70a..cb227cf4 100644
--- a/pkg/metrics/queue.go
+++ b/pkg/metrics/queue.go
@@ -20,6 +20,7 @@ package metrics
import (
"github.com/prometheus/client_golang/prometheus"
+ dto "github.com/prometheus/client_model/go"
"go.uber.org/zap"
"github.com/apache/yunikorn-core/pkg/log"
@@ -94,22 +95,67 @@ func (m *QueueMetrics) DecQueueApplicationsRunning() {
m.appMetrics.With(prometheus.Labels{"state": "running"}).Dec()
}
+func (m *QueueMetrics) GetQueueApplicationsRunning() (int, error) {
+ metricDto := &dto.Metric{}
+ err := m.appMetrics.With(prometheus.Labels{"state":
"running"}).Write(metricDto)
+ if err == nil {
+ return int(*metricDto.Gauge.Value), nil
+ }
+ return -1, err
+}
+
func (m *QueueMetrics) IncQueueApplicationsAccepted() {
m.appMetrics.With(prometheus.Labels{"state": "accepted"}).Inc()
}
+func (m *QueueMetrics) GetQueueApplicationsAccepted() (int, error) {
+ metricDto := &dto.Metric{}
+ err := m.appMetrics.With(prometheus.Labels{"state":
"accepted"}).Write(metricDto)
+ if err == nil {
+ return int(*metricDto.Gauge.Value), nil
+ }
+ return -1, err
+}
+
func (m *QueueMetrics) IncQueueApplicationsRejected() {
m.appMetrics.With(prometheus.Labels{"state": "rejected"}).Inc()
}
+func (m *QueueMetrics) GetQueueApplicationsRejected() (int, error) {
+ metricDto := &dto.Metric{}
+ err := m.appMetrics.With(prometheus.Labels{"state":
"rejected"}).Write(metricDto)
+ if err == nil {
+ return int(*metricDto.Gauge.Value), nil
+ }
+ return -1, err
+}
+
func (m *QueueMetrics) IncQueueApplicationsFailed() {
m.appMetrics.With(prometheus.Labels{"state": "failed"}).Inc()
}
+func (m *QueueMetrics) GetQueueApplicationsFailed() (int, error) {
+ metricDto := &dto.Metric{}
+ err := m.appMetrics.With(prometheus.Labels{"state":
"failed"}).Write(metricDto)
+ if err == nil {
+ return int(*metricDto.Gauge.Value), nil
+ }
+ return -1, err
+}
+
func (m *QueueMetrics) IncQueueApplicationsCompleted() {
m.appMetrics.With(prometheus.Labels{"state": "completed"}).Inc()
}
+func (m *QueueMetrics) GetQueueApplicationsCompleted() (int, error) {
+ metricDto := &dto.Metric{}
+ err := m.appMetrics.With(prometheus.Labels{"state":
"completed"}).Write(metricDto)
+ if err == nil {
+ return int(*metricDto.Gauge.Value), nil
+ }
+ return -1, err
+}
+
func (m *QueueMetrics) IncAllocatedContainer() {
m.containerMetrics.With(prometheus.Labels{"state": "allocated"}).Inc()
}
diff --git a/pkg/metrics/scheduler.go b/pkg/metrics/scheduler.go
index b7721eae..0134c3c7 100644
--- a/pkg/metrics/scheduler.go
+++ b/pkg/metrics/scheduler.go
@@ -264,6 +264,15 @@ func (m *SchedulerMetrics)
AddTotalApplicationsRejected(value int) {
m.applicationSubmission.With(prometheus.Labels{"result":
"rejected"}).Add(float64(value))
}
+func (m *SchedulerMetrics) GetTotalApplicationsRejected() (int, error) {
+ metricDto := &dto.Metric{}
+ err := m.applicationSubmission.With(prometheus.Labels{"result":
"rejected"}).Write(metricDto)
+ if err == nil {
+ return int(*metricDto.Counter.Value), nil
+ }
+ return -1, err
+}
+
func (m *SchedulerMetrics) IncTotalApplicationsRunning() {
m.application.With(prometheus.Labels{"state": "running"}).Inc()
}
@@ -284,7 +293,7 @@ func (m *SchedulerMetrics)
SetTotalApplicationsRunning(value int) {
m.application.With(prometheus.Labels{"state":
"running"}).Set(float64(value))
}
-func (m *SchedulerMetrics) getTotalApplicationsRunning() (int, error) {
+func (m *SchedulerMetrics) GetTotalApplicationsRunning() (int, error) {
metricDto := &dto.Metric{}
err := m.application.With(prometheus.Labels{"state":
"running"}).Write(metricDto)
if err == nil {
@@ -317,6 +326,15 @@ func (m *SchedulerMetrics)
SetTotalApplicationsCompleted(value int) {
m.application.With(prometheus.Labels{"state":
"completed"}).Set(float64(value))
}
+func (m *SchedulerMetrics) GetTotalApplicationsCompleted() (int, error) {
+ metricDto := &dto.Metric{}
+ err := m.application.With(prometheus.Labels{"state":
"completed"}).Write(metricDto)
+ if err == nil {
+ return int(*metricDto.Gauge.Value), nil
+ }
+ return -1, err
+}
+
func (m *SchedulerMetrics) IncActiveNodes() {
m.node.With(prometheus.Labels{"state": "active"}).Inc()
}
diff --git a/pkg/scheduler/objects/application_state.go
b/pkg/scheduler/objects/application_state.go
index 35dbc6b3..925d80f4 100644
--- a/pkg/scheduler/objects/application_state.go
+++ b/pkg/scheduler/objects/application_state.go
@@ -169,29 +169,29 @@ func NewAppState() *fsm.FSM {
fmt.Sprintf("enter_%s", Starting.String()): func(_
context.Context, event *fsm.Event) {
app := event.Args[0].(*Application)
//nolint:errcheck
app.startTime = time.Now()
- app.queue.incRunningApps(app.ApplicationID)
app.setStateTimer(app.startTimeout,
app.stateMachine.Current(), RunApplication)
+ app.queue.incRunningApps(app.ApplicationID)
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsRunning()
metrics.GetSchedulerMetrics().IncTotalApplicationsRunning()
},
- fmt.Sprintf("enter_%s", Resuming.String()): func(_
context.Context, event *fsm.Event) {
- app := event.Args[0].(*Application)
//nolint:errcheck
-
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning()
-
metrics.GetSchedulerMetrics().DecTotalApplicationsRunning()
- },
- fmt.Sprintf("enter_%s", Completing.String()): func(_
context.Context, event *fsm.Event) {
- app := event.Args[0].(*Application)
//nolint:errcheck
- if event.Src == Starting.String() {
+ fmt.Sprintf("leave_%s", Starting.String()): func(_
context.Context, event *fsm.Event) {
+ if event.Dst != Running.String() {
+ app := event.Args[0].(*Application)
//nolint:errcheck
app.queue.decRunningApps()
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning()
metrics.GetSchedulerMetrics().DecTotalApplicationsRunning()
}
+ },
+ fmt.Sprintf("enter_%s", Completing.String()): func(_
context.Context, event *fsm.Event) {
+ app := event.Args[0].(*Application)
//nolint:errcheck
app.setStateTimer(completingTimeout,
app.stateMachine.Current(), CompleteApplication)
},
fmt.Sprintf("leave_%s", New.String()): func(_
context.Context, event *fsm.Event) {
- app := event.Args[0].(*Application)
//nolint:errcheck
-
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsAccepted()
-
metrics.GetSchedulerMetrics().IncTotalApplicationsAccepted()
+ if event.Dst != Rejected.String() {
+ app := event.Args[0].(*Application)
//nolint:errcheck
+
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsAccepted()
+
metrics.GetSchedulerMetrics().IncTotalApplicationsAccepted()
+ }
},
fmt.Sprintf("enter_%s", Rejected.String()): func(_
context.Context, event *fsm.Event) {
app := event.Args[0].(*Application)
//nolint:errcheck
@@ -205,19 +205,21 @@ func NewAppState() *fsm.FSM {
app.rejectedMessage =
event.Args[1].(string) //nolint:errcheck
}
},
+ fmt.Sprintf("enter_%s", Running.String()): func(_
context.Context, event *fsm.Event) {
+ app := event.Args[0].(*Application)
//nolint:errcheck
+ // account for going back into running state
+ if event.Src == Completing.String() {
+
app.queue.incRunningApps(app.ApplicationID)
+
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsRunning()
+
metrics.GetSchedulerMetrics().IncTotalApplicationsRunning()
+ }
+ },
fmt.Sprintf("leave_%s", Running.String()): func(_
context.Context, event *fsm.Event) {
app := event.Args[0].(*Application)
//nolint:errcheck
app.queue.decRunningApps()
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning()
metrics.GetSchedulerMetrics().DecTotalApplicationsRunning()
},
- fmt.Sprintf("leave_%s", Completing.String()): func(_
context.Context, event *fsm.Event) {
- app := event.Args[0].(*Application)
//nolint:errcheck
- // account for going back into running state
- if event.Dst == Running.String() {
-
app.queue.incRunningApps(app.ApplicationID)
- }
- },
fmt.Sprintf("enter_%s", Completed.String()): func(_
context.Context, event *fsm.Event) {
app := event.Args[0].(*Application)
//nolint:errcheck
metrics.GetSchedulerMetrics().IncTotalApplicationsCompleted()
diff --git a/pkg/scheduler/objects/application_state_test.go
b/pkg/scheduler/objects/application_state_test.go
index 2f1bf564..7a07f808 100644
--- a/pkg/scheduler/objects/application_state_test.go
+++ b/pkg/scheduler/objects/application_state_test.go
@@ -27,6 +27,7 @@ import (
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/events"
+ "github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -303,3 +304,244 @@ func TestAppStateTransitionEvents(t *testing.T) {
isStateChangeEvent(t, appInfo, si.EventRecord_APP_EXPIRED, records[6])
isStateChangeEvent(t, appInfo, si.EventRecord_APP_RESUMING, records[7])
}
+
+// Test to verify metrics after applications state transition
+// app-00001: New -> Resuming -> Accepted -> Starting -> Running ->
Completing-> Completed
+// app-00002: New -> Accepted -> Starting -> Completing -> Running ->
Failing-> Failed
+// app-00003: New -> Accepted -> Starting -> Failing -> Failed
+// app-00004: New -> Rejected
+// Final metrics will be: 0 running, 3 accepted, 1 completed, 2 failed and 1
rejected applications
+func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen
+ queue := createQueue(t, "root.metrics")
+ metrics.GetSchedulerMetrics().Reset()
+ // app-00001: New -> Resuming -> Accepted --> Starting -> Running ->
Completing-> Completed
+ app := newApplication("app-00001", "default", "root.metrics")
+ app.SetQueue(queue)
+ assertState(t, app, nil, New.String())
+ assertTotalAppsRunningMetrics(t, 0)
+ assertTotalAppsCompletedMetrics(t, 0)
+ assertTotalAppsRejectedMetrics(t, 0)
+ // New -> Resuming
+ err := app.HandleApplicationEvent(ResumeApplication)
+ assertState(t, app, err, Resuming.String())
+ assertTotalAppsRunningMetrics(t, 0)
+ assertTotalAppsCompletedMetrics(t, 0)
+ assertTotalAppsRejectedMetrics(t, 0)
+ assertQueueRunningApps(t, app, 0)
+ assertQueueApplicationsRunningMetrics(t, app, 0)
+ assertQueueApplicationsAcceptedMetrics(t, app, 1)
+ assertQueueApplicationsRejectedMetrics(t, app, 0)
+ assertQueueApplicationsFailedMetrics(t, app, 0)
+ assertQueueApplicationsCompletedMetrics(t, app, 0)
+ // Resuming -> Accepted
+ err = app.HandleApplicationEvent(RunApplication)
+ assertState(t, app, err, Accepted.String())
+ assertTotalAppsRunningMetrics(t, 0)
+ assertTotalAppsCompletedMetrics(t, 0)
+ assertTotalAppsRejectedMetrics(t, 0)
+ assertQueueRunningApps(t, app, 0)
+ assertQueueApplicationsRunningMetrics(t, app, 0)
+ assertQueueApplicationsAcceptedMetrics(t, app, 1)
+ assertQueueApplicationsRejectedMetrics(t, app, 0)
+ assertQueueApplicationsFailedMetrics(t, app, 0)
+ assertQueueApplicationsCompletedMetrics(t, app, 0)
+ // Accepted -> Starting
+ err = app.HandleApplicationEvent(RunApplication)
+ assertState(t, app, err, Starting.String())
+ assertTotalAppsRunningMetrics(t, 1)
+ assertTotalAppsCompletedMetrics(t, 0)
+ assertTotalAppsRejectedMetrics(t, 0)
+ assertQueueRunningApps(t, app, 1)
+ assertQueueApplicationsRunningMetrics(t, app, 1)
+ assertQueueApplicationsAcceptedMetrics(t, app, 1)
+ assertQueueApplicationsRejectedMetrics(t, app, 0)
+ assertQueueApplicationsFailedMetrics(t, app, 0)
+ assertQueueApplicationsCompletedMetrics(t, app, 0)
+ // Starting -> Running
+ err = app.HandleApplicationEvent(RunApplication)
+ assertState(t, app, err, Running.String())
+ assertTotalAppsRunningMetrics(t, 1)
+ assertTotalAppsCompletedMetrics(t, 0)
+ assertTotalAppsRejectedMetrics(t, 0)
+ assertQueueRunningApps(t, app, 1)
+ assertQueueApplicationsRunningMetrics(t, app, 1)
+ assertQueueApplicationsAcceptedMetrics(t, app, 1)
+ assertQueueApplicationsRejectedMetrics(t, app, 0)
+ assertQueueApplicationsFailedMetrics(t, app, 0)
+ assertQueueApplicationsCompletedMetrics(t, app, 0)
+ // Running -> Completing
+ err = app.HandleApplicationEvent(CompleteApplication)
+ assertState(t, app, err, Completing.String())
+ assertTotalAppsRunningMetrics(t, 0)
+ assertTotalAppsCompletedMetrics(t, 0)
+ assertTotalAppsRejectedMetrics(t, 0)
+ assertQueueRunningApps(t, app, 0)
+ assertQueueApplicationsRunningMetrics(t, app, 0)
+ assertQueueApplicationsAcceptedMetrics(t, app, 1)
+ assertQueueApplicationsRejectedMetrics(t, app, 0)
+ assertQueueApplicationsFailedMetrics(t, app, 0)
+ assertQueueApplicationsCompletedMetrics(t, app, 0)
+ // Completing -> Completed
+ err = app.HandleApplicationEvent(CompleteApplication)
+ assertState(t, app, err, Completed.String())
+ assertTotalAppsRunningMetrics(t, 0)
+ assertTotalAppsCompletedMetrics(t, 1)
+ assertTotalAppsRejectedMetrics(t, 0)
+ assertQueueRunningApps(t, app, 0)
+ assertQueueApplicationsRunningMetrics(t, app, 0)
+ assertQueueApplicationsAcceptedMetrics(t, app, 1)
+ assertQueueApplicationsRejectedMetrics(t, app, 0)
+ assertQueueApplicationsFailedMetrics(t, app, 0)
+ assertQueueApplicationsCompletedMetrics(t, app, 1)
+
+ // app-00002: New -> Accepted -> Starting -> Completing -> Running ->
Failing-> Failed
+ app = newApplication("app-00002", "default", "root.metrics")
+ app.SetQueue(queue)
+ assertState(t, app, nil, New.String())
+ // New -> Accepted
+ err = app.HandleApplicationEvent(RunApplication)
+ assertState(t, app, err, Accepted.String())
+ // Accepted -> Starting
+ err = app.HandleApplicationEvent(RunApplication)
+ assertState(t, app, err, Starting.String())
+ // Starting -> Completing
+ err = app.HandleApplicationEvent(CompleteApplication)
+ assertState(t, app, err, Completing.String())
+ // Completing -> Running
+ err = app.HandleApplicationEvent(RunApplication)
+ assertState(t, app, err, Running.String())
+ // Running -> Failing
+ err = app.HandleApplicationEvent(FailApplication)
+ assertState(t, app, err, Failing.String())
+ // Failing -> Failed
+ err = app.HandleApplicationEvent(FailApplication)
+ assertState(t, app, err, Failed.String())
+ assertTotalAppsRunningMetrics(t, 0)
+ assertTotalAppsCompletedMetrics(t, 1)
+ assertTotalAppsRejectedMetrics(t, 0)
+ assertQueueRunningApps(t, app, 0)
+ assertQueueApplicationsRunningMetrics(t, app, 0)
+ assertQueueApplicationsAcceptedMetrics(t, app, 2)
+ assertQueueApplicationsRejectedMetrics(t, app, 0)
+ assertQueueApplicationsFailedMetrics(t, app, 1)
+ assertQueueApplicationsCompletedMetrics(t, app, 1)
+
+ // app-00003: New -> Accepted -> Starting -> Failing -> Failed
+ app = newApplication("app-00003", "default", "root.metrics")
+ app.SetQueue(queue)
+ assertState(t, app, nil, New.String())
+ // New -> Accepted
+ err = app.HandleApplicationEvent(RunApplication)
+ assertState(t, app, err, Accepted.String())
+ // Accepted -> Starting
+ err = app.HandleApplicationEvent(RunApplication)
+ assertState(t, app, err, Starting.String())
+ // Starting -> Failing
+ err = app.HandleApplicationEvent(FailApplication)
+ assertState(t, app, err, Failing.String())
+ // Failing -> Failed
+ err = app.HandleApplicationEvent(FailApplication)
+ assertState(t, app, err, Failed.String())
+ assertTotalAppsRunningMetrics(t, 0)
+ assertTotalAppsCompletedMetrics(t, 1)
+ assertTotalAppsRejectedMetrics(t, 0)
+ assertQueueRunningApps(t, app, 0)
+ assertQueueApplicationsRunningMetrics(t, app, 0)
+ assertQueueApplicationsAcceptedMetrics(t, app, 3)
+ assertQueueApplicationsRejectedMetrics(t, app, 0)
+ assertQueueApplicationsFailedMetrics(t, app, 2)
+ assertQueueApplicationsCompletedMetrics(t, app, 1)
+
+ // app-00004: New -> Rejected
+ app = newApplication("app-00004", "default", "root.metrics")
+ app.SetQueue(queue)
+ assertState(t, app, nil, New.String())
+ // New -> Rejected
+ err = app.HandleApplicationEvent(RejectApplication)
+ assertState(t, app, err, Rejected.String())
+ assertTotalAppsRunningMetrics(t, 0)
+ assertTotalAppsCompletedMetrics(t, 1)
+ assertTotalAppsRejectedMetrics(t, 1)
+ assertQueueRunningApps(t, app, 0)
+ assertQueueApplicationsRunningMetrics(t, app, 0)
+ assertQueueApplicationsAcceptedMetrics(t, app, 3)
+ assertQueueApplicationsRejectedMetrics(t, app, 1)
+ assertQueueApplicationsFailedMetrics(t, app, 2)
+ assertQueueApplicationsCompletedMetrics(t, app, 1)
+}
+
+func assertState(t testing.TB, app *Application, err error, expected string) {
+ t.Helper()
+ assert.NilError(t, err, fmt.Sprintf("no error expected when change
state to %v", expected))
+ assert.Equal(t, app.CurrentState(), expected, "application not in
expected state.")
+}
+
+func assertTotalAppsRunningMetrics(t testing.TB, expected int) {
+ t.Helper()
+ totalAppsRunning, err :=
metrics.GetSchedulerMetrics().GetTotalApplicationsRunning()
+ assert.NilError(t, err, "no error expected when getting total running
application count.")
+ assert.Equal(t, totalAppsRunning, expected, "total running application
metrics is not as expected.")
+}
+
+func assertTotalAppsCompletedMetrics(t testing.TB, expected int) {
+ t.Helper()
+ totalAppsCompleted, err :=
metrics.GetSchedulerMetrics().GetTotalApplicationsCompleted()
+ assert.NilError(t, err, "no error expected when getting total completed
application count.")
+ assert.Equal(t, totalAppsCompleted, expected, "total completed
application metrics is not as expected.")
+}
+
+func assertTotalAppsRejectedMetrics(t testing.TB, expected int) {
+ t.Helper()
+ totalAppsRejected, err :=
metrics.GetSchedulerMetrics().GetTotalApplicationsRejected()
+ assert.NilError(t, err, "no error expected when getting total rejected
application count.")
+ assert.Equal(t, totalAppsRejected, expected, "total rejected
application metrics is not as expected.")
+}
+
+func assertQueueRunningApps(t testing.TB, app *Application, expected int) {
+ t.Helper()
+ runningApps := app.queue.runningApps
+ assert.Equal(t, runningApps, uint64(expected), "total running
application in queue is not as expected.")
+}
+
+func assertQueueApplicationsAcceptedMetrics(t testing.TB, app *Application,
expected int) {
+ t.Helper()
+ queueApplicationsAccepted, err :=
metrics.GetQueueMetrics(app.queuePath).GetQueueApplicationsAccepted()
+ assert.NilError(t, err, "no error expected when getting total accepted
application count in queue.")
+ assert.Equal(t, queueApplicationsAccepted, expected, "total accepted
application metrics in queue is not as expected.")
+}
+
+func assertQueueApplicationsRejectedMetrics(t testing.TB, app *Application,
expected int) {
+ t.Helper()
+ queueApplicationsRejected, err :=
metrics.GetQueueMetrics(app.queuePath).GetQueueApplicationsRejected()
+ assert.NilError(t, err, "no error expected when getting total rejected
application count in queue.")
+ assert.Equal(t, queueApplicationsRejected, expected, "total rejected
application metrics in queue is not as expected.")
+}
+
+func assertQueueApplicationsRunningMetrics(t testing.TB, app *Application,
expected int) {
+ t.Helper()
+ queueApplicationsRunning, err :=
metrics.GetQueueMetrics(app.queuePath).GetQueueApplicationsRunning()
+ assert.NilError(t, err, "no error expected when getting total running
application count in queue.")
+ assert.Equal(t, queueApplicationsRunning, expected, "total running
application metrics in queue is not as expected.")
+}
+
+func assertQueueApplicationsFailedMetrics(t testing.TB, app *Application,
expected int) {
+ t.Helper()
+ queueApplicationsFailed, err :=
metrics.GetQueueMetrics(app.queuePath).GetQueueApplicationsFailed()
+ assert.NilError(t, err, "no error expected when getting total failed
application count in queue.")
+ assert.Equal(t, queueApplicationsFailed, expected, "total failed
application metrics in queue is not as expected.")
+}
+
+func assertQueueApplicationsCompletedMetrics(t testing.TB, app *Application,
expected int) {
+ t.Helper()
+ queueApplicationsCompleted, err :=
metrics.GetQueueMetrics(app.queuePath).GetQueueApplicationsCompleted()
+ assert.NilError(t, err, "no error expected when getting total completed
application count in queue.")
+ assert.Equal(t, queueApplicationsCompleted, expected, "total completed
application metrics in queue is not as expected.")
+}
+
+func createQueue(t *testing.T, queueName string) *Queue {
+ root, err := createRootQueue(nil)
+ assert.NilError(t, err, "failed to create queue: %v", err)
+ queue, err := createManagedQueue(root, queueName, false,
map[string]string{"cpu": "10"})
+ assert.NilError(t, err, "failed to create queue: %v", err)
+ return queue
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]