This is an automated email from the ASF dual-hosted git repository.
wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new ab7ab268 [YUNIKORN-2161] Metrics code cleanup (#719)
ab7ab268 is described below
commit ab7ab268aaa4db9c419e4a18e55de54edb82c050
Author: brandboat <[email protected]>
AuthorDate: Tue Nov 21 13:36:03 2023 +1100
[YUNIKORN-2161] Metrics code cleanup (#719)
Remove CoreQueueMetrics interface
Remove CoreSchedulerMetrics interface
Remove unused queue.go#addQueueResource
Rename cqm to qm in queue_test.go
Rename csm to sm in scheduler_test.go
Fix incorrect unregister metrics method
Remove unused testing parameter in unregisterQueueMetrics
Remove unused testing parameter in unregisterMetrics
Remove CoreEventMetrics interface
Rename eventMetrics to EventMetrics
Remove GoRuntimeMetrics interface
Closes: #719
Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
pkg/metrics/event.go | 24 ++++---
pkg/metrics/init.go | 149 +++-------------------------------------
pkg/metrics/queue.go | 19 +----
pkg/metrics/queue_test.go | 89 +++++++++++-------------
pkg/metrics/runtime.go | 2 +
pkg/metrics/scheduler.go | 65 +-----------------
pkg/metrics/scheduler_test.go | 40 +++++------
pkg/scheduler/health_checker.go | 6 +-
8 files changed, 90 insertions(+), 304 deletions(-)
diff --git a/pkg/metrics/event.go b/pkg/metrics/event.go
index 04124f01..7f0032cd 100644
--- a/pkg/metrics/event.go
+++ b/pkg/metrics/event.go
@@ -20,7 +20,7 @@ package metrics
import "github.com/prometheus/client_golang/prometheus"
-type eventMetrics struct {
+type EventMetrics struct {
totalEventsCreated prometheus.Gauge
totalEventsChanneled prometheus.Gauge
totalEventsNotChanneled prometheus.Gauge
@@ -30,8 +30,8 @@ type eventMetrics struct {
totalEventsCollected prometheus.Gauge
}
-func initEventMetrics() CoreEventMetrics {
- metrics := &eventMetrics{}
+func initEventMetrics() *EventMetrics {
+ metrics := &EventMetrics{}
metrics.totalEventsCreated = prometheus.NewGauge(
prometheus.GaugeOpts{
@@ -86,7 +86,9 @@ func initEventMetrics() CoreEventMetrics {
return metrics
}
-func (em *eventMetrics) Reset() {
+// Reset all metrics that implement the Set functionality.
+// Should only be used in tests
+func (em *EventMetrics) Reset() {
em.totalEventsCollected.Set(0)
em.totalEventsCreated.Set(0)
em.totalEventsChanneled.Set(0)
@@ -96,30 +98,30 @@ func (em *eventMetrics) Reset() {
em.totalEventsProcessed.Set(0)
}
-func (em *eventMetrics) IncEventsCreated() {
+func (em *EventMetrics) IncEventsCreated() {
em.totalEventsCreated.Inc()
}
-func (em *eventMetrics) IncEventsChanneled() {
+func (em *EventMetrics) IncEventsChanneled() {
em.totalEventsChanneled.Inc()
}
-func (em *eventMetrics) IncEventsNotChanneled() {
+func (em *EventMetrics) IncEventsNotChanneled() {
em.totalEventsNotChanneled.Inc()
}
-func (em *eventMetrics) IncEventsProcessed() {
+func (em *EventMetrics) IncEventsProcessed() {
em.totalEventsProcessed.Inc()
}
-func (em *eventMetrics) IncEventsStored() {
+func (em *EventMetrics) IncEventsStored() {
em.totalEventsStored.Inc()
}
-func (em *eventMetrics) IncEventsNotStored() {
+func (em *EventMetrics) IncEventsNotStored() {
em.totalEventsNotStored.Inc()
}
-func (em *eventMetrics) AddEventsCollected(collectedEvents int) {
+func (em *EventMetrics) AddEventsCollected(collectedEvents int) {
em.totalEventsCollected.Add(float64(collectedEvents))
}
diff --git a/pkg/metrics/init.go b/pkg/metrics/init.go
index 7a4f5b4e..39b2120e 100644
--- a/pkg/metrics/init.go
+++ b/pkg/metrics/init.go
@@ -20,7 +20,6 @@ package metrics
import (
"sync"
- "time"
)
const (
@@ -38,148 +37,18 @@ var once sync.Once
var m *Metrics
type Metrics struct {
- scheduler CoreSchedulerMetrics
- queues map[string]CoreQueueMetrics
- event CoreEventMetrics
- runtime GoRuntimeMetrics
+ scheduler *SchedulerMetrics
+ queues map[string]*QueueMetrics
+ event *EventMetrics
+ runtime *RuntimeMetrics
lock sync.RWMutex
}
-type CoreQueueMetrics interface {
- IncQueueApplicationsAccepted()
- GetQueueApplicationsAccepted() (int, error)
- IncQueueApplicationsRejected()
- GetQueueApplicationsRejected() (int, error)
- IncQueueApplicationsRunning()
- DecQueueApplicationsRunning()
- GetQueueApplicationsRunning() (int, error)
- IncQueueApplicationsFailed()
- GetQueueApplicationsFailed() (int, error)
- IncQueueApplicationsCompleted()
- GetQueueApplicationsCompleted() (int, error)
- IncAllocatedContainer()
- IncReleasedContainer()
- AddReleasedContainers(value int)
- SetQueueGuaranteedResourceMetrics(resourceName string, value float64)
- SetQueueMaxResourceMetrics(resourceName string, value float64)
- SetQueueAllocatedResourceMetrics(resourceName string, value float64)
- AddQueueAllocatedResourceMetrics(resourceName string, value float64)
- SetQueuePendingResourceMetrics(resourceName string, value float64)
- AddQueuePendingResourceMetrics(resourceName string, value float64)
- SetQueuePreemptingResourceMetrics(resourceName string, value float64)
- AddQueuePreemptingResourceMetrics(resourceName string, value float64)
- // Reset all metrics that implement the Reset functionality.
- // should only be used in tests
- Reset()
-}
-
-type GoRuntimeMetrics interface {
- Collect()
- // Reset all metrics that implement the Reset functionality.
- // should only be used in tests
- Reset()
-}
-
-// Declare all core metrics ops in this interface
-type CoreSchedulerMetrics interface {
- // Metrics Ops related to ScheduledAllocationSuccesses
- IncAllocatedContainer()
- AddAllocatedContainers(value int)
- getAllocatedContainers() (int, error)
-
- // Metrics Ops related to ScheduledAllocationFailures
- IncRejectedContainer()
- AddRejectedContainers(value int)
-
- // Metrics Ops related to ScheduledAllocationErrors
- IncSchedulingError()
- AddSchedulingErrors(value int)
- GetSchedulingErrors() (int, error)
-
- // Metrics Ops related to released allocations
- IncReleasedContainer()
- AddReleasedContainers(value int)
- getReleasedContainers() (int, error)
- // Metrics Ops related to totalApplicationsAccepted
- IncTotalApplicationsAccepted()
- AddTotalApplicationsAccepted(value int)
-
- // Metrics Ops related to TotalApplicationsRejected
- IncTotalApplicationsRejected()
- AddTotalApplicationsRejected(value int)
- GetTotalApplicationsRejected() (int, error)
-
- // Metrics Ops related to TotalApplicationsRunning
- IncTotalApplicationsRunning()
- AddTotalApplicationsRunning(value int)
- DecTotalApplicationsRunning()
- SubTotalApplicationsRunning(value int)
- SetTotalApplicationsRunning(value int)
- GetTotalApplicationsRunning() (int, error)
-
- // Metrics Ops related to TotalApplicationsFailed
- IncTotalApplicationsFailed()
-
- // Metrics Ops related to TotalApplicationsCompleted
- IncTotalApplicationsCompleted()
- AddTotalApplicationsCompleted(value int)
- DecTotalApplicationsCompleted()
- SubTotalApplicationsCompleted(value int)
- SetTotalApplicationsCompleted(value int)
- GetTotalApplicationsCompleted() (int, error)
-
- // Metrics Ops related to ActiveNodes
- IncActiveNodes()
- AddActiveNodes(value int)
- DecActiveNodes()
- SubActiveNodes(value int)
- SetActiveNodes(value int)
- IncDrainingNodes()
- DecDrainingNodes()
- GetDrainingNodes() (int, error)
- IncUnhealthyNodes()
- DecUnhealthyNodes()
- IncTotalDecommissionedNodes()
-
- // Metrics Ops related to failedNodes
- IncFailedNodes()
- AddFailedNodes(value int)
- DecFailedNodes()
- SubFailedNodes(value int)
- SetFailedNodes(value int)
- SetNodeResourceUsage(resourceName string, rangeIdx int, value float64)
- GetFailedNodes() (int, error)
-
- // Metrics Ops related to latency change
- ObserveSchedulingLatency(start time.Time)
- ObserveNodeSortingLatency(start time.Time)
- ObserveAppSortingLatency(start time.Time)
- ObserveQueueSortingLatency(start time.Time)
- ObserveTryNodeLatency(start time.Time)
- ObserveTryPreemptionLatency(start time.Time)
- // Reset all metrics that implement the Reset functionality.
- // should only be used in tests
- Reset()
-}
-
-type CoreEventMetrics interface {
- IncEventsCreated()
- IncEventsChanneled()
- IncEventsNotChanneled()
- IncEventsProcessed()
- IncEventsStored()
- IncEventsNotStored()
- AddEventsCollected(collectedEvents int)
- // Reset all metrics that implement the Set functionality.
- // Should only be used in tests
- Reset()
-}
-
func init() {
once.Do(func() {
m = &Metrics{
scheduler: InitSchedulerMetrics(),
- queues: make(map[string]CoreQueueMetrics),
+ queues: make(map[string]*QueueMetrics),
event: initEventMetrics(),
lock: sync.RWMutex{},
runtime: initRuntimeMetrics(),
@@ -198,11 +67,11 @@ func Reset() {
m.runtime.Reset()
}
-func GetSchedulerMetrics() CoreSchedulerMetrics {
+func GetSchedulerMetrics() *SchedulerMetrics {
return m.scheduler
}
-func GetQueueMetrics(name string) CoreQueueMetrics {
+func GetQueueMetrics(name string) *QueueMetrics {
m.lock.Lock()
defer m.lock.Unlock()
if qm, ok := m.queues[name]; ok {
@@ -213,11 +82,11 @@ func GetQueueMetrics(name string) CoreQueueMetrics {
return queueMetrics
}
-func GetEventMetrics() CoreEventMetrics {
+func GetEventMetrics() *EventMetrics {
return m.event
}
-func GetRuntimeMetrics() GoRuntimeMetrics {
+func GetRuntimeMetrics() *RuntimeMetrics {
return m.runtime
}
diff --git a/pkg/metrics/queue.go b/pkg/metrics/queue.go
index 08ff05ce..d8127b2f 100644
--- a/pkg/metrics/queue.go
+++ b/pkg/metrics/queue.go
@@ -38,7 +38,7 @@ type QueueMetrics struct {
}
// InitQueueMetrics to initialize queue metrics
-func InitQueueMetrics(name string) CoreQueueMetrics {
+func InitQueueMetrics(name string) *QueueMetrics {
q := &QueueMetrics{}
replaceStr := formatMetricName(name)
@@ -114,11 +114,6 @@ func (m *QueueMetrics) decQueueApplications(state string) {
m.appMetricsSubsystem.With(prometheus.Labels{"state": state}).Dec()
}
-func (m *QueueMetrics) addQueueResource(state string, resourceName string, value float64) {
- m.resourceMetricsLabel.With(prometheus.Labels{"state": state, "resource": resourceName}).Add(value)
- m.resourceMetricsSubsystem.With(prometheus.Labels{"state": state, "resource": resourceName}).Add(value)
-}
-
func (m *QueueMetrics) setQueueResource(state string, resourceName string, value float64) {
m.resourceMetricsLabel.With(prometheus.Labels{"state": state, "resource": resourceName}).Set(value)
m.resourceMetricsSubsystem.With(prometheus.Labels{"state": state, "resource": resourceName}).Set(value)
@@ -224,22 +219,10 @@ func (m *QueueMetrics) SetQueueAllocatedResourceMetrics(resourceName string, val
m.setQueueResource("allocated", resourceName, value)
}
-func (m *QueueMetrics) AddQueueAllocatedResourceMetrics(resourceName string, value float64) {
- m.addQueueResource("allocated", resourceName, value)
-}
-
func (m *QueueMetrics) SetQueuePendingResourceMetrics(resourceName string, value float64) {
m.setQueueResource("pending", resourceName, value)
}
-func (m *QueueMetrics) AddQueuePendingResourceMetrics(resourceName string, value float64) {
- m.addQueueResource("pending", resourceName, value)
-}
-
func (m *QueueMetrics) SetQueuePreemptingResourceMetrics(resourceName string, value float64) {
m.setQueueResource("preempting", resourceName, value)
}
-
-func (m *QueueMetrics) AddQueuePreemptingResourceMetrics(resourceName string, value float64) {
- m.addQueueResource("preempting", resourceName, value)
-}
diff --git a/pkg/metrics/queue_test.go b/pkg/metrics/queue_test.go
index 419efc82..e0a0f94b 100644
--- a/pkg/metrics/queue_test.go
+++ b/pkg/metrics/queue_test.go
@@ -28,113 +28,113 @@ import (
dto "github.com/prometheus/client_model/go"
)
-var cqm CoreQueueMetrics
+var qm *QueueMetrics
func TestApplicationsRunning(t *testing.T) {
- cqm = getQueueMetrics()
- defer unregisterQueueMetrics(t)
+ qm = getQueueMetrics()
+ defer unregisterQueueMetrics()
- cqm.IncQueueApplicationsRunning()
+ qm.IncQueueApplicationsRunning()
verifyAppMetrics(t, "running")
}
func TestApplicationsAccepted(t *testing.T) {
- cqm = getQueueMetrics()
- defer unregisterQueueMetrics(t)
+ qm = getQueueMetrics()
+ defer unregisterQueueMetrics()
- cqm.IncQueueApplicationsAccepted()
+ qm.IncQueueApplicationsAccepted()
verifyAppMetrics(t, "accepted")
}
func TestApplicationsRejected(t *testing.T) {
- cqm = getQueueMetrics()
- defer unregisterQueueMetrics(t)
+ qm = getQueueMetrics()
+ defer unregisterQueueMetrics()
- cqm.IncQueueApplicationsRejected()
+ qm.IncQueueApplicationsRejected()
verifyAppMetrics(t, "rejected")
}
func TestApplicationsFailed(t *testing.T) {
- cqm = getQueueMetrics()
- defer unregisterQueueMetrics(t)
+ qm = getQueueMetrics()
+ defer unregisterQueueMetrics()
- cqm.IncQueueApplicationsFailed()
+ qm.IncQueueApplicationsFailed()
verifyAppMetrics(t, "failed")
}
func TestApplicationsCompleted(t *testing.T) {
- cqm = getQueueMetrics()
- defer unregisterQueueMetrics(t)
+ qm = getQueueMetrics()
+ defer unregisterQueueMetrics()
- cqm.IncQueueApplicationsCompleted()
+ qm.IncQueueApplicationsCompleted()
verifyAppMetrics(t, "completed")
}
func TestAllocatedContainers(t *testing.T) {
- cqm = getQueueMetrics()
- defer unregisterQueueMetrics(t)
+ qm = getQueueMetrics()
+ defer unregisterQueueMetrics()
- cqm.IncAllocatedContainer()
+ qm.IncAllocatedContainer()
verifyContainerMetrics(t, "allocated", float64(1))
}
func TestReleasedContainers(t *testing.T) {
- cqm = getQueueMetrics()
- defer unregisterQueueMetrics(t)
+ qm = getQueueMetrics()
+ defer unregisterQueueMetrics()
- cqm.IncReleasedContainer()
+ qm.IncReleasedContainer()
verifyContainerMetrics(t, "released", float64(1))
}
func TestAddReleasedContainers(t *testing.T) {
- cqm = getQueueMetrics()
- defer unregisterQueueMetrics(t)
+ qm = getQueueMetrics()
+ defer unregisterQueueMetrics()
- cqm.AddReleasedContainers(2)
+ qm.AddReleasedContainers(2)
verifyContainerMetrics(t, "released", float64(2))
}
func TestQueueGuaranteedResourceMetrics(t *testing.T) {
- cqm = getQueueMetrics()
- defer unregisterQueueMetrics(t)
+ qm = getQueueMetrics()
+ defer unregisterQueueMetrics()
- cqm.SetQueueGuaranteedResourceMetrics("cpu", 1)
+ qm.SetQueueGuaranteedResourceMetrics("cpu", 1)
verifyResourceMetrics(t, "guaranteed", "cpu")
}
func TestQueueMaxResourceMetrics(t *testing.T) {
- cqm = getQueueMetrics()
- defer unregisterQueueMetrics(t)
+ qm = getQueueMetrics()
+ defer unregisterQueueMetrics()
- cqm.SetQueueMaxResourceMetrics("cpu", 1)
+ qm.SetQueueMaxResourceMetrics("cpu", 1)
verifyResourceMetrics(t, "max", "cpu")
}
func TestQueueAllocatedResourceMetrics(t *testing.T) {
- cqm = getQueueMetrics()
- defer unregisterQueueMetrics(t)
+ qm = getQueueMetrics()
+ defer unregisterQueueMetrics()
- cqm.SetQueueAllocatedResourceMetrics("cpu", 1)
+ qm.SetQueueAllocatedResourceMetrics("cpu", 1)
verifyResourceMetrics(t, "allocated", "cpu")
}
func TestQueuePendingResourceMetrics(t *testing.T) {
- cqm = getQueueMetrics()
- defer unregisterQueueMetrics(t)
+ qm = getQueueMetrics()
+ defer unregisterQueueMetrics()
- cqm.SetQueuePendingResourceMetrics("cpu", 1)
+ qm.SetQueuePendingResourceMetrics("cpu", 1)
verifyResourceMetrics(t, "pending", "cpu")
}
func TestQueuePreemptingResourceMetrics(t *testing.T) {
- cqm = getQueueMetrics()
- defer unregisterMetrics(t)
+ qm = getQueueMetrics()
+ defer unregisterQueueMetrics()
- cqm.SetQueuePreemptingResourceMetrics("cpu", 1)
+ qm.SetQueuePreemptingResourceMetrics("cpu", 1)
verifyResourceMetrics(t, "preempting", "cpu")
}
-func getQueueMetrics() CoreQueueMetrics {
+func getQueueMetrics() *QueueMetrics {
return InitQueueMetrics("root.test")
}
@@ -260,12 +260,7 @@ func verifyMetricsSubsytem(t *testing.T, checkLabel func(label []*dto.LabelPair)
assert.Assert(t, checked, "Failed to find metric")
}
-func unregisterQueueMetrics(t *testing.T) {
- qm, ok := cqm.(*QueueMetrics)
- if !ok {
- t.Fatalf("Type assertion failed, metrics is not QueueMetrics")
- }
-
+func unregisterQueueMetrics() {
prometheus.Unregister(qm.appMetricsLabel)
prometheus.Unregister(qm.appMetricsSubsystem)
prometheus.Unregister(qm.containerMetrics)
diff --git a/pkg/metrics/runtime.go b/pkg/metrics/runtime.go
index 19448142..725cac73 100644
--- a/pkg/metrics/runtime.go
+++ b/pkg/metrics/runtime.go
@@ -47,6 +47,8 @@ type RuntimeMetrics struct {
*GenericMetrics
}
+// Reset all metrics that implement the Reset functionality.
+// should only be used in tests
func (a *RuntimeMetrics) Reset() {
a.MStatsMetrics.Reset()
a.GenericMetrics.Reset()
diff --git a/pkg/metrics/scheduler.go b/pkg/metrics/scheduler.go
index 0134c3c7..1a0e5e4b 100644
--- a/pkg/metrics/scheduler.go
+++ b/pkg/metrics/scheduler.go
@@ -154,6 +154,8 @@ func InitSchedulerMetrics() *SchedulerMetrics {
return s
}
+// Reset all metrics that implement the Reset functionality.
+// should only be used in tests
func (m *SchedulerMetrics) Reset() {
m.node.Reset()
m.application.Reset()
@@ -169,10 +171,6 @@ func (m *SchedulerMetrics) ObserveSchedulingLatency(start
time.Time) {
m.schedulingLatency.Observe(SinceInSeconds(start))
}
-func (m *SchedulerMetrics) ObserveNodeSortingLatency(start time.Time) {
- m.sortingLatency.With(prometheus.Labels{"level": "node"}).Observe(SinceInSeconds(start))
-}
-
func (m *SchedulerMetrics) ObserveAppSortingLatency(start time.Time) {
m.sortingLatency.With(prometheus.Labels{"level":
"app"}).Observe(SinceInSeconds(start))
}
@@ -189,10 +187,6 @@ func (m *SchedulerMetrics)
ObserveTryPreemptionLatency(start time.Time) {
m.tryPreemptionLatency.Observe(SinceInSeconds(start))
}
-func (m *SchedulerMetrics) IncAllocatedContainer() {
- m.containerAllocation.With(prometheus.Labels{"state":
"allocated"}).Inc()
-}
-
func (m *SchedulerMetrics) AddAllocatedContainers(value int) {
m.containerAllocation.With(prometheus.Labels{"state":
"allocated"}).Add(float64(value))
}
@@ -206,10 +200,6 @@ func (m *SchedulerMetrics) getAllocatedContainers() (int,
error) {
return -1, err
}
-func (m *SchedulerMetrics) IncReleasedContainer() {
- m.containerAllocation.With(prometheus.Labels{"state": "released"}).Inc()
-}
-
func (m *SchedulerMetrics) AddReleasedContainers(value int) {
m.containerAllocation.With(prometheus.Labels{"state":
"released"}).Add(float64(value))
}
@@ -223,10 +213,6 @@ func (m *SchedulerMetrics) getReleasedContainers() (int,
error) {
return -1, err
}
-func (m *SchedulerMetrics) IncRejectedContainer() {
- m.containerAllocation.With(prometheus.Labels{"state": "rejected"}).Inc()
-}
-
func (m *SchedulerMetrics) AddRejectedContainers(value int) {
m.containerAllocation.With(prometheus.Labels{"state":
"rejected"}).Add(float64(value))
}
@@ -235,10 +221,6 @@ func (m *SchedulerMetrics) IncSchedulingError() {
m.containerAllocation.With(prometheus.Labels{"state": "error"}).Inc()
}
-func (m *SchedulerMetrics) AddSchedulingErrors(value int) {
- m.containerAllocation.With(prometheus.Labels{"state":
"error"}).Add(float64(value))
-}
-
func (m *SchedulerMetrics) GetSchedulingErrors() (int, error) {
metricDto := &dto.Metric{}
err := m.containerAllocation.With(prometheus.Labels{"state":
"error"}).Write(metricDto)
@@ -277,10 +259,6 @@ func (m *SchedulerMetrics) IncTotalApplicationsRunning() {
m.application.With(prometheus.Labels{"state": "running"}).Inc()
}
-func (m *SchedulerMetrics) AddTotalApplicationsRunning(value int) {
- m.application.With(prometheus.Labels{"state":
"running"}).Add(float64(value))
-}
-
func (m *SchedulerMetrics) DecTotalApplicationsRunning() {
m.application.With(prometheus.Labels{"state": "running"}).Dec()
}
@@ -289,10 +267,6 @@ func (m *SchedulerMetrics)
SubTotalApplicationsRunning(value int) {
m.application.With(prometheus.Labels{"state":
"running"}).Sub(float64(value))
}
-func (m *SchedulerMetrics) SetTotalApplicationsRunning(value int) {
- m.application.With(prometheus.Labels{"state":
"running"}).Set(float64(value))
-}
-
func (m *SchedulerMetrics) GetTotalApplicationsRunning() (int, error) {
metricDto := &dto.Metric{}
err := m.application.With(prometheus.Labels{"state":
"running"}).Write(metricDto)
@@ -314,18 +288,6 @@ func (m *SchedulerMetrics)
AddTotalApplicationsCompleted(value int) {
m.application.With(prometheus.Labels{"state":
"completed"}).Add(float64(value))
}
-func (m *SchedulerMetrics) DecTotalApplicationsCompleted() {
- m.application.With(prometheus.Labels{"state": "completed"}).Dec()
-}
-
-func (m *SchedulerMetrics) SubTotalApplicationsCompleted(value int) {
- m.application.With(prometheus.Labels{"state":
"completed"}).Sub(float64(value))
-}
-
-func (m *SchedulerMetrics) SetTotalApplicationsCompleted(value int) {
- m.application.With(prometheus.Labels{"state":
"completed"}).Set(float64(value))
-}
-
func (m *SchedulerMetrics) GetTotalApplicationsCompleted() (int, error) {
metricDto := &dto.Metric{}
err := m.application.With(prometheus.Labels{"state":
"completed"}).Write(metricDto)
@@ -339,41 +301,18 @@ func (m *SchedulerMetrics) IncActiveNodes() {
m.node.With(prometheus.Labels{"state": "active"}).Inc()
}
-func (m *SchedulerMetrics) AddActiveNodes(value int) {
- m.node.With(prometheus.Labels{"state": "active"}).Add(float64(value))
-}
-
func (m *SchedulerMetrics) DecActiveNodes() {
m.node.With(prometheus.Labels{"state": "active"}).Dec()
}
-func (m *SchedulerMetrics) SubActiveNodes(value int) {
- m.node.With(prometheus.Labels{"state": "active"}).Sub(float64(value))
-}
-
-func (m *SchedulerMetrics) SetActiveNodes(value int) {
- m.node.With(prometheus.Labels{"state": "active"}).Set(float64(value))
-}
-
func (m *SchedulerMetrics) IncFailedNodes() {
m.node.With(prometheus.Labels{"state": "failed"}).Inc()
}
-func (m *SchedulerMetrics) AddFailedNodes(value int) {
- m.node.With(prometheus.Labels{"state": "failed"}).Add(float64(value))
-}
-
func (m *SchedulerMetrics) DecFailedNodes() {
m.node.With(prometheus.Labels{"state": "failed"}).Dec()
}
-func (m *SchedulerMetrics) SubFailedNodes(value int) {
- m.node.With(prometheus.Labels{"state": "failed"}).Sub(float64(value))
-}
-
-func (m *SchedulerMetrics) SetFailedNodes(value int) {
- m.node.With(prometheus.Labels{"state": "failed"}).Set(float64(value))
-}
func (m *SchedulerMetrics) GetFailedNodes() (int, error) {
metricDto := &dto.Metric{}
err := m.node.With(prometheus.Labels{"state":
"failed"}).Write(metricDto)
diff --git a/pkg/metrics/scheduler_test.go b/pkg/metrics/scheduler_test.go
index b8cdbcd8..5384b31b 100644
--- a/pkg/metrics/scheduler_test.go
+++ b/pkg/metrics/scheduler_test.go
@@ -30,48 +30,48 @@ import (
"gotest.tools/v3/assert"
)
-var csm CoreSchedulerMetrics
+var sm *SchedulerMetrics
func TestDrainingNodes(t *testing.T) {
- csm = getSchedulerMetrics(t)
- defer unregisterMetrics(t)
+ sm = getSchedulerMetrics(t)
+ defer unregisterMetrics()
- csm.IncDrainingNodes()
+ sm.IncDrainingNodes()
verifyMetric(t, 1, "draining")
- csm.DecDrainingNodes()
+ sm.DecDrainingNodes()
verifyMetric(t, 0, "draining")
}
func TestTotalDecommissionedNodes(t *testing.T) {
- csm = getSchedulerMetrics(t)
- defer unregisterMetrics(t)
+ sm = getSchedulerMetrics(t)
+ defer unregisterMetrics()
- csm.IncTotalDecommissionedNodes()
+ sm.IncTotalDecommissionedNodes()
verifyMetric(t, 1, "decommissioned")
}
func TestUnhealthyNodes(t *testing.T) {
- csm = getSchedulerMetrics(t)
- defer unregisterMetrics(t)
+ sm = getSchedulerMetrics(t)
+ defer unregisterMetrics()
- csm.IncUnhealthyNodes()
+ sm.IncUnhealthyNodes()
verifyMetric(t, 1, "unhealthy")
- csm.DecUnhealthyNodes()
+ sm.DecUnhealthyNodes()
verifyMetric(t, 0, "unhealthy")
}
func TestTryPreemptionLatency(t *testing.T) {
- csm = getSchedulerMetrics(t)
- defer unregisterMetrics(t)
+ sm = getSchedulerMetrics(t)
+ defer unregisterMetrics()
- csm.ObserveTryPreemptionLatency(time.Now().Add(-1 * time.Minute))
+ sm.ObserveTryPreemptionLatency(time.Now().Add(-1 * time.Minute))
verifyHistogram(t, "trypreemption_latency_milliseconds", 60, 1)
}
func getSchedulerMetrics(t *testing.T) *SchedulerMetrics {
- unregisterMetrics(t)
+ unregisterMetrics()
return InitSchedulerMetrics()
}
@@ -112,12 +112,8 @@ func verifyMetric(t *testing.T, expectedCounter float64,
expectedState string) {
assert.Assert(t, checked, "Failed to find metric")
}
-func unregisterMetrics(t *testing.T) {
- sm, ok := GetSchedulerMetrics().(*SchedulerMetrics)
- if !ok {
- t.Fatalf("Type assertion failed, metrics is not SchedulerMetrics")
- }
-
+func unregisterMetrics() {
+ sm := GetSchedulerMetrics()
prometheus.Unregister(sm.containerAllocation)
prometheus.Unregister(sm.applicationSubmission)
prometheus.Unregister(sm.application)
diff --git a/pkg/scheduler/health_checker.go b/pkg/scheduler/health_checker.go
index 4019a89c..92a758ce 100644
--- a/pkg/scheduler/health_checker.go
+++ b/pkg/scheduler/health_checker.go
@@ -189,7 +189,7 @@ func updateSchedulerLastHealthStatus(latest *dao.SchedulerHealthDAOInfo, schedul
schedulerContext.SetLastHealthCheckResult(latest)
}
-func GetSchedulerHealthStatus(metrics metrics.CoreSchedulerMetrics, schedulerContext *ClusterContext) dao.SchedulerHealthDAOInfo {
+func GetSchedulerHealthStatus(metrics *metrics.SchedulerMetrics, schedulerContext *ClusterContext) dao.SchedulerHealthDAOInfo {
var healthInfo []dao.HealthCheckInfo
healthInfo = append(healthInfo, checkSchedulingErrors(metrics))
healthInfo = append(healthInfo, checkFailedNodes(metrics))
@@ -214,7 +214,7 @@ func CreateCheckInfo(succeeded bool, name, description, message string) dao.Heal
DiagnosisMessage: message,
}
}
-func checkSchedulingErrors(metrics metrics.CoreSchedulerMetrics) dao.HealthCheckInfo {
+func checkSchedulingErrors(metrics *metrics.SchedulerMetrics) dao.HealthCheckInfo {
schedulingErrors, err := metrics.GetSchedulingErrors()
if err != nil {
return CreateCheckInfo(false, "Scheduling errors", "Check for scheduling error entries in metrics", err.Error())
@@ -223,7 +223,7 @@ func checkSchedulingErrors(metrics metrics.CoreSchedulerMetrics) dao.HealthCheck
return CreateCheckInfo(schedulingErrors == 0, "Scheduling errors", "Check for scheduling error entries in metrics", diagnosisMsg)
}
-func checkFailedNodes(metrics metrics.CoreSchedulerMetrics) dao.HealthCheckInfo {
+func checkFailedNodes(metrics *metrics.SchedulerMetrics) dao.HealthCheckInfo {
failedNodes, err := metrics.GetFailedNodes()
if err != nil {
return CreateCheckInfo(false, "Failed nodes", "Check for failed nodes entries in metrics", err.Error())
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]