This is an automated email from the ASF dual-hosted git repository.

wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 174c5fd8 [YUNIKORN-2116] Track user/group events (#800)
174c5fd8 is described below

commit 174c5fd8f1199c1f8a57e436dcc2bad8782e9723
Author: Peter Bacsko <[email protected]>
AuthorDate: Thu Feb 22 17:19:50 2024 +1100

    [YUNIKORN-2116] Track user/group events (#800)
    
    Add events to the event system for user and group quotas. Additional
    events are generated for configuration changes and usage changes for
    both the user and groups.
    The (un)linking of an app to a group is tracked in separate events.
    
    Closes: #800
    
    Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
 pkg/events/events.go                             |   4 +
 pkg/events/events_test.go                        |   3 +
 pkg/scheduler/objects/application.go             |  42 ++++
 pkg/scheduler/objects/application_events.go      |  40 +++-
 pkg/scheduler/objects/application_events_test.go |  92 +++++++++
 pkg/scheduler/objects/application_test.go        |  55 ++++++
 pkg/scheduler/objects/queue.go                   |   5 +-
 pkg/scheduler/tests/application_tracking_test.go |  62 +++---
 pkg/scheduler/ugm/group_tracker.go               |  28 ++-
 pkg/scheduler/ugm/group_tracker_test.go          | 137 +++++++++----
 pkg/scheduler/ugm/manager.go                     |  68 +++----
 pkg/scheduler/ugm/ugm_events.go                  | 116 +++++++++++
 pkg/scheduler/ugm/ugm_events_test.go             | 239 +++++++++++++++++++++++
 pkg/scheduler/ugm/user_tracker.go                |  41 +++-
 pkg/scheduler/ugm/user_tracker_test.go           | 164 ++++++++++++----
 15 files changed, 942 insertions(+), 154 deletions(-)

diff --git a/pkg/events/events.go b/pkg/events/events.go
index be6f72b8..819dd661 100644
--- a/pkg/events/events.go
+++ b/pkg/events/events.go
@@ -54,3 +54,7 @@ func CreateNodeEventRecord(objectID, message, referenceID 
string, changeType si.
 func CreateQueueEventRecord(objectID, message, referenceID string, changeType 
si.EventRecord_ChangeType, changeDetail si.EventRecord_ChangeDetail, resource 
*resources.Resource) *si.EventRecord {
        return createEventRecord(si.EventRecord_QUEUE, objectID, referenceID, 
message, changeType, changeDetail, resource)
 }
+
+func CreateUserGroupEventRecord(objectID, message, referenceID string, 
changeType si.EventRecord_ChangeType, changeDetail si.EventRecord_ChangeDetail, 
resource *resources.Resource) *si.EventRecord {
+       return createEventRecord(si.EventRecord_USERGROUP, objectID, 
referenceID, message, changeType, changeDetail, resource)
+}
diff --git a/pkg/events/events_test.go b/pkg/events/events_test.go
index dc86fcd1..11bca877 100644
--- a/pkg/events/events_test.go
+++ b/pkg/events/events_test.go
@@ -58,4 +58,7 @@ func TestCreateEventRecordTypes(t *testing.T) {
 
        record = CreateQueueEventRecord("queue", "message", "app", 
si.EventRecord_NONE, si.EventRecord_DETAILS_NONE, nil)
        assert.Equal(t, record.Type, si.EventRecord_QUEUE)
+
+       record = CreateUserGroupEventRecord("user", "message", "queue", 
si.EventRecord_NONE, si.EventRecord_DETAILS_NONE, nil)
+       assert.Equal(t, record.Type, si.EventRecord_USERGROUP)
 }
diff --git a/pkg/scheduler/objects/application.go 
b/pkg/scheduler/objects/application.go
index b8d8f60a..cde6fbd9 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -112,6 +112,8 @@ type Application struct {
        placeholderData      map[string]*PlaceholderData // track placeholder 
and gang related info
        askMaxPriority       int32                       // highest priority 
value of outstanding asks
        hasPlaceholderAlloc  bool                        // Whether there is at 
least one allocated placeholder
+       runnableInQueue      bool                        // whether the 
application is runnable/schedulable in the queue. Default is true.
+       runnableByUserLimit  bool                        // whether the 
application is runnable/schedulable based on user/group quota. Default is true.
 
        rmEventHandler        handler.EventHandler
        rmID                  string
@@ -171,6 +173,8 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi 
security.UserGroup, eve
                askMaxPriority:        configs.MinPriority,
                sortedRequests:        sortedRequests{},
                sendStateChangeEvents: true,
+               runnableByUserLimit:   true,
+               runnableInQueue:       true,
        }
        placeholderTimeout := common.ConvertSITimeoutWithAdjustment(siApp, 
defaultPlaceholderTimeout)
        gangSchedStyle := siApp.GetGangSchedulingStyle()
@@ -2083,3 +2087,41 @@ func getRateLimitedAppLog() *log.RateLimitedLogger {
        })
        return rateLimitedAppLog
 }
+
+func (sa *Application) updateRunnableStatus(runnableInQueue, 
runnableByUserLimit bool) {
+       sa.Lock()
+       defer sa.Unlock()
+       if sa.runnableInQueue != runnableInQueue {
+               if runnableInQueue {
+                       log.Log(log.SchedApplication).Info("Application is now 
runnable in queue",
+                               zap.String("appID", sa.ApplicationID),
+                               zap.String("queue", sa.queuePath))
+                       sa.appEvents.sendAppRunnableInQueueEvent()
+               } else {
+                       log.Log(log.SchedApplication).Info("Maximum number of 
running applications reached the queue limit",
+                               zap.String("appID", sa.ApplicationID),
+                               zap.String("queue", sa.queuePath))
+                       sa.appEvents.sendAppNotRunnableInQueueEvent()
+               }
+       }
+       sa.runnableInQueue = runnableInQueue
+
+       if sa.runnableByUserLimit != runnableByUserLimit {
+               if runnableByUserLimit {
+                       log.Log(log.SchedApplication).Info("Application is now 
runnable based on user/group quota",
+                               zap.String("appID", sa.ApplicationID),
+                               zap.String("queue", sa.queuePath),
+                               zap.String("user", sa.user.User),
+                               zap.Strings("groups", sa.user.Groups))
+                       sa.appEvents.sendAppRunnableQuotaEvent()
+               } else {
+                       log.Log(log.SchedApplication).Info("Maximum number of 
running applications reached the user/group limit",
+                               zap.String("appID", sa.ApplicationID),
+                               zap.String("queue", sa.queuePath),
+                               zap.String("user", sa.user.User),
+                               zap.Strings("groups", sa.user.Groups))
+                       sa.appEvents.sendAppNotRunnableQuotaEvent()
+               }
+       }
+       sa.runnableByUserLimit = runnableByUserLimit
+}
diff --git a/pkg/scheduler/objects/application_events.go 
b/pkg/scheduler/objects/application_events.go
index 58a24ab9..0f0aa5d6 100644
--- a/pkg/scheduler/objects/application_events.go
+++ b/pkg/scheduler/objects/application_events.go
@@ -83,7 +83,7 @@ func (evt *applicationEvents) sendRemoveAskEvent(request 
*AllocationAsk, detail
        if !evt.eventSystem.IsEventTrackingEnabled() {
                return
        }
-       event := events.CreateAppEventRecord(evt.app.ApplicationID, "", 
request.GetAllocationKey(), si.EventRecord_REMOVE, detail, 
request.GetAllocatedResource())
+       event := events.CreateAppEventRecord(evt.app.ApplicationID, 
common.Empty, request.GetAllocationKey(), si.EventRecord_REMOVE, detail, 
request.GetAllocatedResource())
        evt.eventSystem.AddEvent(event)
 }
 
@@ -91,7 +91,7 @@ func (evt *applicationEvents) sendNewApplicationEvent() {
        if !evt.eventSystem.IsEventTrackingEnabled() {
                return
        }
-       event := events.CreateAppEventRecord(evt.app.ApplicationID, "", "", 
si.EventRecord_ADD, si.EventRecord_DETAILS_NONE, evt.app.allocatedResource)
+       event := events.CreateAppEventRecord(evt.app.ApplicationID, 
common.Empty, common.Empty, si.EventRecord_ADD, si.EventRecord_DETAILS_NONE, 
evt.app.allocatedResource)
        evt.eventSystem.AddEvent(event)
 }
 
@@ -99,7 +99,7 @@ func (evt *applicationEvents) sendRemoveApplicationEvent() {
        if !evt.eventSystem.IsEventTrackingEnabled() {
                return
        }
-       event := events.CreateAppEventRecord(evt.app.ApplicationID, "", "", 
si.EventRecord_REMOVE, si.EventRecord_DETAILS_NONE, evt.app.allocatedResource)
+       event := events.CreateAppEventRecord(evt.app.ApplicationID, 
common.Empty, common.Empty, si.EventRecord_REMOVE, si.EventRecord_DETAILS_NONE, 
evt.app.allocatedResource)
        evt.eventSystem.AddEvent(event)
 }
 
@@ -107,7 +107,39 @@ func (evt *applicationEvents) 
sendStateChangeEvent(changeDetail si.EventRecord_C
        if !evt.eventSystem.IsEventTrackingEnabled() || 
!evt.app.sendStateChangeEvents {
                return
        }
-       event := events.CreateAppEventRecord(evt.app.ApplicationID, eventInfo, 
"", si.EventRecord_SET, changeDetail, evt.app.allocatedResource)
+       event := events.CreateAppEventRecord(evt.app.ApplicationID, eventInfo, 
common.Empty, si.EventRecord_SET, changeDetail, evt.app.allocatedResource)
+       evt.eventSystem.AddEvent(event)
+}
+
+func (evt *applicationEvents) sendAppNotRunnableInQueueEvent() {
+       if !evt.eventSystem.IsEventTrackingEnabled() {
+               return
+       }
+       event := events.CreateAppEventRecord(evt.app.ApplicationID, 
common.Empty, common.Empty, si.EventRecord_NONE, 
si.EventRecord_APP_CANNOTRUN_QUEUE, nil)
+       evt.eventSystem.AddEvent(event)
+}
+
+func (evt *applicationEvents) sendAppRunnableInQueueEvent() {
+       if !evt.eventSystem.IsEventTrackingEnabled() {
+               return
+       }
+       event := events.CreateAppEventRecord(evt.app.ApplicationID, 
common.Empty, common.Empty, si.EventRecord_NONE, 
si.EventRecord_APP_RUNNABLE_QUEUE, nil)
+       evt.eventSystem.AddEvent(event)
+}
+
+func (evt *applicationEvents) sendAppNotRunnableQuotaEvent() {
+       if !evt.eventSystem.IsEventTrackingEnabled() {
+               return
+       }
+       event := events.CreateAppEventRecord(evt.app.ApplicationID, 
common.Empty, common.Empty, si.EventRecord_NONE, 
si.EventRecord_APP_CANNOTRUN_QUOTA, nil)
+       evt.eventSystem.AddEvent(event)
+}
+
+func (evt *applicationEvents) sendAppRunnableQuotaEvent() {
+       if !evt.eventSystem.IsEventTrackingEnabled() {
+               return
+       }
+       event := events.CreateAppEventRecord(evt.app.ApplicationID, 
common.Empty, common.Empty, si.EventRecord_NONE, 
si.EventRecord_APP_RUNNABLE_QUOTA, nil)
        evt.eventSystem.AddEvent(event)
 }
 
diff --git a/pkg/scheduler/objects/application_events_test.go 
b/pkg/scheduler/objects/application_events_test.go
index 7e29f6ae..1dd01c4a 100644
--- a/pkg/scheduler/objects/application_events_test.go
+++ b/pkg/scheduler/objects/application_events_test.go
@@ -343,3 +343,95 @@ func TestSendStateChangeEvent(t *testing.T) {
        assert.Equal(t, "", event.ReferenceID)
        assert.Equal(t, "Failed to add application to partition (placement 
rejected)", event.Message)
 }
+
+func TestSendAppCannotRunInQueueEvent(t *testing.T) {
+       app := &Application{
+               ApplicationID:         appID0,
+               queuePath:             "root.test",
+               sendStateChangeEvents: true,
+       }
+       eventSystem := mock.NewEventSystemDisabled()
+       appEvents := newApplicationEvents(app, eventSystem)
+       appEvents.sendAppNotRunnableInQueueEvent()
+       assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
+
+       eventSystem = mock.NewEventSystem()
+       appEvents = newApplicationEvents(app, eventSystem)
+       appEvents.sendAppNotRunnableInQueueEvent()
+       event := eventSystem.Events[0]
+       assert.Equal(t, si.EventRecord_APP, event.Type)
+       assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+       assert.Equal(t, si.EventRecord_APP_CANNOTRUN_QUEUE, 
event.EventChangeDetail)
+       assert.Equal(t, "app-0", event.ObjectID)
+       assert.Equal(t, "", event.ReferenceID)
+       assert.Equal(t, "", event.Message)
+}
+
+func TestSendAppCannotRunByQuotaEvent(t *testing.T) {
+       app := &Application{
+               ApplicationID:         appID0,
+               queuePath:             "root.test",
+               sendStateChangeEvents: true,
+       }
+       eventSystem := mock.NewEventSystemDisabled()
+       appEvents := newApplicationEvents(app, eventSystem)
+       appEvents.sendAppNotRunnableQuotaEvent()
+       assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
+
+       eventSystem = mock.NewEventSystem()
+       appEvents = newApplicationEvents(app, eventSystem)
+       appEvents.sendAppNotRunnableQuotaEvent()
+       event := eventSystem.Events[0]
+       assert.Equal(t, si.EventRecord_APP, event.Type)
+       assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+       assert.Equal(t, si.EventRecord_APP_CANNOTRUN_QUOTA, 
event.EventChangeDetail)
+       assert.Equal(t, "app-0", event.ObjectID)
+       assert.Equal(t, "", event.ReferenceID)
+       assert.Equal(t, "", event.Message)
+}
+
+func TestSendAppRunnableInQueueEvent(t *testing.T) {
+       app := &Application{
+               ApplicationID:         appID0,
+               queuePath:             "root.test",
+               sendStateChangeEvents: true,
+       }
+       eventSystem := mock.NewEventSystemDisabled()
+       appEvents := newApplicationEvents(app, eventSystem)
+       appEvents.sendAppRunnableInQueueEvent()
+       assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
+
+       eventSystem = mock.NewEventSystem()
+       appEvents = newApplicationEvents(app, eventSystem)
+       appEvents.sendAppRunnableInQueueEvent()
+       event := eventSystem.Events[0]
+       assert.Equal(t, si.EventRecord_APP, event.Type)
+       assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+       assert.Equal(t, si.EventRecord_APP_RUNNABLE_QUEUE, 
event.EventChangeDetail)
+       assert.Equal(t, "app-0", event.ObjectID)
+       assert.Equal(t, "", event.ReferenceID)
+       assert.Equal(t, "", event.Message)
+}
+
+func TestSendAppRunnableByQuotaEvent(t *testing.T) {
+       app := &Application{
+               ApplicationID:         appID0,
+               queuePath:             "root.test",
+               sendStateChangeEvents: true,
+       }
+       eventSystem := mock.NewEventSystemDisabled()
+       appEvents := newApplicationEvents(app, eventSystem)
+       appEvents.sendAppRunnableQuotaEvent()
+       assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
+
+       eventSystem = mock.NewEventSystem()
+       appEvents = newApplicationEvents(app, eventSystem)
+       appEvents.sendAppRunnableQuotaEvent()
+       event := eventSystem.Events[0]
+       assert.Equal(t, si.EventRecord_APP, event.Type)
+       assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+       assert.Equal(t, si.EventRecord_APP_RUNNABLE_QUOTA, 
event.EventChangeDetail)
+       assert.Equal(t, "app-0", event.ObjectID)
+       assert.Equal(t, "", event.ReferenceID)
+       assert.Equal(t, "", event.Message)
+}
diff --git a/pkg/scheduler/objects/application_test.go 
b/pkg/scheduler/objects/application_test.go
index 3136d06a..bc055ee6 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -2601,6 +2601,61 @@ func TestGetRateLimitedAppLog(t *testing.T) {
        assert.Check(t, l != nil)
 }
 
+func TestUpdateRunnableStatus(t *testing.T) {
+       app := newApplication(appID0, "default", "root.unknown")
+       assert.Assert(t, app.runnableInQueue)
+       assert.Assert(t, app.runnableByUserLimit)
+       eventSystem := mock.NewEventSystem()
+       app.appEvents = newApplicationEvents(app, eventSystem)
+
+       // App runnable - no events
+       app.updateRunnableStatus(true, true)
+       assert.Equal(t, 0, len(eventSystem.Events))
+
+       // App not runnable in queue
+       eventSystem.Reset()
+       app.updateRunnableStatus(false, true)
+       assert.Equal(t, 1, len(eventSystem.Events))
+       assert.Equal(t, si.EventRecord_APP_CANNOTRUN_QUEUE, 
eventSystem.Events[0].EventChangeDetail)
+       // Try again - no new events
+       app.updateRunnableStatus(false, true)
+       assert.Equal(t, 1, len(eventSystem.Events))
+
+       // App becomes runnable in queue
+       eventSystem.Reset()
+       app.updateRunnableStatus(true, true)
+       assert.Equal(t, 1, len(eventSystem.Events))
+       assert.Equal(t, si.EventRecord_APP_RUNNABLE_QUEUE, 
eventSystem.Events[0].EventChangeDetail)
+
+       // Try again - no new events
+       app.updateRunnableStatus(true, true)
+       assert.Equal(t, 1, len(eventSystem.Events))
+
+       // App not runnable by UG quota
+       eventSystem.Reset()
+       app.updateRunnableStatus(true, false)
+       assert.Equal(t, 1, len(eventSystem.Events))
+       assert.Equal(t, si.EventRecord_APP_CANNOTRUN_QUOTA, 
eventSystem.Events[0].EventChangeDetail)
+       // Try again - no new events
+       app.updateRunnableStatus(true, false)
+       assert.Equal(t, 1, len(eventSystem.Events))
+
+       // App becomes runnable by user quota
+       eventSystem.Reset()
+       app.updateRunnableStatus(true, true)
+       assert.Equal(t, si.EventRecord_APP_RUNNABLE_QUOTA, 
eventSystem.Events[0].EventChangeDetail)
+       // Try again - no new events
+       app.updateRunnableStatus(true, true)
+       assert.Equal(t, 1, len(eventSystem.Events))
+
+       // Both false
+       eventSystem.Reset()
+       app.updateRunnableStatus(false, false)
+       assert.Equal(t, 2, len(eventSystem.Events))
+       assert.Equal(t, si.EventRecord_APP_CANNOTRUN_QUEUE, 
eventSystem.Events[0].EventChangeDetail)
+       assert.Equal(t, si.EventRecord_APP_CANNOTRUN_QUOTA, 
eventSystem.Events[1].EventChangeDetail)
+}
+
 func (sa *Application) addPlaceholderDataWithLocking(ask *AllocationAsk) {
        sa.Lock()
        defer sa.Unlock()
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 43e0b4ed..ebf70e29 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -1355,7 +1355,10 @@ func (sq *Queue) TryAllocate(iterator func() 
NodeIterator, fullIterator func() N
 
                // process the apps (filters out app without pending requests)
                for _, app := range sq.sortApplications(true, false) {
-                       if app.IsAccepted() && 
(!sq.canRunApp(app.ApplicationID) || 
!ugm.GetUserManager().CanRunApp(sq.QueuePath, app.ApplicationID, app.user)) {
+                       runnableInQueue := sq.canRunApp(app.ApplicationID)
+                       runnableByUserLimit := 
ugm.GetUserManager().CanRunApp(sq.QueuePath, app.ApplicationID, app.user)
+                       app.updateRunnableStatus(runnableInQueue, 
runnableByUserLimit)
+                       if app.IsAccepted() && (!runnableInQueue || 
!runnableByUserLimit) {
                                continue
                        }
                        alloc := app.tryAllocate(headRoom, allowPreemption, 
preemptionDelay, &preemptAttemptsRemaining, iterator, fullIterator, getnode)
diff --git a/pkg/scheduler/tests/application_tracking_test.go 
b/pkg/scheduler/tests/application_tracking_test.go
index 9f38b2d5..4f20cfd3 100644
--- a/pkg/scheduler/tests/application_tracking_test.go
+++ b/pkg/scheduler/tests/application_tracking_test.go
@@ -126,9 +126,9 @@ func TestApplicationHistoryTracking(t *testing.T) {
        ms.mockRM.waitForAllocations(t, 1, 1000)
        eventsDao, err = client.GetBatchEvents()
        assert.NilError(t, err)
-       assert.Equal(t, 12, len(eventsDao.EventRecords), "number of events 
generated")
+       assert.Equal(t, 13, len(eventsDao.EventRecords), "number of events 
generated")
        verifyAllocationAskAddedEvents(t, eventsDao.EventRecords[7:])
-       events = getEventsFromStream(t, stream, 5)
+       events = getEventsFromStream(t, stream, 6)
        verifyAllocationAskAddedEvents(t, events)
 
        allocations := ms.mockRM.getAllocations()
@@ -163,9 +163,9 @@ func TestApplicationHistoryTracking(t *testing.T) {
 
        eventsDao, err = client.GetBatchEvents()
        assert.NilError(t, err)
-       assert.Equal(t, 15, len(eventsDao.EventRecords), "number of events 
generated")
-       verifyAllocationCancelledEvents(t, eventsDao.EventRecords[12:])
-       events = getEventsFromStream(t, stream, 3)
+       assert.Equal(t, 17, len(eventsDao.EventRecords), "number of events 
generated")
+       verifyAllocationCancelledEvents(t, eventsDao.EventRecords[13:])
+       events = getEventsFromStream(t, stream, 4)
        assert.NilError(t, err)
        verifyAllocationCancelledEvents(t, events)
 }
@@ -314,35 +314,51 @@ func verifyAllocationAskAddedEvents(t *testing.T, events 
[]*si.EventRecord) {
        assert.Equal(t, si.EventRecord_SET, events[3].EventChangeType)
        assert.Equal(t, si.EventRecord_APP_STARTING, 
events[3].EventChangeDetail)
 
-       // adding allocation to the App
-       assert.Equal(t, "app-1", events[4].ObjectID)
+       // Track resource usage for the user - increment
+       assert.Equal(t, "testuser", events[4].ObjectID)
        assert.Equal(t, "", events[4].Message)
-       assert.Equal(t, si.EventRecord_APP, events[4].Type)
+       assert.Equal(t, "root.singleleaf", events[4].ReferenceID)
+       assert.Equal(t, si.EventRecord_USERGROUP, events[4].Type)
        assert.Equal(t, si.EventRecord_ADD, events[4].EventChangeType)
-       assert.Equal(t, si.EventRecord_APP_ALLOC, events[4].EventChangeDetail)
+       assert.Equal(t, si.EventRecord_UG_USER_RESOURCE, 
events[4].EventChangeDetail)
+
+       // adding allocation to the App
+       assert.Equal(t, "app-1", events[5].ObjectID)
+       assert.Equal(t, "", events[5].Message)
+       assert.Equal(t, si.EventRecord_APP, events[5].Type)
+       assert.Equal(t, si.EventRecord_ADD, events[5].EventChangeType)
+       assert.Equal(t, si.EventRecord_APP_ALLOC, events[5].EventChangeDetail)
 }
 
 func verifyAllocationCancelledEvents(t *testing.T, events []*si.EventRecord) {
-       // state transition to Completing
-       assert.Equal(t, "app-1", events[0].ObjectID)
+       // Track resource usage for the user - decrement
+       assert.Equal(t, "testuser", events[0].ObjectID)
        assert.Equal(t, "", events[0].Message)
-       assert.Equal(t, "", events[0].ReferenceID)
-       assert.Equal(t, si.EventRecord_APP, events[0].Type)
-       assert.Equal(t, si.EventRecord_SET, events[0].EventChangeType)
-       assert.Equal(t, si.EventRecord_APP_COMPLETING, 
events[0].EventChangeDetail)
+       assert.Equal(t, "root.singleleaf", events[0].ReferenceID)
+       assert.Equal(t, si.EventRecord_USERGROUP, events[0].Type)
+       assert.Equal(t, si.EventRecord_REMOVE, events[0].EventChangeType)
+       assert.Equal(t, si.EventRecord_UG_USER_RESOURCE, 
events[0].EventChangeDetail)
 
-       // cancel allocation
+       // state transition to Completing
        assert.Equal(t, "app-1", events[1].ObjectID)
        assert.Equal(t, "", events[1].Message)
+       assert.Equal(t, "", events[1].ReferenceID)
        assert.Equal(t, si.EventRecord_APP, events[1].Type)
-       assert.Equal(t, si.EventRecord_REMOVE, events[1].EventChangeType)
-       assert.Equal(t, si.EventRecord_ALLOC_CANCEL, 
events[1].EventChangeDetail)
+       assert.Equal(t, si.EventRecord_SET, events[1].EventChangeType)
+       assert.Equal(t, si.EventRecord_APP_COMPLETING, 
events[1].EventChangeDetail)
 
-       // remove allocation from the node
-       assert.Equal(t, "node-1:1234", events[2].ObjectID)
+       // cancel allocation
+       assert.Equal(t, "app-1", events[2].ObjectID)
        assert.Equal(t, "", events[2].Message)
-       assert.Equal(t, "alloc-1", events[2].ReferenceID)
-       assert.Equal(t, si.EventRecord_NODE, events[2].Type)
+       assert.Equal(t, si.EventRecord_APP, events[2].Type)
        assert.Equal(t, si.EventRecord_REMOVE, events[2].EventChangeType)
-       assert.Equal(t, si.EventRecord_NODE_ALLOC, events[2].EventChangeDetail)
+       assert.Equal(t, si.EventRecord_ALLOC_CANCEL, 
events[2].EventChangeDetail)
+
+       // remove allocation from the node
+       assert.Equal(t, "node-1:1234", events[3].ObjectID)
+       assert.Equal(t, "", events[3].Message)
+       assert.Equal(t, "alloc-1", events[3].ReferenceID)
+       assert.Equal(t, si.EventRecord_NODE, events[3].Type)
+       assert.Equal(t, si.EventRecord_REMOVE, events[3].EventChangeType)
+       assert.Equal(t, si.EventRecord_NODE_ALLOC, events[3].EventChangeDetail)
 }
diff --git a/pkg/scheduler/ugm/group_tracker.go 
b/pkg/scheduler/ugm/group_tracker.go
index 255eeafa..fc94febd 100644
--- a/pkg/scheduler/ugm/group_tracker.go
+++ b/pkg/scheduler/ugm/group_tracker.go
@@ -19,9 +19,11 @@
 package ugm
 
 import (
+       "strings"
        "sync"
 
        "github.com/apache/yunikorn-core/pkg/common"
+       "github.com/apache/yunikorn-core/pkg/common/configs"
        "github.com/apache/yunikorn-core/pkg/common/resources"
        "github.com/apache/yunikorn-core/pkg/webservice/dao"
 )
@@ -30,40 +32,44 @@ type GroupTracker struct {
        groupName    string            // Name of the group for which usage is 
being tracked upon
        applications map[string]string // Hold applications currently run by 
all users belong to this group
        queueTracker *QueueTracker     // Holds the actual resource usage of 
queue path where application run
+       events       *ugmEvents
 
        sync.RWMutex
 }
 
-func newGroupTracker(groupName string) *GroupTracker {
+func newGroupTracker(groupName string, events *ugmEvents) *GroupTracker {
        queueTracker := newRootQueueTracker(group)
        groupTracker := &GroupTracker{
                groupName:    groupName,
                applications: make(map[string]string),
                queueTracker: queueTracker,
+               events:       events,
        }
        return groupTracker
 }
 
-func (gt *GroupTracker) increaseTrackedResource(hierarchy []string, 
applicationID string, usage *resources.Resource, user string) bool {
+func (gt *GroupTracker) increaseTrackedResource(queuePath, applicationID 
string, usage *resources.Resource, user string) bool {
        if gt == nil {
                return true
        }
        gt.Lock()
        defer gt.Unlock()
+       gt.events.sendIncResourceUsageForGroup(gt.groupName, queuePath, usage)
        gt.applications[applicationID] = user
-       return gt.queueTracker.increaseTrackedResource(hierarchy, 
applicationID, group, usage)
+       return gt.queueTracker.increaseTrackedResource(strings.Split(queuePath, 
configs.DOT), applicationID, group, usage)
 }
 
-func (gt *GroupTracker) decreaseTrackedResource(hierarchy []string, 
applicationID string, usage *resources.Resource, removeApp bool) (bool, bool) {
+func (gt *GroupTracker) decreaseTrackedResource(queuePath, applicationID 
string, usage *resources.Resource, removeApp bool) (bool, bool) {
        if gt == nil {
                return false, true
        }
        gt.Lock()
        defer gt.Unlock()
+       gt.events.sendDecResourceUsageForGroup(gt.groupName, queuePath, usage)
        if removeApp {
                delete(gt.applications, applicationID)
        }
-       return gt.queueTracker.decreaseTrackedResource(hierarchy, 
applicationID, usage, removeApp)
+       return gt.queueTracker.decreaseTrackedResource(strings.Split(queuePath, 
configs.DOT), applicationID, usage, removeApp)
 }
 
 func (gt *GroupTracker) getTrackedApplications() map[string]string {
@@ -72,10 +78,18 @@ func (gt *GroupTracker) getTrackedApplications() 
map[string]string {
        return gt.applications
 }
 
-func (gt *GroupTracker) setLimits(hierarchy []string, resource 
*resources.Resource, maxApps uint64) {
+func (gt *GroupTracker) setLimits(queuePath string, resource 
*resources.Resource, maxApps uint64) {
        gt.Lock()
        defer gt.Unlock()
-       gt.queueTracker.setLimit(hierarchy, resource, maxApps, false, group, 
false)
+       gt.events.sendLimitSetForGroup(gt.groupName, queuePath)
+       gt.queueTracker.setLimit(strings.Split(queuePath, configs.DOT), 
resource, maxApps, false, group, false)
+}
+
+func (gt *GroupTracker) clearLimits(queuePath string) {
+       gt.Lock()
+       defer gt.Unlock()
+       gt.events.sendLimitRemoveForGroup(gt.groupName, queuePath)
+       gt.queueTracker.setLimit(strings.Split(queuePath, configs.DOT), nil, 0, 
false, group, false)
 }
 
 func (gt *GroupTracker) headroom(hierarchy []string) *resources.Resource {
diff --git a/pkg/scheduler/ugm/group_tracker_test.go 
b/pkg/scheduler/ugm/group_tracker_test.go
index a9998ab2..08dca63f 100644
--- a/pkg/scheduler/ugm/group_tracker_test.go
+++ b/pkg/scheduler/ugm/group_tracker_test.go
@@ -19,12 +19,16 @@
 package ugm
 
 import (
+       "strings"
        "testing"
 
        "gotest.tools/v3/assert"
 
+       "github.com/apache/yunikorn-core/pkg/common/configs"
        "github.com/apache/yunikorn-core/pkg/common/resources"
        "github.com/apache/yunikorn-core/pkg/common/security"
+       "github.com/apache/yunikorn-core/pkg/events/mock"
+       "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
 
 func TestGTIncreaseTrackedResource(t *testing.T) {
@@ -34,7 +38,8 @@ func TestGTIncreaseTrackedResource(t *testing.T) {
        // root->parent->child12 (similar name like above leaf queue, but it is 
being treated differently as similar names are allowed)
        manager := GetUserManager()
        user := &security.UserGroup{User: "test", Groups: []string{"test"}}
-       groupTracker := newGroupTracker(user.User)
+       eventSystem := mock.NewEventSystem()
+       groupTracker := newGroupTracker(user.User, newUGMEvents(eventSystem))
 
        usage1, err := resources.NewResourceFromConf(map[string]string{"mem": 
"10M", "vcore": "10"})
        if err != nil {
@@ -42,36 +47,39 @@ func TestGTIncreaseTrackedResource(t *testing.T) {
        }
 
        manager.Headroom(queuePath1, TestApp1, *user)
-       result := groupTracker.increaseTrackedResource(hierarchy1, TestApp1, 
usage1, user.User)
+       result := groupTracker.increaseTrackedResource(path1, TestApp1, usage1, 
user.User)
        if !result {
-               t.Fatalf("unable to increase tracked resource: queuepath %+q, 
app %s, res %v", hierarchy1, TestApp1, usage1)
+               t.Fatalf("unable to increase tracked resource: queuepath %s, 
app %s, res %v", path1, TestApp1, usage1)
        }
+       assert.Equal(t, 1, len(eventSystem.Events))
+       assert.Equal(t, si.EventRecord_UG_GROUP_RESOURCE, 
eventSystem.Events[0].EventChangeDetail)
+       assert.Equal(t, si.EventRecord_ADD, 
eventSystem.Events[0].EventChangeType)
 
        usage2, err := resources.NewResourceFromConf(map[string]string{"mem": 
"20M", "vcore": "20"})
        if err != nil {
                t.Errorf("new resource create returned error or wrong resource: 
error %t, res %v", err, usage2)
        }
-       result = groupTracker.increaseTrackedResource(hierarchy2, TestApp2, 
usage2, user.User)
+       result = groupTracker.increaseTrackedResource(path2, TestApp2, usage2, 
user.User)
        if !result {
-               t.Fatalf("unable to increase tracked resource: queuepath %+q, 
app %s, res %v", hierarchy2, TestApp2, usage2)
+               t.Fatalf("unable to increase tracked resource: queuepath %s, 
app %s, res %v", path2, TestApp2, usage2)
        }
 
        usage3, err := resources.NewResourceFromConf(map[string]string{"mem": 
"30M", "vcore": "30"})
        if err != nil {
                t.Errorf("new resource create returned error or wrong resource: 
error %t, res %v", err, usage3)
        }
-       result = groupTracker.increaseTrackedResource(hierarchy3, TestApp3, 
usage3, user.User)
+       result = groupTracker.increaseTrackedResource(path3, TestApp3, usage3, 
user.User)
        if !result {
-               t.Fatalf("unable to increase tracked resource: queuepath %+q, 
app %s, res %v", hierarchy3, TestApp3, usage3)
+               t.Fatalf("unable to increase tracked resource: queuepath %s, 
app %s, res %v", path3, TestApp3, usage3)
        }
 
        usage4, err := resources.NewResourceFromConf(map[string]string{"mem": 
"20M", "vcore": "20"})
        if err != nil {
                t.Errorf("new resource create returned error or wrong resource: 
error %t, res %v", err, usage3)
        }
-       result = groupTracker.increaseTrackedResource(hierarchy4, TestApp4, 
usage4, user.User)
+       result = groupTracker.increaseTrackedResource(path4, TestApp4, usage4, 
user.User)
        if !result {
-               t.Fatalf("unable to increase tracked resource: queuepath %+q, 
app %s, res %v", hierarchy4, TestApp4, usage4)
+               t.Fatalf("unable to increase tracked resource: queuepath %s, 
app %s, res %v", path4, TestApp4, usage4)
        }
        actualResources := getGroupResource(groupTracker)
 
@@ -91,14 +99,15 @@ func TestGTDecreaseTrackedResource(t *testing.T) {
        // Initialize ugm
        GetUserManager()
        user := &security.UserGroup{User: "test", Groups: []string{"test"}}
-       groupTracker := newGroupTracker(user.User)
+       eventSystem := mock.NewEventSystem()
+       groupTracker := newGroupTracker(user.User, newUGMEvents(eventSystem))
        usage1, err := resources.NewResourceFromConf(map[string]string{"mem": 
"70M", "vcore": "70"})
        if err != nil {
                t.Errorf("new resource create returned error or wrong resource: 
error %t, res %v", err, usage1)
        }
-       result := groupTracker.increaseTrackedResource(hierarchy1, TestApp1, 
usage1, user.User)
+       result := groupTracker.increaseTrackedResource(path1, TestApp1, usage1, 
user.User)
        if !result {
-               t.Fatalf("unable to increase tracked resource: queuepath %+q, 
app %s, res %v", hierarchy1, TestApp1, usage1)
+               t.Fatalf("unable to increase tracked resource: queuepath %s, 
app %s, res %v", path1, TestApp1, usage1)
        }
        assert.Equal(t, 1, len(groupTracker.getTrackedApplications()))
 
@@ -106,9 +115,9 @@ func TestGTDecreaseTrackedResource(t *testing.T) {
        if err != nil {
                t.Errorf("new resource create returned error or wrong resource: 
error %t, res %v", err, usage2)
        }
-       result = groupTracker.increaseTrackedResource(hierarchy2, TestApp2, 
usage2, user.User)
+       result = groupTracker.increaseTrackedResource(path2, TestApp2, usage2, 
user.User)
        if !result {
-               t.Fatalf("unable to increase tracked resource: queuepath %+q, 
app %s, res %v", hierarchy2, TestApp2, usage2)
+               t.Fatalf("unable to increase tracked resource: queuepath %s, 
app %s, res %v", path2, TestApp2, usage2)
        }
        actualResources := getGroupResource(groupTracker)
 
@@ -122,15 +131,19 @@ func TestGTDecreaseTrackedResource(t *testing.T) {
        if err != nil {
                t.Errorf("new resource create returned error or wrong resource: 
error %t, res %v", err, usage3)
        }
-       removeQT, decreased := groupTracker.decreaseTrackedResource(hierarchy1, 
TestApp1, usage3, false)
+       eventSystem.Reset()
+       removeQT, decreased := groupTracker.decreaseTrackedResource(path1, 
TestApp1, usage3, false)
        if !decreased {
-               t.Fatalf("unable to decrease tracked resource: queuepath %+q, 
app %s, res %v, error %t", hierarchy1, TestApp1, usage3, err)
+               t.Fatalf("unable to decrease tracked resource: queuepath %s, 
app %s, res %v, error %t", path1, TestApp1, usage3, err)
        }
        assert.Equal(t, removeQT, false, "wrong remove queue tracker value")
+       assert.Equal(t, 1, len(eventSystem.Events))
+       assert.Equal(t, si.EventRecord_UG_GROUP_RESOURCE, 
eventSystem.Events[0].EventChangeDetail)
+       assert.Equal(t, si.EventRecord_REMOVE, 
eventSystem.Events[0].EventChangeType)
 
-       removeQT, decreased = groupTracker.decreaseTrackedResource(hierarchy2, 
TestApp2, usage3, false)
+       removeQT, decreased = groupTracker.decreaseTrackedResource(path2, 
TestApp2, usage3, false)
        if !decreased {
-               t.Fatalf("unable to decrease tracked resource: queuepath %+q, 
app %s, res %v, error %t", hierarchy2, TestApp2, usage3, err)
+               t.Fatalf("unable to decrease tracked resource: queuepath %s, 
app %s, res %v, error %t", path2, TestApp2, usage3, err)
        }
        assert.Equal(t, removeQT, false, "wrong remove queue tracker value")
 
@@ -146,9 +159,9 @@ func TestGTDecreaseTrackedResource(t *testing.T) {
                t.Errorf("new resource create returned error or wrong resource: 
error %t, res %v", err, usage3)
        }
 
-       removeQT, decreased = groupTracker.decreaseTrackedResource(hierarchy1, 
TestApp1, usage4, true)
+       removeQT, decreased = groupTracker.decreaseTrackedResource(path1, 
TestApp1, usage4, true)
        if !decreased {
-               t.Fatalf("unable to decrease tracked resource: queuepath %+q, 
app %s, res %v, error %t", hierarchy1, TestApp1, usage1, err)
+               t.Fatalf("unable to decrease tracked resource: queuepath %s, 
app %s, res %v, error %t", path1, TestApp1, usage1, err)
        }
        assert.Equal(t, 1, len(groupTracker.getTrackedApplications()))
        assert.Equal(t, removeQT, false, "wrong remove queue tracker value")
@@ -158,44 +171,98 @@ func TestGTDecreaseTrackedResource(t *testing.T) {
                t.Errorf("new resource create returned error or wrong resource: 
error %t, res %v", err, usage3)
        }
 
-       removeQT, decreased = groupTracker.decreaseTrackedResource(hierarchy2, 
TestApp2, usage5, true)
+       removeQT, decreased = groupTracker.decreaseTrackedResource(path2, 
TestApp2, usage5, true)
        if !decreased {
-               t.Fatalf("unable to decrease tracked resource: queuepath %+q, 
app %s, res %v, error %t", hierarchy2, TestApp2, usage2, err)
+               t.Fatalf("unable to decrease tracked resource: queuepath %s, 
app %s, res %v, error %t", path2, TestApp2, usage2, err)
        }
        assert.Equal(t, 0, len(groupTracker.getTrackedApplications()))
        assert.Equal(t, removeQT, true, "wrong remove queue tracker value")
 }
 
-func TestGTSetMaxLimits(t *testing.T) {
+func TestGTSetAndClearMaxLimits(t *testing.T) {
        // Queue setup:
        // root->parent->child1
        // Initialize ugm
        GetUserManager()
        user := security.UserGroup{User: "test", Groups: []string{"test"}}
-       groupTracker := newGroupTracker(user.User)
+       eventSystem := mock.NewEventSystem()
+       groupTracker := newGroupTracker(user.User, newUGMEvents(eventSystem))
        usage1, err := resources.NewResourceFromConf(map[string]string{"mem": 
"10M", "vcore": "10"})
        if err != nil {
                t.Errorf("new resource create returned error or wrong resource: 
error %t, res %v", err, usage1)
        }
 
-       result := groupTracker.increaseTrackedResource(hierarchy1, TestApp1, 
usage1, user.User)
+       result := groupTracker.increaseTrackedResource(path1, TestApp1, usage1, 
user.User)
        if !result {
-               t.Fatalf("unable to increase tracked resource: queuepath %+q, 
app %s, res %v", hierarchy1, TestApp1, usage1)
+               t.Fatalf("unable to increase tracked resource: queuepath %s, 
app %s, res %v", path1, TestApp1, usage1)
        }
 
-       groupTracker.setLimits(hierarchy1, resources.Multiply(usage1, 5), 5)
-       groupTracker.setLimits(hierarchy5, resources.Multiply(usage1, 10), 10)
-
-       result = groupTracker.increaseTrackedResource(hierarchy1, TestApp2, 
usage1, user.User)
+       // higher limits - apps can run
+       eventSystem.Reset()
+       groupTracker.setLimits(path1, resources.Multiply(usage1, 5), 5)
+       groupTracker.setLimits(path5, resources.Multiply(usage1, 10), 10)
+       assert.Equal(t, 2, len(eventSystem.Events))
+       assert.Equal(t, si.EventRecord_UG_GROUP_LIMIT, 
eventSystem.Events[0].EventChangeDetail)
+       assert.Equal(t, si.EventRecord_SET, 
eventSystem.Events[0].EventChangeType)
+       assert.Equal(t, path1, eventSystem.Events[0].ReferenceID)
+       assert.Equal(t, si.EventRecord_UG_GROUP_LIMIT, 
eventSystem.Events[1].EventChangeDetail)
+       assert.Equal(t, si.EventRecord_SET, 
eventSystem.Events[1].EventChangeType)
+       assert.Equal(t, path5, eventSystem.Events[1].ReferenceID)
+       result = groupTracker.increaseTrackedResource(path1, TestApp2, usage1, 
user.User)
        if !result {
-               t.Fatalf("unable to increase tracked resource: queuepath %+q, 
app %s, res %v", hierarchy1, TestApp2, usage1)
+               t.Fatalf("unable to increase tracked resource: queuepath %s, 
app %s, res %v", path1, TestApp2, usage1)
        }
-       result = groupTracker.increaseTrackedResource(hierarchy1, TestApp3, 
usage1, user.User)
+       result = groupTracker.increaseTrackedResource(path1, TestApp3, usage1, 
user.User)
        if !result {
-               t.Fatalf("unable to increase tracked resource: queuepath %+q, 
app %s, res %v", hierarchy1, TestApp3, usage1)
+               t.Fatalf("unable to increase tracked resource: queuepath %s, 
app %s, res %v", path1, TestApp3, usage1)
        }
-       groupTracker.setLimits(hierarchy1, usage1, 1)
-       groupTracker.setLimits(hierarchy5, usage1, 1)
+       path1expectedHeadroom := 
resources.NewResourceFromMap(map[string]resources.Quantity{
+               "mem":   20000000,
+               "vcore": 20000,
+       })
+       hierarchy1 := strings.Split(path1, configs.DOT)
+       assert.Assert(t, resources.Equals(groupTracker.headroom(hierarchy1), 
path1expectedHeadroom))
+       path5expectedHeadroom := 
resources.NewResourceFromMap(map[string]resources.Quantity{
+               "mem":   70000000,
+               "vcore": 70000,
+       })
+       hierarchy5 := strings.Split(path5, configs.DOT)
+       assert.Assert(t, resources.Equals(groupTracker.headroom(hierarchy5), 
path5expectedHeadroom))
+       assert.Assert(t, groupTracker.canRunApp(hierarchy1, TestApp4))
+
+       // lower limits
+       groupTracker.setLimits(path1, usage1, 1)
+       groupTracker.setLimits(path5, resources.Multiply(usage1, 2), 1)
+       lowerChildHeadroom := 
resources.NewResourceFromMap(map[string]resources.Quantity{
+               "mem":   -20000000,
+               "vcore": -20000,
+       })
+       assert.Assert(t, resources.Equals(groupTracker.headroom(hierarchy1), 
lowerChildHeadroom))
+       lowerParentHeadroom := 
resources.NewResourceFromMap(map[string]resources.Quantity{
+               "mem":   -10000000,
+               "vcore": -10000,
+       })
+       assert.Assert(t, resources.Equals(groupTracker.headroom(hierarchy5), 
lowerParentHeadroom))
+       assert.Assert(t, !groupTracker.canRunApp(hierarchy1, TestApp4))
+       assert.Assert(t, !groupTracker.canRunApp(hierarchy5, TestApp4))
+
+       // clear limits
+       eventSystem.Reset()
+       groupTracker.clearLimits(path1)
+       assert.Assert(t, resources.Equals(groupTracker.headroom(hierarchy1), 
lowerParentHeadroom))
+       assert.Assert(t, resources.Equals(groupTracker.headroom(hierarchy5), 
lowerParentHeadroom))
+       assert.Assert(t, !groupTracker.canRunApp(hierarchy1, TestApp4))
+       assert.Assert(t, !groupTracker.canRunApp(hierarchy5, TestApp4))
+       groupTracker.clearLimits(path5)
+       assert.Assert(t, groupTracker.headroom(hierarchy1) == nil)
+       assert.Assert(t, groupTracker.headroom(hierarchy5) == nil)
+       assert.Assert(t, groupTracker.canRunApp(hierarchy1, TestApp4))
+       assert.Assert(t, groupTracker.canRunApp(hierarchy5, TestApp4))
+       assert.Equal(t, 2, len(eventSystem.Events))
+       assert.Equal(t, si.EventRecord_REMOVE, 
eventSystem.Events[0].EventChangeType)
+       assert.Equal(t, si.EventRecord_UG_GROUP_LIMIT, 
eventSystem.Events[0].EventChangeDetail)
+       assert.Equal(t, si.EventRecord_REMOVE, 
eventSystem.Events[1].EventChangeType)
+       assert.Equal(t, si.EventRecord_UG_GROUP_LIMIT, 
eventSystem.Events[1].EventChangeDetail)
 }
 
 func getGroupResource(gt *GroupTracker) map[string]*resources.Resource {
diff --git a/pkg/scheduler/ugm/manager.go b/pkg/scheduler/ugm/manager.go
index a4cff214..f0a76d56 100644
--- a/pkg/scheduler/ugm/manager.go
+++ b/pkg/scheduler/ugm/manager.go
@@ -29,6 +29,7 @@ import (
        "github.com/apache/yunikorn-core/pkg/common/configs"
        "github.com/apache/yunikorn-core/pkg/common/resources"
        "github.com/apache/yunikorn-core/pkg/common/security"
+       "github.com/apache/yunikorn-core/pkg/events"
        "github.com/apache/yunikorn-core/pkg/log"
 )
 
@@ -45,6 +46,7 @@ type Manager struct {
        configuredGroups          map[string][]string                // Hold 
groups for all configured queue paths.
        userLimits                map[string]map[string]*LimitConfig // Holds 
queue path * user limit config
        groupLimits               map[string]map[string]*LimitConfig // Holds 
queue path * group limit config
+       events                    *ugmEvents
        sync.RWMutex
 }
 
@@ -54,6 +56,7 @@ func newManager() *Manager {
                groupTrackers:             make(map[string]*GroupTracker),
                userWildCardLimitsConfig:  make(map[string]*LimitConfig),
                groupWildCardLimitsConfig: make(map[string]*LimitConfig),
+               events:                    
newUGMEvents(events.GetEventSystem()),
        }
        return manager
 }
@@ -93,7 +96,8 @@ func (m *Manager) IncreaseTrackedResource(queuePath, 
applicationID string, usage
        if !userTracker.hasGroupForApp(applicationID) {
                m.ensureGroupTrackerForApp(queuePath, applicationID, user)
        }
-       return userTracker.increaseTrackedResource(strings.Split(queuePath, 
configs.DOT), applicationID, usage)
+
+       return userTracker.increaseTrackedResource(queuePath, applicationID, 
usage)
 }
 
 // DecreaseTrackedResource Decrease the resource usage for the given user 
group and queue path combination.
@@ -107,7 +111,6 @@ func (m *Manager) DecreaseTrackedResource(queuePath, 
applicationID string, usage
                zap.String("application", applicationID),
                zap.Stringer("resource", usage),
                zap.Bool("removeApp", removeApp))
-       hierarchy := strings.Split(queuePath, configs.DOT)
        if queuePath == common.Empty || applicationID == common.Empty || usage 
== nil || user.User == common.Empty {
                log.Log(log.SchedUGM).Debug("Mandatory parameters are missing 
to decrease the resource usage")
                return false
@@ -119,6 +122,7 @@ func (m *Manager) DecreaseTrackedResource(queuePath, 
applicationID string, usage
                        zap.String("user", user.User))
                return false
        }
+
        // get the group now as the decrease might remove the app from the user 
if removeApp is true
        appGroup := userTracker.getGroupForApp(applicationID)
        log.Log(log.SchedUGM).Debug("Decreasing resource usage for user",
@@ -128,7 +132,7 @@ func (m *Manager) DecreaseTrackedResource(queuePath, 
applicationID string, usage
                zap.String("tracked group", appGroup),
                zap.Stringer("resource", usage),
                zap.Bool("removeApp", removeApp))
-       removeQT, decreased := userTracker.decreaseTrackedResource(hierarchy, 
applicationID, usage, removeApp)
+       removeQT, decreased := userTracker.decreaseTrackedResource(queuePath, 
applicationID, usage, removeApp)
        if !decreased {
                return decreased
        }
@@ -154,7 +158,7 @@ func (m *Manager) DecreaseTrackedResource(queuePath, 
applicationID string, usage
                zap.String("application", applicationID),
                zap.Stringer("resource", usage),
                zap.Bool("removeApp", removeApp))
-       removeQT, decreased = groupTracker.decreaseTrackedResource(hierarchy, 
applicationID, usage, removeApp)
+       removeQT, decreased = groupTracker.decreaseTrackedResource(queuePath, 
applicationID, usage, removeApp)
        if !decreased {
                return decreased
        }
@@ -222,7 +226,7 @@ func (m *Manager) ensureGroupTrackerForApp(queuePath, 
applicationID string, user
                        log.Log(log.SchedUGM).Info("Group tracker doesn't 
exists. Creating appGroup tracker",
                                zap.String("queue path", queuePath),
                                zap.String("appGroup", appGroup))
-                       groupTracker = newGroupTracker(appGroup)
+                       groupTracker = newGroupTracker(appGroup, m.events)
                        m.Lock()
                        m.groupTrackers[appGroup] = groupTracker
                        m.Unlock()
@@ -311,7 +315,6 @@ func (m *Manager) UpdateConfig(config configs.QueueConfig, 
queuePath string) err
 
 func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath 
string, newUserLimits map[string]map[string]*LimitConfig, newGroupLimits 
map[string]map[string]*LimitConfig,
        newUserWildCardLimitsConfig map[string]*LimitConfig, 
newGroupWildCardLimitsConfig map[string]*LimitConfig, newConfiguredGroups 
map[string][]string) error {
-       hierarchy := strings.Split(queuePath, configs.DOT)
        // Traverse limits of specific queue path
        for _, limit := range cur.Limits {
                var maxResource *resources.Resource
@@ -338,7 +341,7 @@ func (m *Manager) internalProcessConfig(cur 
configs.QueueConfig, queuePath strin
                                newUserWildCardLimitsConfig[queuePath] = 
limitConfig
                                continue
                        }
-                       if err := m.setUserLimits(user, limitConfig, 
hierarchy); err != nil {
+                       if err := m.setUserLimits(user, limitConfig, 
queuePath); err != nil {
                                return err
                        }
                        if _, ok := newUserLimits[queuePath]; !ok {
@@ -356,7 +359,7 @@ func (m *Manager) internalProcessConfig(cur 
configs.QueueConfig, queuePath strin
                                zap.String("queue path", queuePath),
                                zap.Uint64("max application", 
limit.MaxApplications),
                                zap.Any("max resources", limit.MaxResources))
-                       if err := m.setGroupLimits(group, limitConfig, 
hierarchy); err != nil {
+                       if err := m.setGroupLimits(group, limitConfig, 
queuePath); err != nil {
                                return err
                        }
                        if _, ok := newGroupLimits[queuePath]; !ok {
@@ -402,7 +405,6 @@ func (m *Manager) 
clearEarlierSetUserWildCardLimits(newUserWildCardLimits map[st
        m.RLock()
        defer m.RUnlock()
        for queuePath, currentLimitConfig := range m.userWildCardLimitsConfig {
-               hierarchy := strings.Split(queuePath, configs.DOT)
                _, currentQPExists := m.userLimits[queuePath]
                _, newQPExists := newUserLimits[queuePath]
 
@@ -414,7 +416,7 @@ func (m *Manager) 
clearEarlierSetUserWildCardLimits(newUserWildCardLimits map[st
                                        log.Log(log.SchedUGM).Debug("Need to 
clear earlier set configs for user because wild card limit has been applied 
earlier",
                                                zap.String("user", ut.userName),
                                                zap.String("queue path", 
queuePath))
-                                       ut.setLimits(hierarchy, nil, 0, false, 
true)
+                                       ut.clearLimits(queuePath, true)
                                }
                        }
                } else if !currentQPExists || !newQPExists {
@@ -428,7 +430,7 @@ func (m *Manager) 
clearEarlierSetUserWildCardLimits(newUserWildCardLimits map[st
                                                zap.String("queue path", 
queuePath))
                                        _, exists := 
m.userLimits[queuePath][ut.userName]
                                        if _, ok = 
newUserLimits[queuePath][ut.userName]; !ok || !exists {
-                                               ut.setLimits(hierarchy, 
newLimitConfig.maxResources, newLimitConfig.maxApplications, true, true)
+                                               ut.setLimits(queuePath, 
newLimitConfig.maxResources, newLimitConfig.maxApplications, true, true)
                                        }
                                }
                        }
@@ -443,7 +445,7 @@ func (m *Manager) 
applyWildCardUserLimits(newUserWildCardLimits map[string]*Limi
        for queuePath, newLimitConfig := range newUserWildCardLimits {
                for _, ut := range m.userTrackers {
                        if _, ok := newUserLimits[queuePath][ut.userName]; !ok {
-                               ut.setLimits(strings.Split(queuePath, "."), 
newLimitConfig.maxResources, newLimitConfig.maxApplications, true, false)
+                               ut.setLimits(queuePath, 
newLimitConfig.maxResources, newLimitConfig.maxApplications, true, false)
                        }
                }
        }
@@ -453,12 +455,11 @@ func (m *Manager) 
applyWildCardUserLimits(newUserWildCardLimits map[string]*Limi
 // by comparing with the existing config. Reset earlier usage only config set 
earlier but not now
 func (m *Manager) clearEarlierSetUserLimits(newUserLimits 
map[string]map[string]*LimitConfig) {
        for queuePath, limitConfig := range m.userLimits {
-               hierarchy := strings.Split(queuePath, configs.DOT)
                // Is queue path exists?
                if newUserLimit, ok := newUserLimits[queuePath]; !ok {
                        for u := range limitConfig {
                                if ut, utExists := m.userTrackers[u]; utExists {
-                                       m.resetUserEarlierUsage(ut, hierarchy)
+                                       m.resetUserEarlierUsage(ut, queuePath)
                                }
                        }
                } else {
@@ -466,7 +467,7 @@ func (m *Manager) clearEarlierSetUserLimits(newUserLimits 
map[string]map[string]
                        for u := range limitConfig {
                                if _, ulExists := newUserLimit[u]; !ulExists {
                                        if ut, utExists := m.userTrackers[u]; 
utExists {
-                                               m.resetUserEarlierUsage(ut, 
hierarchy)
+                                               m.resetUserEarlierUsage(ut, 
queuePath)
                                        }
                                }
                        }
@@ -477,13 +478,14 @@ func (m *Manager) clearEarlierSetUserLimits(newUserLimits 
map[string]map[string]
 // resetUserEarlierUsage Clear or reset earlier usage only when user already 
tracked for the queue path.
 // Reset the max apps and max resources to default, unlink the end leaf queue 
of queue path from its immediate parent and
 // eventually remove user tracker object itself from ugm if it can be removed.
-func (m *Manager) resetUserEarlierUsage(ut *UserTracker, hierarchy []string) {
+func (m *Manager) resetUserEarlierUsage(ut *UserTracker, queuePath string) {
        // Is this user already tracked for the queue path?
+       hierarchy := strings.Split(queuePath, configs.DOT)
        if ut.IsQueuePathTrackedCompletely(hierarchy) {
                log.Log(log.SchedUGM).Debug("Need to clear earlier set configs 
for user",
                        zap.String("user", ut.userName),
                        zap.Strings("queue path", hierarchy))
-               ut.setLimits(hierarchy, nil, 0, false, false)
+               ut.clearLimits(queuePath, false)
                // Is there any running applications in end queue of this queue 
path? If not, then remove the linkage between end queue and its immediate parent
                if ut.IsUnlinkRequired(hierarchy) {
                        ut.UnlinkQT(hierarchy)
@@ -501,12 +503,11 @@ func (m *Manager) resetUserEarlierUsage(ut *UserTracker, 
hierarchy []string) {
 // by comparing with the existing config. Reset earlier usage only config set 
earlier but not now
 func (m *Manager) clearEarlierSetGroupLimits(newGroupLimits 
map[string]map[string]*LimitConfig) {
        for queuePath, limitConfig := range m.groupLimits {
-               hierarchy := strings.Split(queuePath, configs.DOT)
                // Is queue path exists?
                if newGroupLimit, ok := newGroupLimits[queuePath]; !ok {
                        for g := range limitConfig {
                                if gt, gtExists := m.groupTrackers[g]; gtExists 
{
-                                       m.resetGroupEarlierUsage(gt, hierarchy)
+                                       m.resetGroupEarlierUsage(gt, queuePath)
                                }
                        }
                } else {
@@ -514,7 +515,7 @@ func (m *Manager) clearEarlierSetGroupLimits(newGroupLimits 
map[string]map[strin
                        for g := range limitConfig {
                                if _, glExists := newGroupLimit[g]; !glExists {
                                        if gt, gtExists := m.groupTrackers[g]; 
gtExists {
-                                               m.resetGroupEarlierUsage(gt, 
hierarchy)
+                                               m.resetGroupEarlierUsage(gt, 
queuePath)
                                        }
                                }
                        }
@@ -526,7 +527,8 @@ func (m *Manager) clearEarlierSetGroupLimits(newGroupLimits 
map[string]map[strin
 // Decrease the group usage and collect the list of applications for which 
user app group linkage needs to be broken.
 // Reset the max apps and max resources to default, unlink the end leaf queue 
of queue path from its immediate parent and
 // eventually remove group tracker object itself from ugm if it can be removed.
-func (m *Manager) resetGroupEarlierUsage(gt *GroupTracker, hierarchy []string) 
{
+func (m *Manager) resetGroupEarlierUsage(gt *GroupTracker, queuePath string) {
+       hierarchy := strings.Split(queuePath, configs.DOT)
        if gt.IsQueuePathTrackedCompletely(hierarchy) {
                log.Log(log.SchedUGM).Debug("Need to clear earlier set configs 
for group",
                        zap.String("group", gt.groupName),
@@ -536,7 +538,7 @@ func (m *Manager) resetGroupEarlierUsage(gt *GroupTracker, 
hierarchy []string) {
                        ut := m.userTrackers[u]
                        delete(ut.appGroupTrackers, app)
                }
-               gt.setLimits(hierarchy, nil, 0)
+               gt.clearLimits(queuePath)
                // Is there any running applications in end queue of this queue 
path? If not, then remove the linkage between end queue and its immediate parent
                if gt.IsUnlinkRequired(hierarchy) {
                        gt.UnlinkQT(hierarchy)
@@ -561,43 +563,43 @@ func (m *Manager) replaceLimitConfigs(newUserLimits 
map[string]map[string]*Limit
        m.configuredGroups = newConfiguredGroups
 }
 
-func (m *Manager) setUserLimits(user string, limitConfig *LimitConfig, 
hierarchy []string) error {
+func (m *Manager) setUserLimits(user string, limitConfig *LimitConfig, 
queuePath string) error {
        m.Lock()
        defer m.Unlock()
        log.Log(log.SchedUGM).Debug("Setting user limits",
                zap.String("user", user),
-               zap.Strings("queue path", hierarchy),
+               zap.String("queue path", queuePath),
                zap.Uint64("max application", limitConfig.maxApplications),
                zap.Stringer("max resources", limitConfig.maxResources))
        userTracker, ok := m.userTrackers[user]
        if !ok {
                log.Log(log.SchedUGM).Debug("User tracker does not exist. 
Creating user tracker object to set the limit configuration",
                        zap.String("user", user),
-                       zap.Strings("queue path", hierarchy))
-               userTracker = newUserTracker(user)
+                       zap.String("queue path", queuePath))
+               userTracker = newUserTracker(user, m.events)
                m.userTrackers[user] = userTracker
        }
-       userTracker.setLimits(hierarchy, limitConfig.maxResources, 
limitConfig.maxApplications, false, false)
+       userTracker.setLimits(queuePath, limitConfig.maxResources, 
limitConfig.maxApplications, false, false)
        return nil
 }
 
-func (m *Manager) setGroupLimits(group string, limitConfig *LimitConfig, 
hierarchy []string) error {
+func (m *Manager) setGroupLimits(group string, limitConfig *LimitConfig, 
queuePath string) error {
        m.Lock()
        defer m.Unlock()
        log.Log(log.SchedUGM).Debug("Setting group limits",
                zap.String("group", group),
-               zap.Strings("queue path", hierarchy),
+               zap.String("queue path", queuePath),
                zap.Uint64("max application", limitConfig.maxApplications),
                zap.Stringer("max resources", limitConfig.maxResources))
        groupTracker, ok := m.groupTrackers[group]
        if !ok {
                log.Log(log.SchedUGM).Debug("Group tracker does not exist. 
Creating group tracker object to set the limit configuration",
                        zap.String("group", group),
-                       zap.Strings("queue path", hierarchy))
-               groupTracker = newGroupTracker(group)
+                       zap.String("queue path", queuePath))
+               groupTracker = newGroupTracker(group, m.events)
                m.groupTrackers[group] = groupTracker
        }
-       groupTracker.setLimits(hierarchy, limitConfig.maxResources, 
limitConfig.maxApplications)
+       groupTracker.setLimits(queuePath, limitConfig.maxResources, 
limitConfig.maxApplications)
        return nil
 }
 
@@ -612,7 +614,7 @@ func (m *Manager) getUserTracker(user string) *UserTracker {
        }
        log.Log(log.SchedUGM).Info("User tracker doesn't exists. Creating user 
tracker.",
                zap.String("user", user))
-       userTracker := newUserTracker(user)
+       userTracker := newUserTracker(user, m.events)
        m.userTrackers[user] = userTracker
        return userTracker
 }
diff --git a/pkg/scheduler/ugm/ugm_events.go b/pkg/scheduler/ugm/ugm_events.go
new file mode 100644
index 00000000..712253e4
--- /dev/null
+++ b/pkg/scheduler/ugm/ugm_events.go
@@ -0,0 +1,116 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package ugm
+
+import (
+       "github.com/apache/yunikorn-core/pkg/common"
+       "github.com/apache/yunikorn-core/pkg/common/resources"
+       "github.com/apache/yunikorn-core/pkg/events"
+       "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+type ugmEvents struct {
+       eventSystem events.EventSystem
+}
+
+func (evt *ugmEvents) sendIncResourceUsageForUser(user, queuePath string, 
allocated *resources.Resource) {
+       if !evt.eventSystem.IsEventTrackingEnabled() {
+               return
+       }
+       event := events.CreateUserGroupEventRecord(user, common.Empty, 
queuePath, si.EventRecord_ADD, si.EventRecord_UG_USER_RESOURCE, allocated)
+       evt.eventSystem.AddEvent(event)
+}
+
+func (evt *ugmEvents) sendIncResourceUsageForGroup(group, queuePath string, 
allocated *resources.Resource) {
+       if !evt.eventSystem.IsEventTrackingEnabled() {
+               return
+       }
+       event := events.CreateUserGroupEventRecord(group, common.Empty, 
queuePath, si.EventRecord_ADD, si.EventRecord_UG_GROUP_RESOURCE, allocated)
+       evt.eventSystem.AddEvent(event)
+}
+
+func (evt *ugmEvents) sendDecResourceUsageForUser(user, queuePath string, 
allocated *resources.Resource) {
+       if !evt.eventSystem.IsEventTrackingEnabled() {
+               return
+       }
+       event := events.CreateUserGroupEventRecord(user, common.Empty, 
queuePath, si.EventRecord_REMOVE, si.EventRecord_UG_USER_RESOURCE, allocated)
+       evt.eventSystem.AddEvent(event)
+}
+
+func (evt *ugmEvents) sendDecResourceUsageForGroup(group, queuePath string, 
allocated *resources.Resource) {
+       if !evt.eventSystem.IsEventTrackingEnabled() {
+               return
+       }
+       event := events.CreateUserGroupEventRecord(group, common.Empty, 
queuePath, si.EventRecord_REMOVE, si.EventRecord_UG_GROUP_RESOURCE, allocated)
+       evt.eventSystem.AddEvent(event)
+}
+
+func (evt *ugmEvents) sendAppGroupLinked(group, applicationID string) {
+       if !evt.eventSystem.IsEventTrackingEnabled() {
+               return
+       }
+       event := events.CreateUserGroupEventRecord(group, common.Empty, 
applicationID, si.EventRecord_SET, si.EventRecord_UG_APP_LINK, nil)
+       evt.eventSystem.AddEvent(event)
+}
+
+func (evt *ugmEvents) sendAppGroupUnlinked(group, applicationID string) {
+       if !evt.eventSystem.IsEventTrackingEnabled() {
+               return
+       }
+       event := events.CreateUserGroupEventRecord(group, common.Empty, 
applicationID, si.EventRecord_REMOVE, si.EventRecord_UG_APP_LINK, nil)
+       evt.eventSystem.AddEvent(event)
+}
+
+func (evt *ugmEvents) sendLimitSetForUser(user, queuePath string) {
+       if !evt.eventSystem.IsEventTrackingEnabled() {
+               return
+       }
+       event := events.CreateUserGroupEventRecord(user, common.Empty, 
queuePath, si.EventRecord_SET, si.EventRecord_UG_USER_LIMIT, nil)
+       evt.eventSystem.AddEvent(event)
+}
+
+func (evt *ugmEvents) sendLimitSetForGroup(group, queuePath string) {
+       if !evt.eventSystem.IsEventTrackingEnabled() {
+               return
+       }
+       event := events.CreateUserGroupEventRecord(group, common.Empty, 
queuePath, si.EventRecord_SET, si.EventRecord_UG_GROUP_LIMIT, nil)
+       evt.eventSystem.AddEvent(event)
+}
+
+func (evt *ugmEvents) sendLimitRemoveForUser(user, queuePath string) {
+       if !evt.eventSystem.IsEventTrackingEnabled() {
+               return
+       }
+       event := events.CreateUserGroupEventRecord(user, common.Empty, 
queuePath, si.EventRecord_REMOVE, si.EventRecord_UG_USER_LIMIT, nil)
+       evt.eventSystem.AddEvent(event)
+}
+
+func (evt *ugmEvents) sendLimitRemoveForGroup(group, queuePath string) {
+       if !evt.eventSystem.IsEventTrackingEnabled() {
+               return
+       }
+       event := events.CreateUserGroupEventRecord(group, common.Empty, 
queuePath, si.EventRecord_REMOVE, si.EventRecord_UG_GROUP_LIMIT, nil)
+       evt.eventSystem.AddEvent(event)
+}
+
+func newUGMEvents(evt events.EventSystem) *ugmEvents {
+       return &ugmEvents{
+               eventSystem: evt,
+       }
+}
diff --git a/pkg/scheduler/ugm/ugm_events_test.go 
b/pkg/scheduler/ugm/ugm_events_test.go
new file mode 100644
index 00000000..7d9cc9cd
--- /dev/null
+++ b/pkg/scheduler/ugm/ugm_events_test.go
@@ -0,0 +1,239 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package ugm
+
+import (
+       "testing"
+
+       "gotest.tools/v3/assert"
+
+       "github.com/apache/yunikorn-core/pkg/common/resources"
+       "github.com/apache/yunikorn-core/pkg/events/mock"
+       "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+var usage = resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 
123})
+
+func TestIncUsageUser(t *testing.T) {
+       eventSystem := mock.NewEventSystemDisabled()
+       events := newUGMEvents(eventSystem)
+       events.sendIncResourceUsageForUser("user1", path1, usage)
+       assert.Equal(t, 0, len(eventSystem.Events))
+
+       eventSystem = mock.NewEventSystem()
+       events = newUGMEvents(eventSystem)
+       events.sendIncResourceUsageForUser("user1", path1, usage)
+       assert.Equal(t, 1, len(eventSystem.Events))
+       event := eventSystem.Events[0]
+       assert.Equal(t, "user1", event.ObjectID)
+       assert.Equal(t, path1, event.ReferenceID)
+       assert.Equal(t, si.EventRecord_USERGROUP, event.Type)
+       assert.Equal(t, si.EventRecord_UG_USER_RESOURCE, 
event.EventChangeDetail)
+       assert.Equal(t, si.EventRecord_ADD, event.EventChangeType)
+       assert.Equal(t, "", event.Message)
+       assert.Assert(t, event.Resource != nil)
+       assert.Equal(t, 1, len(event.Resource.Resources))
+       assert.Equal(t, int64(123), event.Resource.Resources["cpu"].Value)
+}
+
+func TestIncUsageGroup(t *testing.T) {
+       eventSystem := mock.NewEventSystemDisabled()
+       events := newUGMEvents(eventSystem)
+       events.sendIncResourceUsageForGroup("group1", path1, usage)
+       assert.Equal(t, 0, len(eventSystem.Events))
+
+       eventSystem = mock.NewEventSystem()
+       events = newUGMEvents(eventSystem)
+       events.sendIncResourceUsageForGroup("group1", path1, usage)
+       assert.Equal(t, 1, len(eventSystem.Events))
+       event := eventSystem.Events[0]
+       assert.Equal(t, "group1", event.ObjectID)
+       assert.Equal(t, path1, event.ReferenceID)
+       assert.Equal(t, si.EventRecord_USERGROUP, event.Type)
+       assert.Equal(t, si.EventRecord_UG_GROUP_RESOURCE, 
event.EventChangeDetail)
+       assert.Equal(t, si.EventRecord_ADD, event.EventChangeType)
+       assert.Equal(t, "", event.Message)
+       assert.Assert(t, event.Resource != nil)
+       assert.Equal(t, 1, len(event.Resource.Resources))
+       assert.Equal(t, int64(123), event.Resource.Resources["cpu"].Value)
+}
+
+func TestDecUsageUser(t *testing.T) {
+       eventSystem := mock.NewEventSystemDisabled()
+       events := newUGMEvents(eventSystem)
+       events.sendDecResourceUsageForUser("user1", path1, usage)
+       assert.Equal(t, 0, len(eventSystem.Events))
+
+       eventSystem = mock.NewEventSystem()
+       events = newUGMEvents(eventSystem)
+       events.sendDecResourceUsageForUser("user1", path1, usage)
+       assert.Equal(t, 1, len(eventSystem.Events))
+       event := eventSystem.Events[0]
+       assert.Equal(t, "user1", event.ObjectID)
+       assert.Equal(t, path1, event.ReferenceID)
+       assert.Equal(t, si.EventRecord_USERGROUP, event.Type)
+       assert.Equal(t, si.EventRecord_UG_USER_RESOURCE, 
event.EventChangeDetail)
+       assert.Equal(t, si.EventRecord_REMOVE, event.EventChangeType)
+       assert.Equal(t, "", event.Message)
+       assert.Assert(t, event.Resource != nil)
+       assert.Equal(t, 1, len(event.Resource.Resources))
+       assert.Equal(t, int64(123), event.Resource.Resources["cpu"].Value)
+}
+
+func TestDecUsageGroup(t *testing.T) {
+       eventSystem := mock.NewEventSystemDisabled()
+       events := newUGMEvents(eventSystem)
+       events.sendDecResourceUsageForGroup("group1", path1, usage)
+       assert.Equal(t, 0, len(eventSystem.Events))
+
+       eventSystem = mock.NewEventSystem()
+       events = newUGMEvents(eventSystem)
+       events.sendDecResourceUsageForGroup("group1", path1, usage)
+       assert.Equal(t, 1, len(eventSystem.Events))
+       event := eventSystem.Events[0]
+       assert.Equal(t, "group1", event.ObjectID)
+       assert.Equal(t, path1, event.ReferenceID)
+       assert.Equal(t, si.EventRecord_USERGROUP, event.Type)
+       assert.Equal(t, si.EventRecord_UG_GROUP_RESOURCE, 
event.EventChangeDetail)
+       assert.Equal(t, si.EventRecord_REMOVE, event.EventChangeType)
+       assert.Equal(t, "", event.Message)
+       assert.Assert(t, event.Resource != nil)
+       assert.Equal(t, 1, len(event.Resource.Resources))
+       assert.Equal(t, int64(123), event.Resource.Resources["cpu"].Value)
+}
+
+func TestAppGroupLinked(t *testing.T) {
+       eventSystem := mock.NewEventSystemDisabled()
+       events := newUGMEvents(eventSystem)
+       events.sendAppGroupLinked("group1", "app1")
+       assert.Equal(t, 0, len(eventSystem.Events))
+
+       eventSystem = mock.NewEventSystem()
+       events = newUGMEvents(eventSystem)
+       events.sendAppGroupLinked("group1", "app1")
+       assert.Equal(t, 1, len(eventSystem.Events))
+       event := eventSystem.Events[0]
+       assert.Equal(t, "group1", event.ObjectID)
+       assert.Equal(t, "app1", event.ReferenceID)
+       assert.Equal(t, si.EventRecord_USERGROUP, event.Type)
+       assert.Equal(t, si.EventRecord_UG_APP_LINK, event.EventChangeDetail)
+       assert.Equal(t, si.EventRecord_SET, event.EventChangeType)
+       assert.Equal(t, "", event.Message)
+       assert.Equal(t, 0, len(event.Resource.Resources))
+}
+
+func TestAppGroupUnlinked(t *testing.T) {
+       eventSystem := mock.NewEventSystemDisabled()
+       events := newUGMEvents(eventSystem)
+       events.sendAppGroupUnlinked("group1", "app1")
+       assert.Equal(t, 0, len(eventSystem.Events))
+
+       eventSystem = mock.NewEventSystem()
+       events = newUGMEvents(eventSystem)
+       events.sendAppGroupUnlinked("group1", "app1")
+       assert.Equal(t, 1, len(eventSystem.Events))
+       event := eventSystem.Events[0]
+       assert.Equal(t, "group1", event.ObjectID)
+       assert.Equal(t, "app1", event.ReferenceID)
+       assert.Equal(t, si.EventRecord_USERGROUP, event.Type)
+       assert.Equal(t, si.EventRecord_UG_APP_LINK, event.EventChangeDetail)
+       assert.Equal(t, si.EventRecord_REMOVE, event.EventChangeType)
+       assert.Equal(t, "", event.Message)
+       assert.Equal(t, 0, len(event.Resource.Resources))
+}
+
+func TestUserLimitSet(t *testing.T) {
+       eventSystem := mock.NewEventSystemDisabled()
+       events := newUGMEvents(eventSystem)
+       events.sendLimitSetForUser("user1", path1)
+       assert.Equal(t, 0, len(eventSystem.Events))
+
+       eventSystem = mock.NewEventSystem()
+       events = newUGMEvents(eventSystem)
+       events.sendLimitSetForUser("user1", path1)
+       assert.Equal(t, 1, len(eventSystem.Events))
+       event := eventSystem.Events[0]
+       assert.Equal(t, "user1", event.ObjectID)
+       assert.Equal(t, path1, event.ReferenceID)
+       assert.Equal(t, si.EventRecord_USERGROUP, event.Type)
+       assert.Equal(t, si.EventRecord_UG_USER_LIMIT, event.EventChangeDetail)
+       assert.Equal(t, si.EventRecord_SET, event.EventChangeType)
+       assert.Equal(t, "", event.Message)
+       assert.Equal(t, 0, len(event.Resource.Resources))
+}
+
+func TestUserLimitCleared(t *testing.T) {
+       eventSystem := mock.NewEventSystemDisabled()
+       events := newUGMEvents(eventSystem)
+       events.sendLimitRemoveForUser("user1", path1)
+       assert.Equal(t, 0, len(eventSystem.Events))
+
+       eventSystem = mock.NewEventSystem()
+       events = newUGMEvents(eventSystem)
+       events.sendLimitRemoveForUser("user1", path1)
+       assert.Equal(t, 1, len(eventSystem.Events))
+       event := eventSystem.Events[0]
+       assert.Equal(t, "user1", event.ObjectID)
+       assert.Equal(t, path1, event.ReferenceID)
+       assert.Equal(t, si.EventRecord_USERGROUP, event.Type)
+       assert.Equal(t, si.EventRecord_UG_USER_LIMIT, event.EventChangeDetail)
+       assert.Equal(t, si.EventRecord_REMOVE, event.EventChangeType)
+       assert.Equal(t, "", event.Message)
+       assert.Equal(t, 0, len(event.Resource.Resources))
+}
+
+func TestGroupLimitSet(t *testing.T) {
+       eventSystem := mock.NewEventSystemDisabled()
+       events := newUGMEvents(eventSystem)
+       events.sendLimitSetForGroup("group1", path1)
+       assert.Equal(t, 0, len(eventSystem.Events))
+
+       eventSystem = mock.NewEventSystem()
+       events = newUGMEvents(eventSystem)
+       events.sendLimitSetForGroup("group1", path1)
+       assert.Equal(t, 1, len(eventSystem.Events))
+       event := eventSystem.Events[0]
+       assert.Equal(t, "group1", event.ObjectID)
+       assert.Equal(t, path1, event.ReferenceID)
+       assert.Equal(t, si.EventRecord_USERGROUP, event.Type)
+       assert.Equal(t, si.EventRecord_UG_GROUP_LIMIT, event.EventChangeDetail)
+       assert.Equal(t, si.EventRecord_SET, event.EventChangeType)
+       assert.Equal(t, "", event.Message)
+       assert.Equal(t, 0, len(event.Resource.Resources))
+}
+
+func TestGroupLimitCleared(t *testing.T) {
+       eventSystem := mock.NewEventSystemDisabled()
+       events := newUGMEvents(eventSystem)
+       events.sendLimitRemoveForGroup("group1", path1)
+       assert.Equal(t, 0, len(eventSystem.Events))
+
+       eventSystem = mock.NewEventSystem()
+       events = newUGMEvents(eventSystem)
+       events.sendLimitRemoveForGroup("group1", path1)
+       assert.Equal(t, 1, len(eventSystem.Events))
+       event := eventSystem.Events[0]
+       assert.Equal(t, "group1", event.ObjectID)
+       assert.Equal(t, path1, event.ReferenceID)
+       assert.Equal(t, si.EventRecord_USERGROUP, event.Type)
+       assert.Equal(t, si.EventRecord_UG_GROUP_LIMIT, event.EventChangeDetail)
+       assert.Equal(t, si.EventRecord_REMOVE, event.EventChangeType)
+       assert.Equal(t, "", event.Message)
+       assert.Equal(t, 0, len(event.Resource.Resources))
+}
diff --git a/pkg/scheduler/ugm/user_tracker.go 
b/pkg/scheduler/ugm/user_tracker.go
index b93b5396..1f1ecfee 100644
--- a/pkg/scheduler/ugm/user_tracker.go
+++ b/pkg/scheduler/ugm/user_tracker.go
@@ -19,11 +19,13 @@
 package ugm
 
 import (
+       "strings"
        "sync"
 
        "go.uber.org/zap"
 
        "github.com/apache/yunikorn-core/pkg/common"
+       "github.com/apache/yunikorn-core/pkg/common/configs"
        "github.com/apache/yunikorn-core/pkg/common/resources"
        "github.com/apache/yunikorn-core/pkg/log"
        "github.com/apache/yunikorn-core/pkg/webservice/dao"
@@ -38,37 +40,41 @@ type UserTracker struct {
        // and group tracker object as value.
        appGroupTrackers map[string]*GroupTracker
        queueTracker     *QueueTracker // Holds the actual resource usage of 
queue path where application runs
+       events           *ugmEvents
 
        sync.RWMutex
 }
 
-func newUserTracker(userName string) *UserTracker {
+func newUserTracker(userName string, ugmEvents *ugmEvents) *UserTracker {
        queueTracker := newRootQueueTracker(user)
        userTracker := &UserTracker{
                userName:         userName,
                appGroupTrackers: make(map[string]*GroupTracker),
                queueTracker:     queueTracker,
+               events:           ugmEvents,
        }
        return userTracker
 }
 
-func (ut *UserTracker) increaseTrackedResource(hierarchy []string, 
applicationID string, usage *resources.Resource) bool {
+func (ut *UserTracker) increaseTrackedResource(queuePath string, applicationID 
string, usage *resources.Resource) bool {
        ut.Lock()
        defer ut.Unlock()
+       hierarchy := strings.Split(queuePath, configs.DOT)
+       ut.events.sendIncResourceUsageForUser(ut.userName, queuePath, usage)
        increased := ut.queueTracker.increaseTrackedResource(hierarchy, 
applicationID, user, usage)
        if increased {
                gt := ut.appGroupTrackers[applicationID]
                log.Log(log.SchedUGM).Debug("Increasing resource usage for 
group",
                        zap.String("group", gt.getName()),
-                       zap.Strings("queue path", hierarchy),
+                       zap.String("queue path", queuePath),
                        zap.String("application", applicationID),
                        zap.Stringer("resource", usage))
-               increasedGroupUsage := gt.increaseTrackedResource(hierarchy, 
applicationID, usage, ut.userName)
+               increasedGroupUsage := gt.increaseTrackedResource(queuePath, 
applicationID, usage, ut.userName)
                if !increasedGroupUsage {
                        _, decreased := 
ut.queueTracker.decreaseTrackedResource(hierarchy, applicationID, usage, false)
                        if !decreased {
                                log.Log(log.SchedUGM).Error("User resource 
usage rollback has failed",
-                                       zap.Strings("queue path", hierarchy),
+                                       zap.String("queue path", queuePath),
                                        zap.String("application", 
applicationID),
                                        zap.String("user", ut.userName))
                        }
@@ -78,13 +84,19 @@ func (ut *UserTracker) increaseTrackedResource(hierarchy 
[]string, applicationID
        return increased
 }
 
// decreaseTrackedResource subtracts usage from this user's queue tracker for
// the queue identified by the dot separated queuePath. When removeApp is set
// the application is fully removed: an unlink event is emitted for the group
// the application was linked to (if any) and the link is deleted before the
// queue tracker is updated. The two booleans returned are propagated from the
// queue tracker: (queue tracker can be removed, decrease succeeded).
// NOTE(review): the usage decrease event is emitted before the queue tracker
// confirms the decrease succeeded — confirm this ordering is intended.
func (ut *UserTracker) decreaseTrackedResource(queuePath string, applicationID string, usage *resources.Resource, removeApp bool) (bool, bool) {
	ut.Lock()
	defer ut.Unlock()
	ut.events.sendDecResourceUsageForUser(ut.userName, queuePath, usage)
	if removeApp {
		// emit the unlink event while the group link is still known
		tracker := ut.appGroupTrackers[applicationID]
		if tracker != nil {
			appGroup := tracker.groupName
			ut.events.sendAppGroupUnlinked(appGroup, applicationID)
		}
		delete(ut.appGroupTrackers, applicationID)
	}
	return ut.queueTracker.decreaseTrackedResource(strings.Split(queuePath, configs.DOT), applicationID, usage, removeApp)
}
 
 func (ut *UserTracker) hasGroupForApp(applicationID string) bool {
@@ -97,6 +109,9 @@ func (ut *UserTracker) hasGroupForApp(applicationID string) 
bool {
 func (ut *UserTracker) setGroupForApp(applicationID string, groupTrack 
*GroupTracker) {
        ut.Lock()
        defer ut.Unlock()
+       if groupTrack != nil {
+               ut.events.sendAppGroupLinked(groupTrack.groupName, 
applicationID)
+       }
        ut.appGroupTrackers[applicationID] = groupTrack
 }
 
@@ -115,10 +130,18 @@ func (ut *UserTracker) getTrackedApplications() 
map[string]*GroupTracker {
        return ut.appGroupTrackers
 }
 
-func (ut *UserTracker) setLimits(hierarchy []string, resource 
*resources.Resource, maxApps uint64, useWildCard bool, doWildCardCheck bool) {
+func (ut *UserTracker) setLimits(queuePath string, resource 
*resources.Resource, maxApps uint64, useWildCard bool, doWildCardCheck bool) {
+       ut.Lock()
+       defer ut.Unlock()
+       ut.events.sendLimitSetForUser(ut.userName, queuePath)
+       ut.queueTracker.setLimit(strings.Split(queuePath, configs.DOT), 
resource, maxApps, useWildCard, user, doWildCardCheck)
+}
+
+func (ut *UserTracker) clearLimits(queuePath string, doWildCardCheck bool) {
        ut.Lock()
        defer ut.Unlock()
-       ut.queueTracker.setLimit(hierarchy, resource, maxApps, useWildCard, 
user, doWildCardCheck)
+       ut.events.sendLimitRemoveForUser(ut.userName, queuePath)
+       ut.queueTracker.setLimit(strings.Split(queuePath, configs.DOT), nil, 0, 
false, user, doWildCardCheck)
 }
 
 func (ut *UserTracker) headroom(hierarchy []string) *resources.Resource {
diff --git a/pkg/scheduler/ugm/user_tracker_test.go 
b/pkg/scheduler/ugm/user_tracker_test.go
index 258fb15d..28db413f 100644
--- a/pkg/scheduler/ugm/user_tracker_test.go
+++ b/pkg/scheduler/ugm/user_tracker_test.go
@@ -19,12 +19,16 @@
 package ugm
 
 import (
+       "strings"
        "testing"
 
        "gotest.tools/v3/assert"
 
+       "github.com/apache/yunikorn-core/pkg/common/configs"
        "github.com/apache/yunikorn-core/pkg/common/resources"
        "github.com/apache/yunikorn-core/pkg/common/security"
+       "github.com/apache/yunikorn-core/pkg/events/mock"
+       "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
 
 const (
@@ -39,11 +43,13 @@ const (
        queuePath4 = "root.parent.child12"
 )
 
-var hierarchy1 = []string{"root", "parent", "child1"}
-var hierarchy2 = []string{"root", "parent", "child2"}
-var hierarchy3 = []string{"root", "parent", "child1", "child12"}
-var hierarchy4 = []string{"root", "parent", "child12"}
-var hierarchy5 = []string{"root", "parent"}
// Dot separated queue paths shared by the tracker and ugm event tests in
// this package.
const (
	path1 = "root.parent.child1"
	path2 = "root.parent.child2"
	path3 = "root.parent.child1.child12"
	path4 = "root.parent.child12"
	path5 = "root.parent"
)
 
 func TestIncreaseTrackedResource(t *testing.T) {
        // Queue setup:
@@ -53,35 +59,45 @@ func TestIncreaseTrackedResource(t *testing.T) {
        // Initialize ugm
        GetUserManager()
        user := security.UserGroup{User: "test", Groups: []string{"test"}}
-       userTracker := newUserTracker(user.User)
+       eventSystem := mock.NewEventSystem()
+       userTracker := newUserTracker(user.User, newUGMEvents(eventSystem))
        usage1, err := resources.NewResourceFromConf(map[string]string{"mem": 
"10M", "vcore": "10"})
        if err != nil {
                t.Errorf("new resource create returned error or wrong resource: 
error %t, res %v", err, usage1)
        }
-       result := userTracker.increaseTrackedResource(hierarchy1, TestApp1, 
usage1)
+       result := userTracker.increaseTrackedResource(path1, TestApp1, usage1)
        if !result {
-               t.Fatalf("unable to increase tracked resource: queuepath %+q, 
app %s, res %v", hierarchy1, TestApp1, usage1)
+               t.Fatalf("unable to increase tracked resource: queuepath %s, 
app %s, res %v", path1, TestApp1, usage1)
        }
-       groupTracker := newGroupTracker(user.User)
+       groupTracker := newGroupTracker(user.User, 
newUGMEvents(mock.NewEventSystemDisabled()))
        userTracker.setGroupForApp(TestApp1, groupTracker)
-
        usage2, err := resources.NewResourceFromConf(map[string]string{"mem": 
"20M", "vcore": "20"})
        if err != nil {
                t.Errorf("new resource create returned error or wrong resource: 
error %t, res %v", err, usage2)
        }
-       result = userTracker.increaseTrackedResource(hierarchy2, TestApp2, 
usage2)
+       result = userTracker.increaseTrackedResource(path2, TestApp2, usage2)
        if !result {
-               t.Fatalf("unable to increase tracked resource: queuepath %+q, 
app %s, res %v, error %t", hierarchy2, TestApp2, usage2, err)
+               t.Fatalf("unable to increase tracked resource: queuepath %s, 
app %s, res %v, error %t", path2, TestApp2, usage2, err)
        }
+       assert.Equal(t, 3, len(eventSystem.Events))
+       assert.Equal(t, si.EventRecord_UG_USER_RESOURCE, 
eventSystem.Events[0].EventChangeDetail)
+       assert.Equal(t, si.EventRecord_ADD, 
eventSystem.Events[0].EventChangeType)
+       assert.Equal(t, si.EventRecord_UG_APP_LINK, 
eventSystem.Events[1].EventChangeDetail)
+       assert.Equal(t, "test", eventSystem.Events[1].ObjectID)
+       assert.Equal(t, TestApp1, eventSystem.Events[1].ReferenceID)
+       assert.Equal(t, si.EventRecord_SET, 
eventSystem.Events[1].EventChangeType)
+       assert.Equal(t, si.EventRecord_UG_USER_RESOURCE, 
eventSystem.Events[2].EventChangeDetail)
+       assert.Equal(t, si.EventRecord_ADD, 
eventSystem.Events[2].EventChangeType)
+
        userTracker.setGroupForApp(TestApp2, groupTracker)
 
        usage3, err := resources.NewResourceFromConf(map[string]string{"mem": 
"30M", "vcore": "30"})
        if err != nil {
                t.Errorf("new resource create returned error or wrong resource: 
error %t, res %v", err, usage3)
        }
-       result = userTracker.increaseTrackedResource(hierarchy3, TestApp3, 
usage3)
+       result = userTracker.increaseTrackedResource(path3, TestApp3, usage3)
        if !result {
-               t.Fatalf("unable to increase tracked resource: queuepath %+q, 
app %s, res %v, error %t", hierarchy3, TestApp3, usage3, err)
+               t.Fatalf("unable to increase tracked resource: queuepath %s, 
app %s, res %v, error %t", path3, TestApp3, usage3, err)
        }
        userTracker.setGroupForApp(TestApp3, groupTracker)
 
@@ -89,9 +105,9 @@ func TestIncreaseTrackedResource(t *testing.T) {
        if err != nil {
                t.Errorf("new resource create returned error or wrong resource: 
error %t, res %v", err, usage3)
        }
-       result = userTracker.increaseTrackedResource(hierarchy4, TestApp4, 
usage4)
+       result = userTracker.increaseTrackedResource(path4, TestApp4, usage4)
        if !result {
-               t.Fatalf("unable to increase tracked resource: queuepath %+q, 
app %s, res %v, error %t", hierarchy4, TestApp4, usage4, err)
+               t.Fatalf("unable to increase tracked resource: queuepath %s, 
app %s, res %v, error %t", path4, TestApp4, usage4, err)
        }
        userTracker.setGroupForApp(TestApp4, groupTracker)
 
@@ -113,17 +129,18 @@ func TestDecreaseTrackedResource(t *testing.T) {
        // Initialize ugm
        GetUserManager()
        user := security.UserGroup{User: "test", Groups: []string{"test"}}
-       userTracker := newUserTracker(user.User)
+       eventSystem := mock.NewEventSystem()
+       userTracker := newUserTracker(user.User, newUGMEvents(eventSystem))
 
        usage1, err := resources.NewResourceFromConf(map[string]string{"mem": 
"70M", "vcore": "70"})
        if err != nil {
                t.Errorf("new resource create returned error or wrong resource: 
error %t, res %v", err, usage1)
        }
-       result := userTracker.increaseTrackedResource(hierarchy1, TestApp1, 
usage1)
+       result := userTracker.increaseTrackedResource(path1, TestApp1, usage1)
        if !result {
-               t.Fatalf("unable to increase tracked resource: queuepath %+q, 
app %s, res %v, error %t", hierarchy1, TestApp1, usage1, err)
+               t.Fatalf("unable to increase tracked resource: queuepath %s, 
app %s, res %v, error %t", path1, TestApp1, usage1, err)
        }
-       groupTracker := newGroupTracker(user.User)
+       groupTracker := newGroupTracker(user.User, 
newUGMEvents(mock.NewEventSystemDisabled()))
        userTracker.setGroupForApp(TestApp1, groupTracker)
        assert.Equal(t, 1, len(userTracker.getTrackedApplications()))
 
@@ -131,9 +148,9 @@ func TestDecreaseTrackedResource(t *testing.T) {
        if err != nil {
                t.Errorf("new resource create returned error or wrong resource: 
error %t, res %v", err, usage2)
        }
-       result = userTracker.increaseTrackedResource(hierarchy2, TestApp2, 
usage2)
+       result = userTracker.increaseTrackedResource(path2, TestApp2, usage2)
        if !result {
-               t.Fatalf("unable to increase tracked resource: queuepath %+q, 
app %s, res %v, error %t", hierarchy2, TestApp2, usage2, err)
+               t.Fatalf("unable to increase tracked resource: queuepath %s, 
app %s, res %v, error %t", path2, TestApp2, usage2, err)
        }
        userTracker.setGroupForApp(TestApp2, groupTracker)
        actualResources := getUserResource(userTracker)
@@ -148,15 +165,18 @@ func TestDecreaseTrackedResource(t *testing.T) {
        if err != nil {
                t.Errorf("new resource create returned error or wrong resource: 
error %t, res %v", err, usage3)
        }
-       removeQT, decreased := userTracker.decreaseTrackedResource(hierarchy1, 
TestApp1, usage3, false)
+       eventSystem.Reset()
+       removeQT, decreased := userTracker.decreaseTrackedResource(path1, 
TestApp1, usage3, false)
        if !decreased {
-               t.Fatalf("unable to decrease tracked resource: queuepath %+q, 
app %s, res %v, error %t", hierarchy1, TestApp1, usage3, err)
+               t.Fatalf("unable to decrease tracked resource: queuepath %s, 
app %s, res %v, error %t", path1, TestApp1, usage3, err)
        }
        assert.Equal(t, removeQT, false, "wrong remove queue tracker value")
+       assert.Equal(t, si.EventRecord_UG_USER_RESOURCE, 
eventSystem.Events[0].EventChangeDetail)
+       assert.Equal(t, si.EventRecord_REMOVE, 
eventSystem.Events[0].EventChangeType)
 
-       removeQT, decreased = userTracker.decreaseTrackedResource(hierarchy2, 
TestApp2, usage3, false)
+       removeQT, decreased = userTracker.decreaseTrackedResource(path2, 
TestApp2, usage3, false)
        if !decreased {
-               t.Fatalf("unable to decrease tracked resource: queuepath %+q, 
app %s, res %v, error %t", hierarchy2, TestApp2, usage3, err)
+               t.Fatalf("unable to decrease tracked resource: queuepath %s, 
app %s, res %v, error %t", path1, TestApp2, usage3, err)
        }
        actualResources1 := getUserResource(userTracker)
 
@@ -171,55 +191,115 @@ func TestDecreaseTrackedResource(t *testing.T) {
                t.Errorf("new resource create returned error or wrong resource: 
error %t, res %v", err, usage3)
        }
 
-       removeQT, decreased = userTracker.decreaseTrackedResource(hierarchy1, 
TestApp1, usage4, true)
+       eventSystem.Reset()
+       removeQT, decreased = userTracker.decreaseTrackedResource(path1, 
TestApp1, usage4, true)
        if !decreased {
-               t.Fatalf("unable to increase tracked resource: queuepath %+q, 
app %s, res %v, error %t", hierarchy1, TestApp1, usage1, err)
+               t.Fatalf("unable to decrease tracked resource: queuepath %s, 
app %s, res %v, error %v", path1, TestApp1, usage4, err)
        }
        assert.Equal(t, 1, len(userTracker.getTrackedApplications()))
        assert.Equal(t, removeQT, false, "wrong remove queue tracker value")
+       assert.Equal(t, 2, len(eventSystem.Events))
+       assert.Equal(t, si.EventRecord_UG_USER_RESOURCE, 
eventSystem.Events[0].EventChangeDetail)
+       assert.Equal(t, si.EventRecord_REMOVE, 
eventSystem.Events[0].EventChangeType)
+       assert.Equal(t, si.EventRecord_UG_APP_LINK, 
eventSystem.Events[1].EventChangeDetail)
+       assert.Equal(t, si.EventRecord_REMOVE, 
eventSystem.Events[1].EventChangeType)
 
        usage5, err := resources.NewResourceFromConf(map[string]string{"mem": 
"10M", "vcore": "10"})
        if err != nil {
                t.Errorf("new resource create returned error or wrong resource: 
error %t, res %v", err, usage5)
        }
-       removeQT, decreased = userTracker.decreaseTrackedResource(hierarchy2, 
TestApp2, usage5, true)
+       removeQT, decreased = userTracker.decreaseTrackedResource(path2, 
TestApp2, usage5, true)
        if !decreased {
-               t.Fatalf("unable to increase tracked resource: queuepath %+q, 
app %s, res %v, error %t", hierarchy2, TestApp2, usage2, err)
+               t.Fatalf("unable to decrease tracked resource: queuepath %s, 
app %s, res %v, error %v", path2, TestApp2, usage5, err)
        }
        assert.Equal(t, 0, len(userTracker.getTrackedApplications()))
        assert.Equal(t, removeQT, true, "wrong remove queue tracker value")
 }
 
-func TestSetMaxLimits(t *testing.T) {
+func TestSetAndClearMaxLimits(t *testing.T) {
        // Queue setup:
        // root->parent->child1
        // Initialize ugm
        GetUserManager()
        user := security.UserGroup{User: "test", Groups: []string{"test"}}
-       userTracker := newUserTracker(user.User)
+       eventSystem := mock.NewEventSystem()
+       userTracker := newUserTracker(user.User, newUGMEvents(eventSystem))
        usage1, err := resources.NewResourceFromConf(map[string]string{"mem": 
"10M", "vcore": "10"})
        if err != nil {
                t.Errorf("new resource create returned error or wrong resource: 
error %t, res %v", err, usage1)
        }
-       result := userTracker.increaseTrackedResource(hierarchy1, TestApp1, 
usage1)
+       result := userTracker.increaseTrackedResource(path1, TestApp1, usage1)
        if !result {
-               t.Fatalf("unable to increase tracked resource: queuepath %+q, 
app %s, res %v, error %t", hierarchy1, TestApp1, usage1, err)
+               t.Fatalf("unable to increase tracked resource: queuepath %s, 
app %s, res %v, error %t", path1, TestApp1, usage1, err)
        }
 
-       userTracker.setLimits(hierarchy1, resources.Multiply(usage1, 5), 5, 
false, false)
-       userTracker.setLimits(hierarchy5, resources.Multiply(usage1, 10), 10, 
false, false)
+       eventSystem.Reset()
+       userTracker.setLimits(path1, resources.Multiply(usage1, 5), 5, false, 
false)
+       userTracker.setLimits(path5, resources.Multiply(usage1, 10), 10, false, 
false)
+       assert.Equal(t, 2, len(eventSystem.Events))
+       assert.Equal(t, si.EventRecord_UG_USER_LIMIT, 
eventSystem.Events[0].EventChangeDetail)
+       assert.Equal(t, si.EventRecord_SET, 
eventSystem.Events[0].EventChangeType)
+       assert.Equal(t, path1, eventSystem.Events[0].ReferenceID)
+       assert.Equal(t, si.EventRecord_UG_USER_LIMIT, 
eventSystem.Events[1].EventChangeDetail)
+       assert.Equal(t, si.EventRecord_SET, 
eventSystem.Events[1].EventChangeType)
+       assert.Equal(t, path5, eventSystem.Events[1].ReferenceID)
 
-       result = userTracker.increaseTrackedResource(hierarchy1, TestApp1, 
usage1)
+       result = userTracker.increaseTrackedResource(path1, TestApp1, usage1)
        if !result {
-               t.Fatalf("unable to increase tracked resource: queuepath %+q, 
app %s, res %v", hierarchy1, TestApp1, usage1)
+               t.Fatalf("unable to increase tracked resource: queuepath %s, 
app %s, res %v", path1, TestApp1, usage1)
        }
 
-       result = userTracker.increaseTrackedResource(hierarchy1, TestApp2, 
usage1)
+       result = userTracker.increaseTrackedResource(path1, TestApp2, usage1)
        if !result {
-               t.Fatalf("unable to increase tracked resource: queuepath %+q, 
app %s, res %v", hierarchy1, TestApp2, usage1)
+               t.Fatalf("unable to increase tracked resource: queuepath %s, 
app %s, res %v", path1, TestApp2, usage1)
        }
-       userTracker.setLimits(hierarchy1, usage1, 1, false, false)
-       userTracker.setLimits(hierarchy5, usage1, 1, false, false)
+       path1expectedHeadroom := 
resources.NewResourceFromMap(map[string]resources.Quantity{
+               "mem":   20000000,
+               "vcore": 20000,
+       })
+       hierarchy1 := strings.Split(path1, configs.DOT)
+       assert.Assert(t, resources.Equals(userTracker.headroom(hierarchy1), 
path1expectedHeadroom))
+       path5expectedHeadroom := 
resources.NewResourceFromMap(map[string]resources.Quantity{
+               "mem":   70000000,
+               "vcore": 70000,
+       })
+       hierarchy5 := strings.Split(path5, configs.DOT)
+       assert.Assert(t, resources.Equals(userTracker.headroom(hierarchy5), 
path5expectedHeadroom))
+       assert.Assert(t, userTracker.canRunApp(hierarchy1, TestApp4))
+
+       // lower limits
+       userTracker.setLimits(path1, usage1, 1, false, false)
+       userTracker.setLimits(path5, resources.Multiply(usage1, 2), 1, false, 
false)
+       lowerChildHeadroom := 
resources.NewResourceFromMap(map[string]resources.Quantity{
+               "mem":   -20000000,
+               "vcore": -20000,
+       })
+       assert.Assert(t, resources.Equals(userTracker.headroom(hierarchy1), 
lowerChildHeadroom))
+       lowerParentHeadroom := 
resources.NewResourceFromMap(map[string]resources.Quantity{
+               "mem":   -10000000,
+               "vcore": -10000,
+       })
+       assert.Assert(t, resources.Equals(userTracker.headroom(hierarchy5), 
lowerParentHeadroom))
+       assert.Assert(t, !userTracker.canRunApp(hierarchy1, TestApp4))
+       assert.Assert(t, !userTracker.canRunApp(hierarchy5, TestApp4))
+
+       // clear limits
+       eventSystem.Reset()
+       userTracker.clearLimits(path1, true)
+       assert.Assert(t, resources.Equals(userTracker.headroom(hierarchy1), 
lowerParentHeadroom))
+       assert.Assert(t, resources.Equals(userTracker.headroom(hierarchy5), 
lowerParentHeadroom))
+       assert.Assert(t, !userTracker.canRunApp(hierarchy1, TestApp4))
+       assert.Assert(t, !userTracker.canRunApp(hierarchy5, TestApp4))
+       userTracker.clearLimits(path5, true)
+       assert.Assert(t, userTracker.headroom(hierarchy1) == nil)
+       assert.Assert(t, userTracker.headroom(hierarchy5) == nil)
+       assert.Assert(t, userTracker.canRunApp(hierarchy1, TestApp4))
+       assert.Assert(t, userTracker.canRunApp(hierarchy5, TestApp4))
+       assert.Equal(t, 2, len(eventSystem.Events))
+       assert.Equal(t, si.EventRecord_REMOVE, 
eventSystem.Events[0].EventChangeType)
+       assert.Equal(t, si.EventRecord_UG_USER_LIMIT, 
eventSystem.Events[0].EventChangeDetail)
+       assert.Equal(t, si.EventRecord_REMOVE, 
eventSystem.Events[1].EventChangeType)
+       assert.Equal(t, si.EventRecord_UG_USER_LIMIT, 
eventSystem.Events[1].EventChangeDetail)
 }
 
 func getUserResource(ut *UserTracker) map[string]*resources.Resource {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to