This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 375895b9 [YUNIKORN-2818] Fix state tracking metrics for app and queue
(#951)
375895b9 is described below
commit 375895b9ff65f17f33e64fe43b1c7435b6cf3664
Author: qzhu <[email protected]>
AuthorDate: Wed Aug 21 13:50:15 2024 -0500
[YUNIKORN-2818] Fix state tracking metrics for app and queue (#951)
Closes: #951
Signed-off-by: Craig Condit <[email protected]>
---
pkg/metrics/queue.go | 91 ++++++++++++++--
pkg/metrics/queue_test.go | 98 +++++++++++++++++
pkg/metrics/scheduler.go | 75 +++++++++----
pkg/metrics/scheduler_test.go | 133 ++++++++++++++++++++++--
pkg/rmproxy/rmproxy.go | 8 --
pkg/scheduler/context.go | 2 -
pkg/scheduler/objects/application.go | 6 ++
pkg/scheduler/objects/application_state.go | 61 ++++++++---
pkg/scheduler/objects/application_state_test.go | 92 ++++++++++++++--
9 files changed, 499 insertions(+), 67 deletions(-)
diff --git a/pkg/metrics/queue.go b/pkg/metrics/queue.go
index 596ca142..1a2c3670 100644
--- a/pkg/metrics/queue.go
+++ b/pkg/metrics/queue.go
@@ -27,11 +27,16 @@ import (
)
const (
- AppAccepted = "accepted"
- AppRunning = "running"
- AppFailed = "failed"
- AppRejected = "rejected"
- AppCompleted = "completed"
+ AppNew = "new"
+ AppAccepted = "accepted"
+ AppRunning = "running"
+ AppFailing = "failing"
+ AppFailed = "failed"
+ AppRejected = "rejected"
+ AppResuming = "resuming"
+ AppCompleting = "completing"
+ AppCompleted = "completed"
+ AppExpired = "expired"
ContainerReleased = "released"
ContainerAllocated = "allocated"
@@ -65,7 +70,7 @@ func InitQueueMetrics(name string) *QueueMetrics {
Namespace: Namespace,
Name: "queue_app",
ConstLabels: prometheus.Labels{"queue": name},
- Help: "Queue application metrics. State of the
application includes `accepted`, `rejected`, `running`, `failed`, `completed`.",
+ Help: "Queue application metrics. State of the
application includes `new`, `accepted`, `rejected`, `running`, `failing`,
`failed`, `resuming`, `completing`, `completed`.",
}, []string{"state"})
q.appMetricsSubsystem = prometheus.NewGaugeVec(
@@ -73,7 +78,7 @@ func InitQueueMetrics(name string) *QueueMetrics {
Namespace: Namespace,
Subsystem: replaceStr,
Name: "queue_app",
- Help: "Queue application metrics. State of the
application includes `accepted`, `rejected`, `running`, `failed`, `completed`.",
+ Help: "Queue application metrics. State of the
application includes `new`, `accepted`, `rejected`, `running`, `failing`,
`failed`, `resuming`, `completing`, `completed`.",
}, []string{"state"})
q.containerMetrics = prometheus.NewCounterVec(
@@ -160,10 +165,31 @@ func (m *QueueMetrics) GetQueueApplicationsRunning()
(int, error) {
return -1, err
}
+func (m *QueueMetrics) IncQueueApplicationsNew() {
+ m.incQueueApplications(AppNew)
+}
+
+func (m *QueueMetrics) DecQueueApplicationsNew() {
+ m.decQueueApplications(AppNew)
+}
+
+func (m *QueueMetrics) GetQueueApplicationsNew() (int, error) {
+ metricDto := &dto.Metric{}
+ err := m.appMetricsLabel.WithLabelValues(AppNew).Write(metricDto)
+ if err == nil {
+ return int(*metricDto.Gauge.Value), nil
+ }
+ return -1, err
+}
+
func (m *QueueMetrics) IncQueueApplicationsAccepted() {
m.incQueueApplications(AppAccepted)
}
+func (m *QueueMetrics) DecQueueApplicationsAccepted() {
+ m.decQueueApplications(AppAccepted)
+}
+
func (m *QueueMetrics) GetQueueApplicationsAccepted() (int, error) {
metricDto := &dto.Metric{}
err := m.appMetricsLabel.WithLabelValues(AppAccepted).Write(metricDto)
@@ -186,6 +212,40 @@ func (m *QueueMetrics) GetQueueApplicationsRejected()
(int, error) {
return -1, err
}
+func (m *QueueMetrics) IncQueueApplicationsResuming() {
+ m.incQueueApplications(AppResuming)
+}
+
+func (m *QueueMetrics) DecQueueApplicationsResuming() {
+ m.decQueueApplications(AppResuming)
+}
+
+func (m *QueueMetrics) GetQueueApplicationsResuming() (int, error) {
+ metricDto := &dto.Metric{}
+ err := m.appMetricsLabel.WithLabelValues(AppResuming).Write(metricDto)
+ if err == nil {
+ return int(*metricDto.Gauge.Value), nil
+ }
+ return -1, err
+}
+
+func (m *QueueMetrics) IncQueueApplicationsFailing() {
+ m.incQueueApplications(AppFailing)
+}
+
+func (m *QueueMetrics) DecQueueApplicationsFailing() {
+ m.decQueueApplications(AppFailing)
+}
+
+func (m *QueueMetrics) GetQueueApplicationsFailing() (int, error) {
+ metricDto := &dto.Metric{}
+ err := m.appMetricsLabel.WithLabelValues(AppFailing).Write(metricDto)
+ if err == nil {
+ return int(*metricDto.Gauge.Value), nil
+ }
+ return -1, err
+}
+
func (m *QueueMetrics) IncQueueApplicationsFailed() {
m.incQueueApplications(AppFailed)
}
@@ -199,6 +259,23 @@ func (m *QueueMetrics) GetQueueApplicationsFailed() (int,
error) {
return -1, err
}
+func (m *QueueMetrics) IncQueueApplicationsCompleting() {
+ m.incQueueApplications(AppCompleting)
+}
+
+func (m *QueueMetrics) DecQueueApplicationsCompleting() {
+ m.decQueueApplications(AppCompleting)
+}
+
+func (m *QueueMetrics) GetQueueApplicationsCompleting() (int, error) {
+ metricDto := &dto.Metric{}
+ err := m.appMetricsLabel.WithLabelValues(AppCompleting).Write(metricDto)
+ if err == nil {
+ return int(*metricDto.Gauge.Value), nil
+ }
+ return -1, err
+}
+
func (m *QueueMetrics) IncQueueApplicationsCompleted() {
m.incQueueApplications(AppCompleted)
}
diff --git a/pkg/metrics/queue_test.go b/pkg/metrics/queue_test.go
index e0a0f94b..ea2376bd 100644
--- a/pkg/metrics/queue_test.go
+++ b/pkg/metrics/queue_test.go
@@ -30,12 +30,38 @@ import (
var qm *QueueMetrics
+func TestApplicationsNew(t *testing.T) {
+ qm = getQueueMetrics()
+ defer unregisterQueueMetrics()
+
+ qm.IncQueueApplicationsNew()
+ verifyAppMetrics(t, "new")
+
+ curr, err := qm.GetQueueApplicationsNew()
+ assert.NilError(t, err)
+ assert.Equal(t, 1, curr)
+
+ qm.DecQueueApplicationsNew()
+ curr, err = qm.GetQueueApplicationsNew()
+ assert.NilError(t, err)
+ assert.Equal(t, 0, curr)
+}
+
func TestApplicationsRunning(t *testing.T) {
qm = getQueueMetrics()
defer unregisterQueueMetrics()
qm.IncQueueApplicationsRunning()
verifyAppMetrics(t, "running")
+
+ curr, err := qm.GetQueueApplicationsRunning()
+ assert.NilError(t, err)
+ assert.Equal(t, 1, curr)
+
+ qm.DecQueueApplicationsRunning()
+ curr, err = qm.GetQueueApplicationsRunning()
+ assert.NilError(t, err)
+ assert.Equal(t, 0, curr)
}
func TestApplicationsAccepted(t *testing.T) {
@@ -44,6 +70,49 @@ func TestApplicationsAccepted(t *testing.T) {
qm.IncQueueApplicationsAccepted()
verifyAppMetrics(t, "accepted")
+
+ curr, err := qm.GetQueueApplicationsAccepted()
+ assert.NilError(t, err)
+ assert.Equal(t, 1, curr)
+
+ qm.DecQueueApplicationsAccepted()
+ curr, err = qm.GetQueueApplicationsAccepted()
+ assert.NilError(t, err)
+ assert.Equal(t, 0, curr)
+}
+
+func TestApplicationsResuming(t *testing.T) {
+ qm = getQueueMetrics()
+ defer unregisterQueueMetrics()
+
+ qm.IncQueueApplicationsResuming()
+ verifyAppMetrics(t, "resuming")
+
+ curr, err := qm.GetQueueApplicationsResuming()
+ assert.NilError(t, err)
+ assert.Equal(t, 1, curr)
+
+ qm.DecQueueApplicationsResuming()
+ curr, err = qm.GetQueueApplicationsResuming()
+ assert.NilError(t, err)
+ assert.Equal(t, 0, curr)
+}
+
+func TestApplicationsFailing(t *testing.T) {
+ qm = getQueueMetrics()
+ defer unregisterQueueMetrics()
+
+ qm.IncQueueApplicationsFailing()
+ verifyAppMetrics(t, "failing")
+
+ curr, err := qm.GetQueueApplicationsFailing()
+ assert.NilError(t, err)
+ assert.Equal(t, 1, curr)
+
+ qm.DecQueueApplicationsFailing()
+ curr, err = qm.GetQueueApplicationsFailing()
+ assert.NilError(t, err)
+ assert.Equal(t, 0, curr)
}
func TestApplicationsRejected(t *testing.T) {
@@ -52,6 +121,10 @@ func TestApplicationsRejected(t *testing.T) {
qm.IncQueueApplicationsRejected()
verifyAppMetrics(t, "rejected")
+
+ curr, err := qm.GetQueueApplicationsRejected()
+ assert.NilError(t, err)
+ assert.Equal(t, 1, curr)
}
func TestApplicationsFailed(t *testing.T) {
@@ -60,6 +133,27 @@ func TestApplicationsFailed(t *testing.T) {
qm.IncQueueApplicationsFailed()
verifyAppMetrics(t, "failed")
+
+ curr, err := qm.GetQueueApplicationsFailed()
+ assert.NilError(t, err)
+ assert.Equal(t, 1, curr)
+}
+
+func TestApplicationsCompleting(t *testing.T) {
+ qm = getQueueMetrics()
+ defer unregisterQueueMetrics()
+
+ qm.IncQueueApplicationsCompleting()
+ verifyAppMetrics(t, "completing")
+
+ curr, err := qm.GetQueueApplicationsCompleting()
+ assert.NilError(t, err)
+ assert.Equal(t, 1, curr)
+
+ qm.DecQueueApplicationsCompleting()
+ curr, err = qm.GetQueueApplicationsCompleting()
+ assert.NilError(t, err)
+ assert.Equal(t, 0, curr)
}
func TestApplicationsCompleted(t *testing.T) {
@@ -68,6 +162,10 @@ func TestApplicationsCompleted(t *testing.T) {
qm.IncQueueApplicationsCompleted()
verifyAppMetrics(t, "completed")
+
+ curr, err := qm.GetQueueApplicationsCompleted()
+ assert.NilError(t, err)
+ assert.Equal(t, 1, curr)
}
func TestAllocatedContainers(t *testing.T) {
diff --git a/pkg/metrics/scheduler.go b/pkg/metrics/scheduler.go
index 23fcc850..a5c7c054 100644
--- a/pkg/metrics/scheduler.go
+++ b/pkg/metrics/scheduler.go
@@ -89,7 +89,7 @@ func InitSchedulerMetrics() *SchedulerMetrics {
Namespace: Namespace,
Subsystem: SchedulerSubsystem,
Name: "application_submission_total",
- Help: "Total number of application submissions.
State of the attempt includes `accepted` and `rejected`.",
+ Help: "Total number of application submissions.
State of the attempt includes `new`, `accepted` and `rejected`.",
}, []string{"result"})
s.application = prometheus.NewGaugeVec(
@@ -97,7 +97,7 @@ func InitSchedulerMetrics() *SchedulerMetrics {
Namespace: Namespace,
Subsystem: SchedulerSubsystem,
Name: "application_total",
- Help: "Total number of applications. State of the
application includes `running`, `completed` and `failed`.",
+ Help: "Total number of applications. State of the
application includes `running`, `resuming`, `failing`, `completing`,
`completed` and `failed`.",
}, []string{"state"})
s.node = prometheus.NewGaugeVec(
@@ -241,25 +241,39 @@ func (m *SchedulerMetrics) GetSchedulingErrors() (int,
error) {
return -1, err
}
+func (m *SchedulerMetrics) IncTotalApplicationsNew() {
+ m.applicationSubmission.WithLabelValues(AppNew).Inc()
+}
+
+func (m *SchedulerMetrics) GetTotalApplicationsNew() (int, error) {
+ metricDto := &dto.Metric{}
+ err := m.applicationSubmission.WithLabelValues(AppNew).Write(metricDto)
+ if err == nil {
+ return int(*metricDto.Counter.Value), nil
+ }
+ return -1, err
+}
+
func (m *SchedulerMetrics) IncTotalApplicationsAccepted() {
m.applicationSubmission.WithLabelValues(AppAccepted).Inc()
}
-func (m *SchedulerMetrics) AddTotalApplicationsAccepted(value int) {
- m.applicationSubmission.WithLabelValues(AppAccepted).Add(float64(value))
+func (m *SchedulerMetrics) GetTotalApplicationsAccepted() (int, error) {
+ metricDto := &dto.Metric{}
+ err :=
m.applicationSubmission.WithLabelValues(AppAccepted).Write(metricDto)
+ if err == nil {
+ return int(*metricDto.Counter.Value), nil
+ }
+ return -1, err
}
func (m *SchedulerMetrics) IncTotalApplicationsRejected() {
- m.applicationSubmission.WithLabelValues(ContainerRejected).Inc()
-}
-
-func (m *SchedulerMetrics) AddTotalApplicationsRejected(value int) {
-
m.applicationSubmission.WithLabelValues(ContainerRejected).Add(float64(value))
+ m.applicationSubmission.WithLabelValues(AppRejected).Inc()
}
func (m *SchedulerMetrics) GetTotalApplicationsRejected() (int, error) {
metricDto := &dto.Metric{}
- err :=
m.applicationSubmission.WithLabelValues(ContainerRejected).Write(metricDto)
+ err :=
m.applicationSubmission.WithLabelValues(AppRejected).Write(metricDto)
if err == nil {
return int(*metricDto.Counter.Value), nil
}
@@ -274,10 +288,6 @@ func (m *SchedulerMetrics) DecTotalApplicationsRunning() {
m.application.WithLabelValues(AppRunning).Dec()
}
-func (m *SchedulerMetrics) SubTotalApplicationsRunning(value int) {
- m.application.WithLabelValues(AppRunning).Sub(float64(value))
-}
-
func (m *SchedulerMetrics) GetTotalApplicationsRunning() (int, error) {
metricDto := &dto.Metric{}
err := m.application.WithLabelValues(AppRunning).Write(metricDto)
@@ -287,16 +297,45 @@ func (m *SchedulerMetrics) GetTotalApplicationsRunning()
(int, error) {
return -1, err
}
+func (m *SchedulerMetrics) IncTotalApplicationsFailing() {
+ m.application.WithLabelValues(AppFailing).Inc()
+}
+
+func (m *SchedulerMetrics) DecTotalApplicationsFailing() {
+ m.application.WithLabelValues(AppFailing).Dec()
+}
+
func (m *SchedulerMetrics) IncTotalApplicationsFailed() {
m.application.WithLabelValues(AppFailed).Inc()
}
-func (m *SchedulerMetrics) IncTotalApplicationsCompleted() {
- m.application.WithLabelValues(AppCompleted).Inc()
+func (m *SchedulerMetrics) IncTotalApplicationsCompleting() {
+ m.application.WithLabelValues(AppCompleting).Inc()
+}
+
+func (m *SchedulerMetrics) DecTotalApplicationsCompleting() {
+ m.application.WithLabelValues(AppCompleting).Dec()
+}
+
+func (m *SchedulerMetrics) IncTotalApplicationsResuming() {
+ m.application.WithLabelValues(AppResuming).Inc()
+}
+
+func (m *SchedulerMetrics) DecTotalApplicationsResuming() {
+ m.application.WithLabelValues(AppResuming).Dec()
}
-func (m *SchedulerMetrics) AddTotalApplicationsCompleted(value int) {
- m.application.WithLabelValues(AppCompleted).Add(float64(value))
+func (m *SchedulerMetrics) GetTotalApplicationsResuming() (int, error) {
+ metricDto := &dto.Metric{}
+ err := m.application.WithLabelValues(AppResuming).Write(metricDto)
+ if err == nil {
+ return int(*metricDto.Gauge.Value), nil
+ }
+ return -1, err
+}
+
+func (m *SchedulerMetrics) IncTotalApplicationsCompleted() {
+ m.application.WithLabelValues(AppCompleted).Inc()
}
func (m *SchedulerMetrics) GetTotalApplicationsCompleted() (int, error) {
diff --git a/pkg/metrics/scheduler_test.go b/pkg/metrics/scheduler_test.go
index b63e4831..42879b67 100644
--- a/pkg/metrics/scheduler_test.go
+++ b/pkg/metrics/scheduler_test.go
@@ -27,6 +27,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
+
"gotest.tools/v3/assert"
)
@@ -37,10 +38,10 @@ func TestDrainingNodes(t *testing.T) {
defer unregisterMetrics()
sm.IncDrainingNodes()
- verifyMetric(t, 1, "draining")
+ verifyMetric(t, 1, "draining", "yunikorn_scheduler_node",
dto.MetricType_GAUGE, "state")
sm.DecDrainingNodes()
- verifyMetric(t, 0, "draining")
+ verifyMetric(t, 0, "draining", "yunikorn_scheduler_node",
dto.MetricType_GAUGE, "state")
}
func TestTotalDecommissionedNodes(t *testing.T) {
@@ -48,7 +49,7 @@ func TestTotalDecommissionedNodes(t *testing.T) {
defer unregisterMetrics()
sm.IncTotalDecommissionedNodes()
- verifyMetric(t, 1, "decommissioned")
+ verifyMetric(t, 1, "decommissioned", "yunikorn_scheduler_node",
dto.MetricType_GAUGE, "state")
}
func TestTryPreemptionLatency(t *testing.T) {
@@ -59,6 +60,114 @@ func TestTryPreemptionLatency(t *testing.T) {
verifyHistogram(t, "trypreemption_latency_milliseconds", 60, 1)
}
+func TestSchedulerApplicationsNew(t *testing.T) {
+ sm = getSchedulerMetrics(t)
+ defer unregisterMetrics()
+
+ sm.IncTotalApplicationsNew()
+ verifyMetric(t, 1, "new",
"yunikorn_scheduler_application_submission_total", dto.MetricType_COUNTER,
"result")
+
+ curr, err := sm.GetTotalApplicationsNew()
+ assert.NilError(t, err)
+ assert.Equal(t, curr, 1)
+}
+
+func TestSchedulerApplicationsAccepted(t *testing.T) {
+ sm = getSchedulerMetrics(t)
+ defer unregisterMetrics()
+
+ sm.IncTotalApplicationsAccepted()
+ verifyMetric(t, 1, "accepted",
"yunikorn_scheduler_application_submission_total", dto.MetricType_COUNTER,
"result")
+
+ curr, err := sm.GetTotalApplicationsAccepted()
+ assert.NilError(t, err)
+ assert.Equal(t, curr, 1)
+}
+
+func TestSchedulerApplicationsRejected(t *testing.T) {
+ sm = getSchedulerMetrics(t)
+ defer unregisterMetrics()
+
+ sm.IncTotalApplicationsRejected()
+ verifyMetric(t, 1, "rejected",
"yunikorn_scheduler_application_submission_total", dto.MetricType_COUNTER,
"result")
+
+ curr, err := sm.GetTotalApplicationsRejected()
+ assert.NilError(t, err)
+ assert.Equal(t, curr, 1)
+}
+
+func TestSchedulerApplicationsRunning(t *testing.T) {
+ sm = getSchedulerMetrics(t)
+ defer unregisterMetrics()
+
+ sm.IncTotalApplicationsRunning()
+ verifyMetric(t, 1, "running", "yunikorn_scheduler_application_total",
dto.MetricType_GAUGE, "state")
+
+ curr, err := sm.GetTotalApplicationsRunning()
+ assert.NilError(t, err)
+ assert.Equal(t, curr, 1)
+
+ sm.DecTotalApplicationsRunning()
+ verifyMetric(t, 0, "running", "yunikorn_scheduler_application_total",
dto.MetricType_GAUGE, "state")
+}
+
+func TestSchedulerApplicationsCompleting(t *testing.T) {
+ sm = getSchedulerMetrics(t)
+ defer unregisterMetrics()
+
+ sm.IncTotalApplicationsCompleting()
+ verifyMetric(t, 1, "completing",
"yunikorn_scheduler_application_total", dto.MetricType_GAUGE, "state")
+
+ sm.DecTotalApplicationsCompleting()
+ verifyMetric(t, 0, "completing",
"yunikorn_scheduler_application_total", dto.MetricType_GAUGE, "state")
+}
+
+func TestSchedulerApplicationsResuming(t *testing.T) {
+ sm = getSchedulerMetrics(t)
+ defer unregisterMetrics()
+
+ sm.IncTotalApplicationsResuming()
+ verifyMetric(t, 1, "resuming", "yunikorn_scheduler_application_total",
dto.MetricType_GAUGE, "state")
+
+ curr, err := sm.GetTotalApplicationsResuming()
+ assert.NilError(t, err)
+ assert.Equal(t, curr, 1)
+
+ sm.DecTotalApplicationsResuming()
+ verifyMetric(t, 0, "resuming", "yunikorn_scheduler_application_total",
dto.MetricType_GAUGE, "state")
+}
+
+func TestSchedulerApplicationsFailing(t *testing.T) {
+ sm = getSchedulerMetrics(t)
+ defer unregisterMetrics()
+
+ sm.IncTotalApplicationsFailing()
+ verifyMetric(t, 1, "failing", "yunikorn_scheduler_application_total",
dto.MetricType_GAUGE, "state")
+
+ sm.DecTotalApplicationsFailing()
+ verifyMetric(t, 0, "failing", "yunikorn_scheduler_application_total",
dto.MetricType_GAUGE, "state")
+}
+
+func TestSchedulerApplicationsCompleted(t *testing.T) {
+ sm = getSchedulerMetrics(t)
+ defer unregisterMetrics()
+
+ sm.IncTotalApplicationsCompleted()
+ verifyMetric(t, 1, "completed", "yunikorn_scheduler_application_total",
dto.MetricType_GAUGE, "state")
+
+ curr, err := sm.GetTotalApplicationsCompleted()
+ assert.NilError(t, err)
+ assert.Equal(t, curr, 1)
+}
+
+func TestSchedulerApplicationsFailed(t *testing.T) {
+ sm = getSchedulerMetrics(t)
+ defer unregisterMetrics()
+
+ sm.IncTotalApplicationsFailed()
+ verifyMetric(t, 1, "failed", "yunikorn_scheduler_application_total",
dto.MetricType_GAUGE, "state")
+}
+
func getSchedulerMetrics(t *testing.T) *SchedulerMetrics {
unregisterMetrics()
return InitSchedulerMetrics()
@@ -78,21 +187,27 @@ func verifyHistogram(t *testing.T, name string, value
float64, delta float64) {
}
}
-func verifyMetric(t *testing.T, expectedCounter float64, expectedState string)
{
+func verifyMetric(t *testing.T, expectedCounter float64, expectedState string,
name string, metricType dto.MetricType, labelName string) {
mfs, err := prometheus.DefaultGatherer.Gather()
assert.NilError(t, err)
var checked bool
for _, metric := range mfs {
- if strings.Contains(metric.GetName(),
"yunikorn_scheduler_node") {
+ if strings.Contains(metric.GetName(), name) {
assert.Equal(t, 1, len(metric.Metric))
- assert.Equal(t, dto.MetricType_GAUGE, metric.GetType())
+ assert.Equal(t, metricType, metric.GetType())
m := metric.Metric[0]
assert.Equal(t, 1, len(m.Label))
- assert.Equal(t, "state", *m.Label[0].Name)
+ assert.Equal(t, labelName, *m.Label[0].Name)
assert.Equal(t, expectedState, *m.Label[0].Value)
- assert.Assert(t, m.Gauge != nil)
- assert.Equal(t, expectedCounter, *m.Gauge.Value)
+ switch metricType {
+ case dto.MetricType_GAUGE:
+ assert.Equal(t, expectedCounter, *m.Gauge.Value)
+ case dto.MetricType_COUNTER:
+ assert.Equal(t, expectedCounter,
*m.Counter.Value)
+ default:
+ assert.Assert(t, false, "unsupported")
+ }
checked = true
break
}
diff --git a/pkg/rmproxy/rmproxy.go b/pkg/rmproxy/rmproxy.go
index 6816c10c..fa29a097 100644
--- a/pkg/rmproxy/rmproxy.go
+++ b/pkg/rmproxy/rmproxy.go
@@ -124,14 +124,6 @@ func (rmp *RMProxy) processApplicationUpdateEvent(event
*rmevent.RMApplicationUp
log.Log(log.RMProxy).DPanic("RM is not registered",
zap.String("rmID", event.RmID))
}
-
- // update app metrics
- if len(event.RejectedApplications) > 0 {
-
metrics.GetSchedulerMetrics().AddTotalApplicationsRejected(len(event.RejectedApplications))
- }
- if len(event.AcceptedApplications) > 0 {
-
metrics.GetSchedulerMetrics().AddTotalApplicationsAccepted(len(event.AcceptedApplications))
- }
}
func (rmp *RMProxy) processRMReleaseAllocationEvent(event
*rmevent.RMReleaseAllocationEvent) {
diff --git a/pkg/scheduler/context.go b/pkg/scheduler/context.go
index ee7257b7..b0df469a 100644
--- a/pkg/scheduler/context.go
+++ b/pkg/scheduler/context.go
@@ -555,8 +555,6 @@ func (cc *ClusterContext)
handleRMUpdateApplicationEvent(event *rmevent.RMUpdate
}
// Update metrics with removed applications
if len(request.Remove) > 0 {
-
metrics.GetSchedulerMetrics().SubTotalApplicationsRunning(len(request.Remove))
-
metrics.GetSchedulerMetrics().AddTotalApplicationsCompleted(len(request.Remove))
for _, app := range request.Remove {
partition := cc.GetPartition(app.PartitionName)
if partition == nil {
diff --git a/pkg/scheduler/objects/application.go
b/pkg/scheduler/objects/application.go
index 4b84c957..9ad81619 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -191,9 +191,15 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi
security.UserGroup, eve
app.rmID = rmID
app.appEvents = schedEvt.NewApplicationEvents(events.GetEventSystem())
app.appEvents.SendNewApplicationEvent(app.ApplicationID)
+ app.setNewMetrics()
return app
}
+func (sa *Application) setNewMetrics() {
+ metrics.GetSchedulerMetrics().IncTotalApplicationsNew()
+ metrics.GetQueueMetrics(sa.GetQueuePath()).IncQueueApplicationsNew()
+}
+
func (sa *Application) String() string {
if sa == nil {
return "application is nil"
diff --git a/pkg/scheduler/objects/application_state.go
b/pkg/scheduler/objects/application_state.go
index dc600850..3360f1f5 100644
--- a/pkg/scheduler/objects/application_state.go
+++ b/pkg/scheduler/objects/application_state.go
@@ -75,6 +75,7 @@ const (
)
var stateEvents = map[string]si.EventRecord_ChangeDetail{
+ New.String(): si.EventRecord_APP_NEW,
Accepted.String(): si.EventRecord_APP_ACCEPTED,
Running.String(): si.EventRecord_APP_RUNNING,
Rejected.String(): si.EventRecord_APP_REJECT,
@@ -137,6 +138,8 @@ func eventDesc() fsm.Events {
// The first argument must always be an Application and if there is a second,
// that must be a string. If this precondition is not met, a runtime panic
// will occur.
+//
+//nolint:funlen
func callbacks() fsm.Callbacks {
return fsm.Callbacks{
"enter_state": func(_ context.Context, event *fsm.Event) {
@@ -167,16 +170,20 @@ func callbacks() fsm.Callbacks {
"leave_state": func(_ context.Context, event *fsm.Event) {
event.Args[0].(*Application).clearStateTimer()
//nolint:errcheck
},
- fmt.Sprintf("enter_%s", Completing.String()): func(_
context.Context, event *fsm.Event) {
+ fmt.Sprintf("leave_%s", New.String()): func(_ context.Context,
event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
- app.setStateTimer(completingTimeout,
app.stateMachine.Current(), CompleteApplication)
+ // only update queue metrics because scheduler metrics
are increased only for submission count
+
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsNew()
},
- fmt.Sprintf("leave_%s", New.String()): func(_ context.Context,
event *fsm.Event) {
- if event.Dst != Rejected.String() {
- app := event.Args[0].(*Application)
//nolint:errcheck
-
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsAccepted()
-
metrics.GetSchedulerMetrics().IncTotalApplicationsAccepted()
- }
+ fmt.Sprintf("enter_%s", Accepted.String()): func(_
context.Context, event *fsm.Event) {
+ app := event.Args[0].(*Application) //nolint:errcheck
+
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsAccepted()
+
metrics.GetSchedulerMetrics().IncTotalApplicationsAccepted()
+ },
+ fmt.Sprintf("leave_%s", Accepted.String()): func(_
context.Context, event *fsm.Event) {
+ app := event.Args[0].(*Application) //nolint:errcheck
+ // only update queue metrics because scheduler metrics
are increased only for submission count
+
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsAccepted()
},
fmt.Sprintf("enter_%s", Rejected.String()): func(_
context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
@@ -207,6 +214,37 @@ func callbacks() fsm.Callbacks {
metrics.GetSchedulerMetrics().DecTotalApplicationsRunning()
}
},
+ fmt.Sprintf("enter_%s", Resuming.String()): func(_
context.Context, event *fsm.Event) {
+ app := event.Args[0].(*Application) //nolint:errcheck
+
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsResuming()
+
metrics.GetSchedulerMetrics().IncTotalApplicationsResuming()
+ },
+ fmt.Sprintf("leave_%s", Resuming.String()): func(_
context.Context, event *fsm.Event) {
+ app := event.Args[0].(*Application) //nolint:errcheck
+
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsResuming()
+
metrics.GetSchedulerMetrics().DecTotalApplicationsResuming()
+ },
+ fmt.Sprintf("enter_%s", Failing.String()): func(_
context.Context, event *fsm.Event) {
+ app := event.Args[0].(*Application) //nolint:errcheck
+
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsFailing()
+
metrics.GetSchedulerMetrics().IncTotalApplicationsFailing()
+ },
+ fmt.Sprintf("leave_%s", Failing.String()): func(_
context.Context, event *fsm.Event) {
+ app := event.Args[0].(*Application) //nolint:errcheck
+
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsFailing()
+
metrics.GetSchedulerMetrics().DecTotalApplicationsFailing()
+ },
+ fmt.Sprintf("enter_%s", Completing.String()): func(_
context.Context, event *fsm.Event) {
+ app := event.Args[0].(*Application) //nolint:errcheck
+ app.setStateTimer(completingTimeout,
app.stateMachine.Current(), CompleteApplication)
+
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsCompleting()
+
metrics.GetSchedulerMetrics().IncTotalApplicationsCompleting()
+ },
+ fmt.Sprintf("leave_%s", Completing.String()): func(_
context.Context, event *fsm.Event) {
+ app := event.Args[0].(*Application) //nolint:errcheck
+
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsCompleting()
+
metrics.GetSchedulerMetrics().DecTotalApplicationsCompleting()
+ },
fmt.Sprintf("enter_%s", Completed.String()): func(_
context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
metrics.GetSchedulerMetrics().IncTotalApplicationsCompleted()
@@ -216,13 +254,10 @@ func callbacks() fsm.Callbacks {
app.clearPlaceholderTimer()
app.cleanupAsks()
},
- fmt.Sprintf("enter_%s", Failing.String()): func(_
context.Context, event *fsm.Event) {
- app := event.Args[0].(*Application) //nolint:errcheck
-
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsFailed()
-
metrics.GetSchedulerMetrics().IncTotalApplicationsFailed()
- },
fmt.Sprintf("enter_%s", Failed.String()): func(_
context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
+
metrics.GetSchedulerMetrics().IncTotalApplicationsFailed()
+
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsFailed()
app.setStateTimer(terminatedTimeout,
app.stateMachine.Current(), ExpireApplication)
app.executeTerminatedCallback()
app.cleanupAsks()
diff --git a/pkg/scheduler/objects/application_state_test.go
b/pkg/scheduler/objects/application_state_test.go
index ba54aa37..33c218ff 100644
--- a/pkg/scheduler/objects/application_state_test.go
+++ b/pkg/scheduler/objects/application_state_test.go
@@ -280,89 +280,119 @@ func TestAppStateTransitionEvents(t *testing.T) {
// app-00002: New -> Accepted -> Running -> Completing -> Running -> Failing->
Failed
// app-00003: New -> Accepted -> Running -> Failing -> Failed
// app-00004: New -> Rejected
-// Final metrics will be: 0 running, 3 accepted, 1 completed, 2 failed and 1
rejected applications
+// Final queue metrics will be: 0 new, 0 running, 0 accepted, 1 completed, 2
failed and 1 rejected applications
+// Final scheduler metrics will be: 4 new, 0 running, 3 accepted, 1 completed,
2 failed and 1 rejected applications
+// This is because the scheduler app submission states (new, accepted, rejected) are
counter based and are not decremented when the state changes.
func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen
- queue := createQueue(t, "root.metrics")
+ queue := createQueue(t, "metrics")
metrics.GetSchedulerMetrics().Reset()
// app-00001: New -> Resuming -> Accepted --> Running -> Completing->
Completed
app := newApplication("app-00001", "default", "root.metrics")
app.SetQueue(queue)
assertState(t, app, nil, New.String())
+ assertTotalAppsNewMetrics(t, 1)
assertTotalAppsRunningMetrics(t, 0)
assertTotalAppsCompletedMetrics(t, 0)
assertTotalAppsRejectedMetrics(t, 0)
+ assertQueueRunningApps(t, app, 0)
+ assertQueueApplicationsRunningMetrics(t, app, 0)
+ assertQueueApplicationsAcceptedMetrics(t, app, 0)
+ assertQueueApplicationsRejectedMetrics(t, app, 0)
+ assertQueueApplicationsFailedMetrics(t, app, 0)
+ assertQueueApplicationsCompletedMetrics(t, app, 0)
+ assertQueueApplicationsNewMetrics(t, app, 1)
+
// New -> Resuming
err := app.HandleApplicationEvent(ResumeApplication)
assertState(t, app, err, Resuming.String())
+ assertTotalAppsNewMetrics(t, 1)
assertTotalAppsRunningMetrics(t, 0)
assertTotalAppsCompletedMetrics(t, 0)
assertTotalAppsRejectedMetrics(t, 0)
assertQueueRunningApps(t, app, 0)
assertQueueApplicationsRunningMetrics(t, app, 0)
- assertQueueApplicationsAcceptedMetrics(t, app, 1)
+ assertQueueApplicationsAcceptedMetrics(t, app, 0)
assertQueueApplicationsRejectedMetrics(t, app, 0)
assertQueueApplicationsFailedMetrics(t, app, 0)
assertQueueApplicationsCompletedMetrics(t, app, 0)
+ assertQueueApplicationsResumingMetrics(t, app, 1)
+ assertQueueApplicationsNewMetrics(t, app, 0)
// Resuming -> Accepted
err = app.HandleApplicationEvent(RunApplication)
assertState(t, app, err, Accepted.String())
assertTotalAppsRunningMetrics(t, 0)
assertTotalAppsCompletedMetrics(t, 0)
assertTotalAppsRejectedMetrics(t, 0)
+ assertTotalAppsNewMetrics(t, 1)
+ assertTotalAppsAcceptedMetrics(t, 1)
assertQueueRunningApps(t, app, 0)
assertQueueApplicationsRunningMetrics(t, app, 0)
assertQueueApplicationsAcceptedMetrics(t, app, 1)
assertQueueApplicationsRejectedMetrics(t, app, 0)
assertQueueApplicationsFailedMetrics(t, app, 0)
assertQueueApplicationsCompletedMetrics(t, app, 0)
+ assertQueueApplicationsNewMetrics(t, app, 0)
// Accepted -> Running
err = app.HandleApplicationEvent(RunApplication)
assertState(t, app, err, Running.String())
assertTotalAppsRunningMetrics(t, 1)
assertTotalAppsCompletedMetrics(t, 0)
assertTotalAppsRejectedMetrics(t, 0)
+ assertTotalAppsNewMetrics(t, 1)
+ assertTotalAppsAcceptedMetrics(t, 1)
assertQueueRunningApps(t, app, 1)
assertQueueApplicationsRunningMetrics(t, app, 1)
- assertQueueApplicationsAcceptedMetrics(t, app, 1)
+ assertQueueApplicationsAcceptedMetrics(t, app, 0)
assertQueueApplicationsRejectedMetrics(t, app, 0)
assertQueueApplicationsFailedMetrics(t, app, 0)
assertQueueApplicationsCompletedMetrics(t, app, 0)
+ assertQueueApplicationsNewMetrics(t, app, 0)
// Running -> Running
err = app.HandleApplicationEvent(RunApplication)
assertState(t, app, err, Running.String())
assertTotalAppsRunningMetrics(t, 1)
assertTotalAppsCompletedMetrics(t, 0)
assertTotalAppsRejectedMetrics(t, 0)
+ assertTotalAppsAcceptedMetrics(t, 1)
+ assertTotalAppsNewMetrics(t, 1)
+ assertTotalAppsAcceptedMetrics(t, 1)
assertQueueRunningApps(t, app, 1)
assertQueueApplicationsRunningMetrics(t, app, 1)
- assertQueueApplicationsAcceptedMetrics(t, app, 1)
+ assertQueueApplicationsAcceptedMetrics(t, app, 0)
assertQueueApplicationsRejectedMetrics(t, app, 0)
assertQueueApplicationsFailedMetrics(t, app, 0)
assertQueueApplicationsCompletedMetrics(t, app, 0)
+ assertQueueApplicationsNewMetrics(t, app, 0)
// Running -> Completing
err = app.HandleApplicationEvent(CompleteApplication)
assertState(t, app, err, Completing.String())
assertTotalAppsRunningMetrics(t, 0)
assertTotalAppsCompletedMetrics(t, 0)
assertTotalAppsRejectedMetrics(t, 0)
+ assertTotalAppsNewMetrics(t, 1)
+ assertTotalAppsAcceptedMetrics(t, 1)
assertQueueRunningApps(t, app, 0)
assertQueueApplicationsRunningMetrics(t, app, 0)
- assertQueueApplicationsAcceptedMetrics(t, app, 1)
+ assertQueueApplicationsAcceptedMetrics(t, app, 0)
assertQueueApplicationsRejectedMetrics(t, app, 0)
assertQueueApplicationsFailedMetrics(t, app, 0)
assertQueueApplicationsCompletedMetrics(t, app, 0)
+ assertQueueApplicationsCompletingMetrics(t, app, 1)
// Completing -> Completed
err = app.HandleApplicationEvent(CompleteApplication)
assertState(t, app, err, Completed.String())
assertTotalAppsRunningMetrics(t, 0)
assertTotalAppsCompletedMetrics(t, 1)
assertTotalAppsRejectedMetrics(t, 0)
+ assertTotalAppsNewMetrics(t, 1)
+ assertTotalAppsAcceptedMetrics(t, 1)
assertQueueRunningApps(t, app, 0)
assertQueueApplicationsRunningMetrics(t, app, 0)
- assertQueueApplicationsAcceptedMetrics(t, app, 1)
+ assertQueueApplicationsAcceptedMetrics(t, app, 0)
assertQueueApplicationsRejectedMetrics(t, app, 0)
assertQueueApplicationsFailedMetrics(t, app, 0)
assertQueueApplicationsCompletedMetrics(t, app, 1)
+ assertQueueApplicationsCompletingMetrics(t, app, 0)
// app-00002: New -> Accepted -> Completing -> Running -> Failing->
Failed
app = newApplication("app-00002", "default", "root.metrics")
@@ -389,9 +419,11 @@ func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen
assertTotalAppsRunningMetrics(t, 0)
assertTotalAppsCompletedMetrics(t, 1)
assertTotalAppsRejectedMetrics(t, 0)
+ assertTotalAppsNewMetrics(t, 2)
+ assertTotalAppsAcceptedMetrics(t, 2)
assertQueueRunningApps(t, app, 0)
assertQueueApplicationsRunningMetrics(t, app, 0)
- assertQueueApplicationsAcceptedMetrics(t, app, 2)
+ assertQueueApplicationsAcceptedMetrics(t, app, 0)
assertQueueApplicationsRejectedMetrics(t, app, 0)
assertQueueApplicationsFailedMetrics(t, app, 1)
assertQueueApplicationsCompletedMetrics(t, app, 1)
@@ -415,9 +447,11 @@ func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen
assertTotalAppsRunningMetrics(t, 0)
assertTotalAppsCompletedMetrics(t, 1)
assertTotalAppsRejectedMetrics(t, 0)
+ assertTotalAppsNewMetrics(t, 3)
+ assertTotalAppsAcceptedMetrics(t, 3)
assertQueueRunningApps(t, app, 0)
assertQueueApplicationsRunningMetrics(t, app, 0)
- assertQueueApplicationsAcceptedMetrics(t, app, 3)
+ assertQueueApplicationsAcceptedMetrics(t, app, 0)
assertQueueApplicationsRejectedMetrics(t, app, 0)
assertQueueApplicationsFailedMetrics(t, app, 2)
assertQueueApplicationsCompletedMetrics(t, app, 1)
@@ -432,9 +466,12 @@ func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen
assertTotalAppsRunningMetrics(t, 0)
assertTotalAppsCompletedMetrics(t, 1)
assertTotalAppsRejectedMetrics(t, 1)
+ assertTotalAppsNewMetrics(t, 4)
+ assertTotalAppsAcceptedMetrics(t, 3)
assertQueueRunningApps(t, app, 0)
+ assertQueueApplicationsNewMetrics(t, app, 0)
assertQueueApplicationsRunningMetrics(t, app, 0)
- assertQueueApplicationsAcceptedMetrics(t, app, 3)
+ assertQueueApplicationsAcceptedMetrics(t, app, 0)
assertQueueApplicationsRejectedMetrics(t, app, 1)
assertQueueApplicationsFailedMetrics(t, app, 2)
assertQueueApplicationsCompletedMetrics(t, app, 1)
@@ -446,6 +483,13 @@ func assertState(t testing.TB, app *Application, err error, expected string) {
assert.Equal(t, app.CurrentState(), expected, "application not in
expected state.")
}
+func assertTotalAppsNewMetrics(t testing.TB, expected int) {
+ t.Helper()
+ totalAppsNew, err :=
metrics.GetSchedulerMetrics().GetTotalApplicationsNew()
+ assert.NilError(t, err, "no error expected when getting total new
application count.")
+ assert.Equal(t, totalAppsNew, expected, "total new application metrics
is not as expected.")
+}
+
func assertTotalAppsRunningMetrics(t testing.TB, expected int) {
t.Helper()
totalAppsRunning, err :=
metrics.GetSchedulerMetrics().GetTotalApplicationsRunning()
@@ -453,6 +497,13 @@ func assertTotalAppsRunningMetrics(t testing.TB, expected int) {
assert.Equal(t, totalAppsRunning, expected, "total running application
metrics is not as expected.")
}
+func assertTotalAppsAcceptedMetrics(t testing.TB, expected int) {
+ t.Helper()
+ totalAppsAccepted, err :=
metrics.GetSchedulerMetrics().GetTotalApplicationsAccepted()
+ assert.NilError(t, err, "no error expected when getting total accepted
application count.")
+ assert.Equal(t, totalAppsAccepted, expected, "total accepted
application metrics is not as expected.")
+}
+
func assertTotalAppsCompletedMetrics(t testing.TB, expected int) {
t.Helper()
totalAppsCompleted, err :=
metrics.GetSchedulerMetrics().GetTotalApplicationsCompleted()
@@ -501,6 +552,13 @@ func assertQueueApplicationsFailedMetrics(t testing.TB, app *Application, expect
assert.Equal(t, queueApplicationsFailed, expected, "total failed
application metrics in queue is not as expected.")
}
+func assertQueueApplicationsResumingMetrics(t testing.TB, app *Application,
expected int) {
+ t.Helper()
+ queueApplicationsResuming, err :=
metrics.GetQueueMetrics(app.queuePath).GetQueueApplicationsResuming()
+ assert.NilError(t, err, "no error expected when getting total resuming
application count in queue.")
+ assert.Equal(t, queueApplicationsResuming, expected, "total resuming
application metrics in queue is not as expected.")
+}
+
func assertQueueApplicationsCompletedMetrics(t testing.TB, app *Application,
expected int) {
t.Helper()
queueApplicationsCompleted, err :=
metrics.GetQueueMetrics(app.queuePath).GetQueueApplicationsCompleted()
@@ -508,6 +566,20 @@ func assertQueueApplicationsCompletedMetrics(t testing.TB, app *Application, exp
assert.Equal(t, queueApplicationsCompleted, expected, "total completed
application metrics in queue is not as expected.")
}
+func assertQueueApplicationsCompletingMetrics(t testing.TB, app *Application,
expected int) {
+ t.Helper()
+ queueApplicationsCompleting, err :=
metrics.GetQueueMetrics(app.queuePath).GetQueueApplicationsCompleting()
+ assert.NilError(t, err, "no error expected when getting total
completing application count in queue.")
+ assert.Equal(t, queueApplicationsCompleting, expected, "total
completing application metrics in queue is not as expected.")
+}
+
+func assertQueueApplicationsNewMetrics(t testing.TB, app *Application,
expected int) {
+ t.Helper()
+ queueApplicationsNew, err :=
metrics.GetQueueMetrics(app.queuePath).GetQueueApplicationsNew()
+ assert.NilError(t, err, "no error expected when getting total new
applications in queue.")
+ assert.Equal(t, queueApplicationsNew, expected, "total new applications
in queue is not as expected.")
+}
+
func createQueue(t *testing.T, queueName string) *Queue {
root, err := createRootQueue(nil)
assert.NilError(t, err, "failed to create queue: %v", err)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]