This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 89348e6f [YUNIKORN-2876] Initialize queue metrics for app after queue is set (#968)
89348e6f is described below

commit 89348e6f669d30390d1cfe2996b2cbddc4b014d5
Author: qzhu <[email protected]>
AuthorDate: Tue Sep 17 12:02:26 2024 -0500

    [YUNIKORN-2876] Initialize queue metrics for app after queue is set (#968)
    
    Closes: #968
    
    Signed-off-by: Craig Condit <[email protected]>
---
 pkg/scheduler/objects/application.go            | 10 ++----
 pkg/scheduler/objects/application_state_test.go |  7 +++++
 pkg/scheduler/partition_test.go                 | 42 +++++++++++++++++++++++++
 3 files changed, 52 insertions(+), 7 deletions(-)

diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index a6840854..7df5c1e4 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -52,7 +52,6 @@ var (
        terminatedTimeout         = 3 * 24 * time.Hour
        defaultPlaceholderTimeout = 15 * time.Minute
 )
-
 var initAppLogOnce sync.Once
 var rateLimitedAppLog *log.RateLimitedLogger
 
@@ -191,15 +190,9 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi security.UserGroup, eve
        app.rmID = rmID
        app.appEvents = schedEvt.NewApplicationEvents(events.GetEventSystem())
        app.appEvents.SendNewApplicationEvent(app.ApplicationID)
-       app.setNewMetrics()
        return app
 }
 
-func (sa *Application) setNewMetrics() {
-       metrics.GetSchedulerMetrics().IncTotalApplicationsNew()
-       metrics.GetQueueMetrics(sa.GetQueuePath()).IncQueueApplicationsNew()
-}
-
 func (sa *Application) String() string {
        if sa == nil {
                return "application is nil"
@@ -1641,6 +1634,9 @@ func (sa *Application) SetQueue(queue *Queue) {
        defer sa.Unlock()
        sa.queuePath = queue.QueuePath
        sa.queue = queue
+       // here we can make sure the queue is not empty
+       metrics.GetQueueMetrics(queue.QueuePath).IncQueueApplicationsNew()
+       metrics.GetSchedulerMetrics().IncTotalApplicationsNew()
 }
 
 // remove the leaf queue the application runs in, used when completing the app
diff --git a/pkg/scheduler/objects/application_state_test.go b/pkg/scheduler/objects/application_state_test.go
index 33c218ff..16b81144 100644
--- a/pkg/scheduler/objects/application_state_test.go
+++ b/pkg/scheduler/objects/application_state_test.go
@@ -286,6 +286,7 @@ func TestAppStateTransitionEvents(t *testing.T) {
 func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen
        queue := createQueue(t, "metrics")
        metrics.GetSchedulerMetrics().Reset()
+       metrics.GetQueueMetrics("root.metrics").Reset()
        // app-00001: New -> Resuming -> Accepted --> Running -> Completing-> Completed
        app := newApplication("app-00001", "default", "root.metrics")
        app.SetQueue(queue)
@@ -475,6 +476,12 @@ func TestAppStateTransitionMetrics(t *testing.T) { //nolint:funlen
        assertQueueApplicationsRejectedMetrics(t, app, 1)
        assertQueueApplicationsFailedMetrics(t, app, 2)
        assertQueueApplicationsCompletedMetrics(t, app, 1)
+
+       // app-00005: the queuePath is empty, it will happen for dynamic queue when it before the queue is created
+       app = newApplication("app-00005", "default", "")
+       assertState(t, app, nil, New.String())
+       assertQueueApplicationsNewMetrics(t, app, 0)
+       assertTotalAppsNewMetrics(t, 4)
 }
 
 func assertState(t testing.TB, app *Application, err error, expected string) {
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index b47fb234..ed036502 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -31,6 +31,7 @@ import (
        "github.com/apache/yunikorn-core/pkg/common/resources"
        "github.com/apache/yunikorn-core/pkg/common/security"
        "github.com/apache/yunikorn-core/pkg/events"
+       "github.com/apache/yunikorn-core/pkg/metrics"
        "github.com/apache/yunikorn-core/pkg/mock"
        "github.com/apache/yunikorn-core/pkg/plugins"
        "github.com/apache/yunikorn-core/pkg/rmproxy/rmevent"
@@ -246,6 +247,9 @@ func TestRemoveNodeWithAllocations(t *testing.T) {
        partition, err := newBasePartition()
        assert.NilError(t, err, "partition create failed")
 
+       defer metrics.GetSchedulerMetrics().Reset()
+       defer metrics.GetQueueMetrics(defQueue).Reset()
+
        // add a new app
        app := newApplication(appID1, "default", defQueue)
        err = partition.AddApplication(app)
@@ -293,6 +297,9 @@ func TestRemoveNodeWithPlaceholders(t *testing.T) {
        partition, err := newBasePartition()
        assert.NilError(t, err, "partition create failed")
 
+       defer metrics.GetSchedulerMetrics().Reset()
+       defer metrics.GetQueueMetrics(defQueue).Reset()
+
        // add a new app
        app := newApplication(appID1, "default", defQueue)
        err = partition.AddApplication(app)
@@ -395,6 +402,9 @@ func TestPlaceholderDataWithPlaceholderPreemption(t *testing.T) {
        partition, err := newBasePartition()
        assert.NilError(t, err, "partition create failed")
 
+       defer metrics.GetSchedulerMetrics().Reset()
+       defer metrics.GetQueueMetrics(defQueue).Reset()
+
        // add a new app1
        app1, _ := newApplicationWithHandler(appID1, "default", defQueue)
        err = partition.AddApplication(app1)
@@ -522,6 +532,9 @@ func TestPlaceholderDataWithNodeRemoval(t *testing.T) {
        partition, err := newBasePartition()
        assert.NilError(t, err, "partition create failed")
 
+       defer metrics.GetSchedulerMetrics().Reset()
+       defer metrics.GetQueueMetrics(defQueue).Reset()
+
        // add a new app1
        app1, _ := newApplicationWithHandler(appID1, "default", defQueue)
        err = partition.AddApplication(app1)
@@ -605,6 +618,9 @@ func TestPlaceholderDataWithRemoval(t *testing.T) {
        partition, err := newBasePartition()
        assert.NilError(t, err, "partition create failed")
 
+       defer metrics.GetSchedulerMetrics().Reset()
+       defer metrics.GetQueueMetrics(defQueue).Reset()
+
        // add a new app1
        app1, _ := newApplicationWithHandler(appID1, "default", defQueue)
        err = partition.AddApplication(app1)
@@ -698,6 +714,8 @@ func TestRemoveNodeWithReplacement(t *testing.T) {
        partition, err := newBasePartition()
        assert.NilError(t, err, "partition create failed")
 
+       defer metrics.GetSchedulerMetrics().Reset()
+       defer metrics.GetQueueMetrics(defQueue).Reset()
        // add a new app
        app := newApplication(appID1, "default", defQueue)
        err = partition.AddApplication(app)
@@ -770,6 +788,9 @@ func TestRemoveNodeWithReal(t *testing.T) {
        partition, err := newBasePartition()
        assert.NilError(t, err, "partition create failed")
 
+       defer metrics.GetSchedulerMetrics().Reset()
+       defer metrics.GetQueueMetrics(defQueue).Reset()
+
        // add a new app
        app := newApplication(appID1, "default", defQueue)
        err = partition.AddApplication(app)
@@ -831,6 +852,8 @@ func TestRemoveNodeWithReal(t *testing.T) {
 }
 
 func TestAddApp(t *testing.T) {
+       defer metrics.GetSchedulerMetrics().Reset()
+       defer metrics.GetQueueMetrics(defQueue).Reset()
        partition, err := newBasePartition()
        assert.NilError(t, err, "partition create failed")
 
@@ -838,6 +861,13 @@ func TestAddApp(t *testing.T) {
        app := newApplication(appID1, "default", defQueue)
        err = partition.AddApplication(app)
        assert.NilError(t, err, "add application to partition should not have failed")
+       queueApplicationsNew, err := metrics.GetQueueMetrics(defQueue).GetQueueApplicationsNew()
+       assert.NilError(t, err, "get queue metrics failed")
+       assert.Equal(t, queueApplicationsNew, 1)
+       scheduleApplicationsNew, err := metrics.GetSchedulerMetrics().GetTotalApplicationsNew()
+       assert.NilError(t, err, "get scheduler metrics failed")
+       assert.Equal(t, scheduleApplicationsNew, 1)
+
        // add the same app
        err = partition.AddApplication(app)
        if err == nil {
@@ -853,6 +883,12 @@ func TestAddApp(t *testing.T) {
        if err == nil || partition.getApplication(appID2) != nil {
                t.Errorf("add application on stopped partition should have failed but did not")
        }
+       queueApplicationsNew, err = metrics.GetQueueMetrics(defQueue).GetQueueApplicationsNew()
+       assert.NilError(t, err, "get queue metrics failed")
+       assert.Equal(t, queueApplicationsNew, 1)
+       scheduleApplicationsNew, err = metrics.GetSchedulerMetrics().GetTotalApplicationsNew()
+       assert.NilError(t, err, "get scheduler metrics failed")
+       assert.Equal(t, scheduleApplicationsNew, 1)
 
        // mark partition for deletion, no new application can be added
        partition.stateMachine.SetState(objects.Active.String())
@@ -863,6 +899,12 @@ func TestAddApp(t *testing.T) {
        if err == nil || partition.getApplication(appID3) != nil {
                t.Errorf("add application on draining partition should have failed but did not")
        }
+       queueApplicationsNew, err = metrics.GetQueueMetrics(defQueue).GetQueueApplicationsNew()
+       assert.NilError(t, err, "get queue metrics failed")
+       assert.Equal(t, queueApplicationsNew, 1)
+       scheduleApplicationsNew, err = metrics.GetSchedulerMetrics().GetTotalApplicationsNew()
+       assert.NilError(t, err, "get scheduler metrics failed")
+       assert.Equal(t, scheduleApplicationsNew, 1)
 }
 
 func TestAddAppForced(t *testing.T) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to