This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 89348e6f [YUNIKORN-2876] Initialize queue metrics for app after queue is set (#968)
89348e6f is described below
commit 89348e6f669d30390d1cfe2996b2cbddc4b014d5
Author: qzhu <[email protected]>
AuthorDate: Tue Sep 17 12:02:26 2024 -0500
[YUNIKORN-2876] Initialize queue metrics for app after queue is set (#968)
Closes: #968
Signed-off-by: Craig Condit <[email protected]>
---
pkg/scheduler/objects/application.go | 10 ++----
pkg/scheduler/objects/application_state_test.go | 7 +++++
pkg/scheduler/partition_test.go | 42 +++++++++++++++++++++++++
3 files changed, 52 insertions(+), 7 deletions(-)
diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index a6840854..7df5c1e4 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -52,7 +52,6 @@ var (
terminatedTimeout = 3 * 24 * time.Hour
defaultPlaceholderTimeout = 15 * time.Minute
)
-
var initAppLogOnce sync.Once
var rateLimitedAppLog *log.RateLimitedLogger
@@ -191,15 +190,9 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi security.UserGroup, eve
app.rmID = rmID
app.appEvents = schedEvt.NewApplicationEvents(events.GetEventSystem())
app.appEvents.SendNewApplicationEvent(app.ApplicationID)
- app.setNewMetrics()
return app
}
-func (sa *Application) setNewMetrics() {
- metrics.GetSchedulerMetrics().IncTotalApplicationsNew()
- metrics.GetQueueMetrics(sa.GetQueuePath()).IncQueueApplicationsNew()
-}
-
func (sa *Application) String() string {
if sa == nil {
return "application is nil"
@@ -1641,6 +1634,9 @@ func (sa *Application) SetQueue(queue *Queue) {
defer sa.Unlock()
sa.queuePath = queue.QueuePath
sa.queue = queue
+ // here we can make sure the queue is not empty
+ metrics.GetQueueMetrics(queue.QueuePath).IncQueueApplicationsNew()
+ metrics.GetSchedulerMetrics().IncTotalApplicationsNew()
}
// remove the leaf queue the application runs in, used when completing the app
diff --git a/pkg/scheduler/objects/application_state_test.go b/pkg/scheduler/objects/application_state_test.go
index 33c218ff..16b81144 100644
--- a/pkg/scheduler/objects/application_state_test.go
+++ b/pkg/scheduler/objects/application_state_test.go
@@ -286,6 +286,7 @@ func TestAppStateTransitionEvents(t *testing.T) {
func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen
queue := createQueue(t, "metrics")
metrics.GetSchedulerMetrics().Reset()
+ metrics.GetQueueMetrics("root.metrics").Reset()
// app-00001: New -> Resuming -> Accepted --> Running -> Completing-> Completed
app := newApplication("app-00001", "default", "root.metrics")
app.SetQueue(queue)
@@ -475,6 +476,12 @@ func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen
assertQueueApplicationsRejectedMetrics(t, app, 1)
assertQueueApplicationsFailedMetrics(t, app, 2)
assertQueueApplicationsCompletedMetrics(t, app, 1)
+
+ // app-00005: the queuePath is empty, it will happen for dynamic queue when it before the queue is created
+ app = newApplication("app-00005", "default", "")
+ assertState(t, app, nil, New.String())
+ assertQueueApplicationsNewMetrics(t, app, 0)
+ assertTotalAppsNewMetrics(t, 4)
}
func assertState(t testing.TB, app *Application, err error, expected string) {
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index b47fb234..ed036502 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -31,6 +31,7 @@ import (
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/common/security"
"github.com/apache/yunikorn-core/pkg/events"
+ "github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-core/pkg/mock"
"github.com/apache/yunikorn-core/pkg/plugins"
"github.com/apache/yunikorn-core/pkg/rmproxy/rmevent"
@@ -246,6 +247,9 @@ func TestRemoveNodeWithAllocations(t *testing.T) {
partition, err := newBasePartition()
assert.NilError(t, err, "partition create failed")
+ defer metrics.GetSchedulerMetrics().Reset()
+ defer metrics.GetQueueMetrics(defQueue).Reset()
+
// add a new app
app := newApplication(appID1, "default", defQueue)
err = partition.AddApplication(app)
@@ -293,6 +297,9 @@ func TestRemoveNodeWithPlaceholders(t *testing.T) {
partition, err := newBasePartition()
assert.NilError(t, err, "partition create failed")
+ defer metrics.GetSchedulerMetrics().Reset()
+ defer metrics.GetQueueMetrics(defQueue).Reset()
+
// add a new app
app := newApplication(appID1, "default", defQueue)
err = partition.AddApplication(app)
@@ -395,6 +402,9 @@ func TestPlaceholderDataWithPlaceholderPreemption(t *testing.T) {
partition, err := newBasePartition()
assert.NilError(t, err, "partition create failed")
+ defer metrics.GetSchedulerMetrics().Reset()
+ defer metrics.GetQueueMetrics(defQueue).Reset()
+
// add a new app1
app1, _ := newApplicationWithHandler(appID1, "default", defQueue)
err = partition.AddApplication(app1)
@@ -522,6 +532,9 @@ func TestPlaceholderDataWithNodeRemoval(t *testing.T) {
partition, err := newBasePartition()
assert.NilError(t, err, "partition create failed")
+ defer metrics.GetSchedulerMetrics().Reset()
+ defer metrics.GetQueueMetrics(defQueue).Reset()
+
// add a new app1
app1, _ := newApplicationWithHandler(appID1, "default", defQueue)
err = partition.AddApplication(app1)
@@ -605,6 +618,9 @@ func TestPlaceholderDataWithRemoval(t *testing.T) {
partition, err := newBasePartition()
assert.NilError(t, err, "partition create failed")
+ defer metrics.GetSchedulerMetrics().Reset()
+ defer metrics.GetQueueMetrics(defQueue).Reset()
+
// add a new app1
app1, _ := newApplicationWithHandler(appID1, "default", defQueue)
err = partition.AddApplication(app1)
@@ -698,6 +714,8 @@ func TestRemoveNodeWithReplacement(t *testing.T) {
partition, err := newBasePartition()
assert.NilError(t, err, "partition create failed")
+ defer metrics.GetSchedulerMetrics().Reset()
+ defer metrics.GetQueueMetrics(defQueue).Reset()
// add a new app
app := newApplication(appID1, "default", defQueue)
err = partition.AddApplication(app)
@@ -770,6 +788,9 @@ func TestRemoveNodeWithReal(t *testing.T) {
partition, err := newBasePartition()
assert.NilError(t, err, "partition create failed")
+ defer metrics.GetSchedulerMetrics().Reset()
+ defer metrics.GetQueueMetrics(defQueue).Reset()
+
// add a new app
app := newApplication(appID1, "default", defQueue)
err = partition.AddApplication(app)
@@ -831,6 +852,8 @@ func TestRemoveNodeWithReal(t *testing.T) {
}
func TestAddApp(t *testing.T) {
+ defer metrics.GetSchedulerMetrics().Reset()
+ defer metrics.GetQueueMetrics(defQueue).Reset()
partition, err := newBasePartition()
assert.NilError(t, err, "partition create failed")
@@ -838,6 +861,13 @@ func TestAddApp(t *testing.T) {
app := newApplication(appID1, "default", defQueue)
err = partition.AddApplication(app)
assert.NilError(t, err, "add application to partition should not have failed")
+ queueApplicationsNew, err := metrics.GetQueueMetrics(defQueue).GetQueueApplicationsNew()
+ assert.NilError(t, err, "get queue metrics failed")
+ assert.Equal(t, queueApplicationsNew, 1)
+ scheduleApplicationsNew, err := metrics.GetSchedulerMetrics().GetTotalApplicationsNew()
+ assert.NilError(t, err, "get scheduler metrics failed")
+ assert.Equal(t, scheduleApplicationsNew, 1)
+
// add the same app
err = partition.AddApplication(app)
if err == nil {
@@ -853,6 +883,12 @@ func TestAddApp(t *testing.T) {
if err == nil || partition.getApplication(appID2) != nil {
t.Errorf("add application on stopped partition should have failed but did not")
}
+ queueApplicationsNew, err = metrics.GetQueueMetrics(defQueue).GetQueueApplicationsNew()
+ assert.NilError(t, err, "get queue metrics failed")
+ assert.Equal(t, queueApplicationsNew, 1)
+ scheduleApplicationsNew, err = metrics.GetSchedulerMetrics().GetTotalApplicationsNew()
+ assert.NilError(t, err, "get scheduler metrics failed")
+ assert.Equal(t, scheduleApplicationsNew, 1)
// mark partition for deletion, no new application can be added
partition.stateMachine.SetState(objects.Active.String())
@@ -863,6 +899,12 @@ func TestAddApp(t *testing.T) {
if err == nil || partition.getApplication(appID3) != nil {
t.Errorf("add application on draining partition should have failed but did not")
}
+ queueApplicationsNew, err = metrics.GetQueueMetrics(defQueue).GetQueueApplicationsNew()
+ assert.NilError(t, err, "get queue metrics failed")
+ assert.Equal(t, queueApplicationsNew, 1)
+ scheduleApplicationsNew, err = metrics.GetSchedulerMetrics().GetTotalApplicationsNew()
+ assert.NilError(t, err, "get scheduler metrics failed")
+ assert.Equal(t, scheduleApplicationsNew, 1)
}
func TestAddAppForced(t *testing.T) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]